]> git.r.bdr.sh - rbdr/mobius/blame - hotline/server.go
Add initial HTTP API endpoints
[rbdr/mobius] / hotline / server.go
CommitLineData
6988a057
JH
1package hotline
2
3import (
7cd900d6 4 "bufio"
5cc444c8 5 "bytes"
6988a057 6 "context"
f8e4cd54 7 "crypto/rand"
6988a057
JH
8 "encoding/binary"
9 "errors"
10 "fmt"
2e1aec0f 11 "golang.org/x/text/encoding/charmap"
6988a057 12 "io"
a6216dd8
JH
13 "log"
14 "log/slog"
6988a057 15 "net"
b6e3be94 16 "os"
6988a057
JH
17 "strings"
18 "sync"
19 "time"
6988a057
JH
20)
21
7cd900d6
JH
// contextKey is a private string type for context value keys, so keys set by
// this package cannot collide with string keys set elsewhere.
type contextKey string

// contextKeyReq is the context key under which the accept loops store a
// connection's requestCtx.
var contextKeyReq contextKey = "req"

// requestCtx carries per-connection metadata through a context.Context.
type requestCtx struct {
	remoteAddr string // remote address string of the accepted connection
}
29
2e1aec0f
JH
30// Converts bytes from Mac Roman encoding to UTF-8
31var txtDecoder = charmap.Macintosh.NewDecoder()
32
33// Converts bytes from UTF-8 to Mac Roman encoding
34var txtEncoder = charmap.Macintosh.NewEncoder()
35
6988a057 36type Server struct {
a2ef262a
JH
37 NetInterface string
38 Port int
a2ef262a 39
fd740bc4
JH
40 handlers map[TranType]HandlerFunc
41
42 Config Config
43 Logger *slog.Logger
c1c44744 44
40414f92 45 TrackerPassID [4]byte
00913df3 46
d9bc63a1 47 Stats Counter
6988a057 48
7cd900d6 49 FS FileStore // Storage backend to use for File storage
6988a057
JH
50
51 outbox chan Transaction
52
fd740bc4
JH
53 Agreement io.ReadSeeker
54 Banner []byte
8eb43f95 55
d9bc63a1
JH
56 FileTransferMgr FileTransferMgr
57 ChatMgr ChatManager
58 ClientMgr ClientManager
59 AccountManager AccountManager
60 ThreadedNewsMgr ThreadedNewsMgr
61 BanList BanMgr
46862572 62
d9bc63a1 63 MessageBoard io.ReadWriteSeeker
6988a057
JH
64}
65
fd740bc4 66type Option = func(s *Server)
d9bc63a1 67
fd740bc4
JH
68func WithConfig(config Config) func(s *Server) {
69 return func(s *Server) {
70 s.Config = config
d9bc63a1 71 }
fd740bc4 72}
d9bc63a1 73
fd740bc4
JH
74func WithLogger(logger *slog.Logger) func(s *Server) {
75 return func(s *Server) {
76 s.Logger = logger
d9bc63a1 77 }
fd740bc4 78}
d9bc63a1 79
fd740bc4
JH
80// WithPort optionally overrides the default TCP port.
81func WithPort(port int) func(s *Server) {
82 return func(s *Server) {
83 s.Port = port
d9bc63a1 84 }
fd740bc4 85}
d9bc63a1 86
fd740bc4
JH
87// WithInterface optionally sets a specific interface to listen on.
88func WithInterface(netInterface string) func(s *Server) {
89 return func(s *Server) {
90 s.NetInterface = netInterface
d9bc63a1 91 }
fd740bc4 92}
d9bc63a1 93
fd740bc4
JH
// ServerConfig is an empty struct type; it has no fields and nothing in this
// file references it.
type ServerConfig struct{}
d9bc63a1 96
fd740bc4
JH
97func NewServer(options ...Option) (*Server, error) {
98 server := Server{
99 handlers: make(map[TranType]HandlerFunc),
100 outbox: make(chan Transaction),
101 FS: &OSFileStore{},
102 ChatMgr: NewMemChatManager(),
103 ClientMgr: NewMemClientMgr(),
104 FileTransferMgr: NewMemFileTransferMgr(),
105 Stats: NewStats(),
106 }
d9bc63a1 107
fd740bc4
JH
108 for _, opt := range options {
109 opt(&server)
d9bc63a1 110 }
00913df3 111
fd740bc4
JH
112 // generate a new random passID for tracker registration
113 _, err := rand.Read(server.TrackerPassID[:])
114 if err != nil {
115 return nil, err
116 }
00913df3 117
d9bc63a1 118 return &server, nil
00913df3
JH
119}
120
d9bc63a1
JH
121func (s *Server) CurrentStats() map[string]interface{} {
122 return s.Stats.Values()
6988a057
JH
123}
124
a2ef262a 125func (s *Server) ListenAndServe(ctx context.Context) error {
fd740bc4
JH
126 go s.registerWithTrackers(ctx)
127 go s.keepaliveHandler(ctx)
128 go s.processOutbox()
129
6988a057
JH
130 var wg sync.WaitGroup
131
132 wg.Add(1)
8168522a 133 go func() {
2d0f2abe 134 ln, err := net.Listen("tcp", fmt.Sprintf("%s:%v", s.NetInterface, s.Port))
8168522a 135 if err != nil {
a6216dd8 136 log.Fatal(err)
8168522a
JH
137 }
138
a6216dd8 139 log.Fatal(s.Serve(ctx, ln))
8168522a 140 }()
6988a057
JH
141
142 wg.Add(1)
8168522a 143 go func() {
2d0f2abe 144 ln, err := net.Listen("tcp", fmt.Sprintf("%s:%v", s.NetInterface, s.Port+1))
8168522a 145 if err != nil {
a6216dd8 146 log.Fatal(err)
8168522a
JH
147 }
148
a6216dd8 149 log.Fatal(s.ServeFileTransfers(ctx, ln))
8168522a 150 }()
6988a057
JH
151
152 wg.Wait()
153
154 return nil
155}
156
7cd900d6 157func (s *Server) ServeFileTransfers(ctx context.Context, ln net.Listener) error {
6988a057
JH
158 for {
159 conn, err := ln.Accept()
160 if err != nil {
161 return err
162 }
163
164 go func() {
7cd900d6
JH
165 defer func() { _ = conn.Close() }()
166
167 err = s.handleFileTransfer(
a2ef262a 168 context.WithValue(ctx, contextKeyReq, requestCtx{remoteAddr: conn.RemoteAddr().String()}),
7cd900d6
JH
169 conn,
170 )
171
172 if err != nil {
a6216dd8 173 s.Logger.Error("file transfer error", "reason", err)
6988a057
JH
174 }
175 }()
176 }
177}
178
179func (s *Server) sendTransaction(t Transaction) error {
fd740bc4 180 client := s.ClientMgr.Get(t.ClientID)
a2ef262a 181
d9bc63a1 182 if client == nil {
a2ef262a 183 return nil
6988a057 184 }
6988a057 185
a2ef262a 186 _, err := io.Copy(client.Connection, &t)
75e4191b 187 if err != nil {
fd740bc4 188 return fmt.Errorf("failed to send transaction to client %v: %v", t.ClientID, err)
6988a057 189 }
3178ae58 190
6988a057
JH
191 return nil
192}
193
67db911d
JH
194func (s *Server) processOutbox() {
195 for {
196 t := <-s.outbox
197 go func() {
198 if err := s.sendTransaction(t); err != nil {
a6216dd8 199 s.Logger.Error("error sending transaction", "err", err)
67db911d
JH
200 }
201 }()
202 }
203}
204
7cd900d6 205func (s *Server) Serve(ctx context.Context, ln net.Listener) error {
6988a057 206 for {
fd740bc4
JH
207 select {
208 case <-ctx.Done():
209 s.Logger.Info("Server shutting down")
210 return ctx.Err()
211 default:
212 conn, err := ln.Accept()
213 if err != nil {
214 s.Logger.Error("Error accepting connection", "err", err)
215 continue
216 }
6988a057 217
fd740bc4
JH
218 go func() {
219 connCtx := context.WithValue(ctx, contextKeyReq, requestCtx{
220 remoteAddr: conn.RemoteAddr().String(),
221 })
222
223 s.Logger.Info("Connection established", "addr", conn.RemoteAddr())
224 defer conn.Close()
225
226 if err := s.handleNewConnection(connCtx, conn, conn.RemoteAddr().String()); err != nil {
227 if err == io.EOF {
228 s.Logger.Info("Client disconnected", "RemoteAddr", conn.RemoteAddr())
229 } else {
230 s.Logger.Error("Error serving request", "RemoteAddr", conn.RemoteAddr(), "err", err)
231 }
6988a057 232 }
fd740bc4
JH
233 }()
234 }
6988a057
JH
235 }
236}
237
fd740bc4
JH
238// time in seconds between tracker re-registration
239const trackerUpdateFrequency = 300
240
241// registerWithTrackers runs every trackerUpdateFrequency seconds to update the server's tracker entry on all configured
242// trackers.
243func (s *Server) registerWithTrackers(ctx context.Context) {
244 ticker := time.NewTicker(trackerUpdateFrequency * time.Second)
245 defer ticker.Stop()
6988a057 246
d9bc63a1 247 for {
fd740bc4
JH
248 select {
249 case <-ctx.Done():
250 return
251 case <-ticker.C:
252 if s.Config.EnableTrackerRegistration {
253 tr := &TrackerRegistration{
254 UserCount: len(s.ClientMgr.List()),
255 PassID: s.TrackerPassID,
256 Name: s.Config.Name,
257 Description: s.Config.Description,
258 }
259 binary.BigEndian.PutUint16(tr.Port[:], uint16(s.Port))
260
261 for _, t := range s.Config.Trackers {
262 if err := register(&RealDialer{}, t, tr); err != nil {
263 s.Logger.Error(fmt.Sprintf("Unable to register with tracker %v", t), "error", err)
264 }
265 }
6988a057 266 }
d9bc63a1 267 }
d9bc63a1 268 }
6988a057 269
fd740bc4 270}
d9bc63a1 271
fd740bc4
JH
272const (
273 userIdleSeconds = 300 // time in seconds before an inactive user is marked idle
274 idleCheckInterval = 10 // time in seconds to check for idle users
275)
6988a057 276
fd740bc4
JH
277// keepaliveHandler runs every idleCheckInterval seconds and increments a user's idle time by idleCheckInterval seconds.
278// If the updated idle time exceeds userIdleSeconds and the user was not previously idle, we notify all connected clients
279// that the user has gone idle. For most clients, this turns the user grey in the user list.
280func (s *Server) keepaliveHandler(ctx context.Context) {
281 ticker := time.NewTicker(idleCheckInterval * time.Second)
282 defer ticker.Stop()
d9bc63a1 283
fd740bc4
JH
284 for {
285 select {
286 case <-ctx.Done():
287 return
288 case <-ticker.C:
289 for _, c := range s.ClientMgr.List() {
290 c.mu.Lock()
291 c.IdleTime += idleCheckInterval
292
293 // Check if the user
294 if c.IdleTime > userIdleSeconds && !c.Flags.IsSet(UserFlagAway) {
295 c.Flags.Set(UserFlagAway, 1)
296
297 c.SendAll(
298 TranNotifyChangeUser,
299 NewField(FieldUserID, c.ID[:]),
300 NewField(FieldUserFlags, c.Flags[:]),
301 NewField(FieldUserName, c.UserName),
302 NewField(FieldUserIconID, c.Icon),
303 )
304 }
305 c.mu.Unlock()
6988a057
JH
306 }
307 }
46862572 308 }
6988a057
JH
309}
310
67db911d 311func (s *Server) NewClientConn(conn io.ReadWriteCloser, remoteAddr string) *ClientConn {
6988a057 312 clientConn := &ClientConn{
a2ef262a 313 Icon: []byte{0, 0}, // TODO: make array type
6988a057
JH
314 Connection: conn,
315 Server: s,
d4c152a4 316 RemoteAddr: remoteAddr,
d2810ae9 317
d9bc63a1 318 ClientFileTransferMgr: NewClientFileTransferMgr(),
180d6544
JH
319 }
320
d9bc63a1 321 s.ClientMgr.Add(clientConn)
6988a057 322
d9bc63a1 323 return clientConn
6988a057
JH
324}
325
a2ef262a
JH
326func sendBanMessage(rwc io.Writer, message string) {
327 t := NewTransaction(
328 TranServerMsg,
329 [2]byte{0, 0},
330 NewField(FieldData, []byte(message)),
331 NewField(FieldChatOptions, []byte{0, 0}),
332 )
333 _, _ = io.Copy(rwc, &t)
334 time.Sleep(1 * time.Second)
335}
336
6988a057 337// handleNewConnection takes a new net.Conn and performs the initial login sequence
3178ae58 338func (s *Server) handleNewConnection(ctx context.Context, rwc io.ReadWriteCloser, remoteAddr string) error {
c4208f86
JH
339 defer dontPanic(s.Logger)
340
fd740bc4
JH
341 if err := performHandshake(rwc); err != nil {
342 return fmt.Errorf("perform handshake: %w", err)
343 }
344
a2ef262a
JH
345 // Check if remoteAddr is present in the ban list
346 ipAddr := strings.Split(remoteAddr, ":")[0]
d9bc63a1 347 if isBanned, banUntil := s.BanList.IsBanned(ipAddr); isBanned {
a2ef262a
JH
348 // permaban
349 if banUntil == nil {
350 sendBanMessage(rwc, "You are permanently banned on this server")
351 s.Logger.Debug("Disconnecting permanently banned IP", "remoteAddr", ipAddr)
352 return nil
353 }
354
355 // temporary ban
356 if time.Now().Before(*banUntil) {
357 sendBanMessage(rwc, "You are temporarily banned on this server")
358 s.Logger.Debug("Disconnecting temporarily banned IP", "remoteAddr", ipAddr)
359 return nil
360 }
361 }
362
3178ae58
JH
363 // Create a new scanner for parsing incoming bytes into transaction tokens
364 scanner := bufio.NewScanner(rwc)
365 scanner.Split(transactionScanner)
366
367 scanner.Scan()
6988a057 368
f4cdaddc
JH
369 // Make a new []byte slice and copy the scanner bytes to it. This is critical to avoid a data race as the
370 // scanner re-uses the buffer for subsequent scans.
371 buf := make([]byte, len(scanner.Bytes()))
372 copy(buf, scanner.Bytes())
373
854a92fc 374 var clientLogin Transaction
f4cdaddc 375 if _, err := clientLogin.Write(buf); err != nil {
a2ef262a 376 return fmt.Errorf("error writing login transaction: %w", err)
46862572 377 }
e9c043c0
JH
378
379 c := s.NewClientConn(rwc, remoteAddr)
6988a057 380 defer c.Disconnect()
6988a057 381
d005ef04
JH
382 encodedPassword := clientLogin.GetField(FieldUserPassword).Data
383 c.Version = clientLogin.GetField(FieldVersion).Data
6988a057 384
d9bc63a1 385 login := clientLogin.GetField(FieldUserLogin).DecodeObfuscatedString()
6988a057
JH
386 if login == "" {
387 login = GuestAccount
388 }
389
fd740bc4 390 c.Logger = s.Logger.With("ip", ipAddr, "login", login)
0da28a1f 391
6988a057
JH
392 // If authentication fails, send error reply and close connection
393 if !c.Authenticate(login, encodedPassword) {
a2ef262a 394 t := c.NewErrReply(&clientLogin, "Incorrect login.")[0]
95159e55
JH
395
396 _, err := io.Copy(rwc, &t)
72dd37f1
JH
397 if err != nil {
398 return err
399 }
0da28a1f 400
fd740bc4 401 c.Logger.Info("Login failed", "clientVersion", fmt.Sprintf("%x", c.Version))
0da28a1f
JH
402
403 return nil
6988a057
JH
404 }
405
d005ef04
JH
406 if clientLogin.GetField(FieldUserIconID).Data != nil {
407 c.Icon = clientLogin.GetField(FieldUserIconID).Data
59097464
JH
408 }
409
d9bc63a1
JH
410 c.Account = c.Server.AccountManager.Get(login)
411 if c.Account == nil {
412 return nil
413 }
59097464 414
d005ef04 415 if clientLogin.GetField(FieldUserName).Data != nil {
d9bc63a1 416 if c.Authorize(AccessAnyName) {
d005ef04 417 c.UserName = clientLogin.GetField(FieldUserName).Data
ea5d8c51
JH
418 } else {
419 c.UserName = []byte(c.Account.Name)
420 }
6988a057
JH
421 }
422
d9bc63a1 423 if c.Authorize(AccessDisconUser) {
a2ef262a 424 c.Flags.Set(UserFlagAdmin, 1)
6988a057
JH
425 }
426
854a92fc 427 s.outbox <- c.NewReply(&clientLogin,
d005ef04
JH
428 NewField(FieldVersion, []byte{0x00, 0xbe}),
429 NewField(FieldCommunityBannerID, []byte{0, 0}),
430 NewField(FieldServerName, []byte(s.Config.Name)),
6988a057
JH
431 )
432
433 // Send user access privs so client UI knows how to behave
a2ef262a 434 c.Server.outbox <- NewTransaction(TranUserAccess, c.ID, NewField(FieldUserAccess, c.Account.Access[:]))
6988a057 435
d9bc63a1 436 // Accounts with AccessNoAgreement do not receive the server agreement on login. The behavior is different between
d005ef04
JH
437 // client versions. For 1.2.3 client, we do not send TranShowAgreement. For other client versions, we send
438 // TranShowAgreement but with the NoServerAgreement field set to 1.
d9bc63a1 439 if c.Authorize(AccessNoAgreement) {
a322be02
JH
440 // If client version is nil, then the client uses the 1.2.3 login behavior
441 if c.Version != nil {
a2ef262a 442 c.Server.outbox <- NewTransaction(TranShowAgreement, c.ID, NewField(FieldNoServerAgreement, []byte{1}))
a322be02 443 }
688c86d2 444 } else {
fd740bc4
JH
445 _, _ = c.Server.Agreement.Seek(0, 0)
446 data, _ := io.ReadAll(c.Server.Agreement)
447
448 c.Server.outbox <- NewTransaction(TranShowAgreement, c.ID, NewField(FieldData, data))
688c86d2 449 }
6988a057 450
2f8472fa
JH
451 // If the client has provided a username as part of the login, we can infer that it is using the 1.2.3 login
452 // flow and not the 1.5+ flow.
453 if len(c.UserName) != 0 {
454 // Add the client username to the logger. For 1.5+ clients, we don't have this information yet as it comes as
d005ef04 455 // part of TranAgreed
fd740bc4
JH
456 c.Logger = c.Logger.With("name", string(c.UserName))
457 c.Logger.Info("Login successful")
2f8472fa
JH
458
459 // Notify other clients on the server that the new user has logged in. For 1.5+ clients we don't have this
d005ef04 460 // information yet, so we do it in TranAgreed instead
d9bc63a1 461 for _, t := range c.NotifyOthers(
a2ef262a
JH
462 NewTransaction(
463 TranNotifyChangeUser, [2]byte{0, 0},
d005ef04 464 NewField(FieldUserName, c.UserName),
a2ef262a 465 NewField(FieldUserID, c.ID[:]),
d005ef04 466 NewField(FieldUserIconID, c.Icon),
a2ef262a 467 NewField(FieldUserFlags, c.Flags[:]),
2f8472fa
JH
468 ),
469 ) {
470 c.Server.outbox <- t
471 }
6988a057 472 }
bd1ce113 473
d9bc63a1
JH
474 c.Server.Stats.Increment(StatConnectionCounter, StatCurrentlyConnected)
475 defer c.Server.Stats.Decrement(StatCurrentlyConnected)
476
477 if len(s.ClientMgr.List()) > c.Server.Stats.Get(StatConnectionPeak) {
478 c.Server.Stats.Set(StatConnectionPeak, len(s.ClientMgr.List()))
00913df3 479 }
6988a057 480
3178ae58
JH
481 // Scan for new transactions and handle them as they come in.
482 for scanner.Scan() {
a2ef262a 483 // Copy the scanner bytes to a new slice to it to avoid a data race when the scanner re-uses the buffer.
fd740bc4
JH
484 tmpBuf := make([]byte, len(scanner.Bytes()))
485 copy(tmpBuf, scanner.Bytes())
6988a057 486
854a92fc 487 var t Transaction
fd740bc4 488 if _, err := t.Write(tmpBuf); err != nil {
854a92fc 489 return err
6988a057 490 }
854a92fc 491
a2ef262a 492 c.handleTransaction(t)
6988a057 493 }
3178ae58 494 return nil
6988a057
JH
495}
496
85767504 497// handleFileTransfer receives a client net.Conn from the file transfer server, performs the requested transfer type, then closes the connection
7cd900d6 498func (s *Server) handleFileTransfer(ctx context.Context, rwc io.ReadWriter) error {
37a954c8 499 defer dontPanic(s.Logger)
0a92e50b 500
a2ef262a 501 // The first 16 bytes contain the file transfer.
df2735b2 502 var t transfer
a2ef262a
JH
503 if _, err := io.CopyN(&t, rwc, 16); err != nil {
504 return fmt.Errorf("error reading file transfer: %w", err)
6988a057
JH
505 }
506
d9bc63a1
JH
507 fileTransfer := s.FileTransferMgr.Get(t.ReferenceNumber)
508 if fileTransfer == nil {
509 return errors.New("invalid transaction ID")
510 }
511
0a92e50b 512 defer func() {
d9bc63a1 513 s.FileTransferMgr.Delete(t.ReferenceNumber)
df1ade54 514
94742e2f
JH
515 // Wait a few seconds before closing the connection: this is a workaround for problems
516 // observed with Windows clients where the client must initiate close of the TCP connection before
517 // the server does. This is gross and seems unnecessary. TODO: Revisit?
518 time.Sleep(3 * time.Second)
0a92e50b 519 }()
6988a057 520
7cd900d6
JH
521 rLogger := s.Logger.With(
522 "remoteAddr", ctx.Value(contextKeyReq).(requestCtx).remoteAddr,
df1ade54 523 "login", fileTransfer.ClientConn.Account.Login,
95159e55 524 "Name", string(fileTransfer.ClientConn.UserName),
7cd900d6
JH
525 )
526
fd740bc4 527 fullPath, err := ReadPath(s.Config.FileRoot, fileTransfer.FilePath, fileTransfer.FileName)
df1ade54
JH
528 if err != nil {
529 return err
530 }
531
6988a057 532 switch fileTransfer.Type {
d9bc63a1 533 case BannerDownload:
fd740bc4
JH
534 if _, err := io.Copy(rwc, bytes.NewBuffer(s.Banner)); err != nil {
535 return fmt.Errorf("error sending Banner: %w", err)
9067f234 536 }
6988a057 537 case FileDownload:
d9bc63a1 538 s.Stats.Increment(StatDownloadCounter, StatDownloadsInProgress)
94742e2f 539 defer func() {
d9bc63a1 540 s.Stats.Decrement(StatDownloadsInProgress)
94742e2f 541 }()
23411fc2 542
a2ef262a 543 err = DownloadHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, true)
6988a057 544 if err != nil {
d9bc63a1 545 return fmt.Errorf("file download: %w", err)
7cd900d6
JH
546 }
547
6988a057 548 case FileUpload:
d9bc63a1
JH
549 s.Stats.Increment(StatUploadCounter, StatUploadsInProgress)
550 defer func() {
551 s.Stats.Decrement(StatUploadsInProgress)
552 }()
23411fc2 553
a2ef262a 554 err = UploadHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
7cd900d6 555 if err != nil {
a2ef262a 556 return fmt.Errorf("file upload error: %w", err)
6988a057 557 }
85767504 558
6988a057 559 case FolderDownload:
d9bc63a1
JH
560 s.Stats.Increment(StatDownloadCounter, StatDownloadsInProgress)
561 defer func() {
562 s.Stats.Decrement(StatDownloadsInProgress)
563 }()
00913df3 564
a2ef262a 565 err = DownloadFolderHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
67db911d 566 if err != nil {
a2ef262a 567 return fmt.Errorf("file upload error: %w", err)
67db911d
JH
568 }
569
6988a057 570 case FolderUpload:
d9bc63a1
JH
571 s.Stats.Increment(StatUploadCounter, StatUploadsInProgress)
572 defer func() {
573 s.Stats.Decrement(StatUploadsInProgress)
574 }()
575
a6216dd8 576 rLogger.Info(
6988a057 577 "Folder upload started",
df1ade54
JH
578 "dstPath", fullPath,
579 "TransferSize", binary.BigEndian.Uint32(fileTransfer.TransferSize),
6988a057
JH
580 "FolderItemCount", fileTransfer.FolderItemCount,
581 )
582
a2ef262a
JH
583 err = UploadFolderHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
584 if err != nil {
585 return fmt.Errorf("file upload error: %w", err)
6988a057 586 }
6988a057 587 }
6988a057
JH
588 return nil
589}
b6e3be94
JH
590
591func (s *Server) SendAll(t TranType, fields ...Field) {
592 for _, c := range s.ClientMgr.List() {
593 s.outbox <- NewTransaction(t, c.ID, fields...)
594 }
595}
596
597func (s *Server) Shutdown(msg []byte) {
598 s.Logger.Info("Shutdown signal received")
599 s.SendAll(TranDisconnectMsg, NewField(FieldData, msg))
600
601 time.Sleep(3 * time.Second)
602
603 os.Exit(0)
604}