]> git.r.bdr.sh - rbdr/mobius/blob - hotline/server.go
Migrate user account yaml files to new Access flag format if needed
[rbdr/mobius] / hotline / server.go
1 package hotline
2
3 import (
4 "bufio"
5 "bytes"
6 "context"
7 "crypto/rand"
8 "encoding/binary"
9 "errors"
10 "fmt"
11 "golang.org/x/text/encoding/charmap"
12 "golang.org/x/time/rate"
13 "io"
14 "log"
15 "log/slog"
16 "net"
17 "os"
18 "strings"
19 "sync"
20 "time"
21 )
22
23 type contextKey string
24
25 var contextKeyReq = contextKey("req")
26
27 type requestCtx struct {
28 remoteAddr string
29 }
30
31 // Converts bytes from Mac Roman encoding to UTF-8
32 var txtDecoder = charmap.Macintosh.NewDecoder()
33
34 // Converts bytes from UTF-8 to Mac Roman encoding
35 var txtEncoder = charmap.Macintosh.NewEncoder()
36
37 type Server struct {
38 NetInterface string
39 Port int
40
41 rateLimiters map[string]*rate.Limiter
42
43 handlers map[TranType]HandlerFunc
44
45 Config Config
46 Logger *slog.Logger
47
48 TrackerPassID [4]byte
49
50 Stats Counter
51
52 FS FileStore // Storage backend to use for File storage
53
54 outbox chan Transaction
55
56 Agreement io.ReadSeeker
57 Banner []byte
58
59 FileTransferMgr FileTransferMgr
60 ChatMgr ChatManager
61 ClientMgr ClientManager
62 AccountManager AccountManager
63 ThreadedNewsMgr ThreadedNewsMgr
64 BanList BanMgr
65
66 MessageBoard io.ReadWriteSeeker
67 }
68
69 type Option = func(s *Server)
70
71 func WithConfig(config Config) func(s *Server) {
72 return func(s *Server) {
73 s.Config = config
74 }
75 }
76
77 func WithLogger(logger *slog.Logger) func(s *Server) {
78 return func(s *Server) {
79 s.Logger = logger
80 }
81 }
82
83 // WithPort optionally overrides the default TCP port.
84 func WithPort(port int) func(s *Server) {
85 return func(s *Server) {
86 s.Port = port
87 }
88 }
89
90 // WithInterface optionally sets a specific interface to listen on.
91 func WithInterface(netInterface string) func(s *Server) {
92 return func(s *Server) {
93 s.NetInterface = netInterface
94 }
95 }
96
97 type ServerConfig struct {
98 }
99
100 func NewServer(options ...Option) (*Server, error) {
101 server := Server{
102 handlers: make(map[TranType]HandlerFunc),
103 outbox: make(chan Transaction),
104 rateLimiters: make(map[string]*rate.Limiter),
105 FS: &OSFileStore{},
106 ChatMgr: NewMemChatManager(),
107 ClientMgr: NewMemClientMgr(),
108 FileTransferMgr: NewMemFileTransferMgr(),
109 Stats: NewStats(),
110 }
111
112 for _, opt := range options {
113 opt(&server)
114 }
115
116 // generate a new random passID for tracker registration
117 _, err := rand.Read(server.TrackerPassID[:])
118 if err != nil {
119 return nil, err
120 }
121
122 return &server, nil
123 }
124
125 func (s *Server) CurrentStats() map[string]interface{} {
126 return s.Stats.Values()
127 }
128
129 func (s *Server) ListenAndServe(ctx context.Context) error {
130 go s.registerWithTrackers(ctx)
131 go s.keepaliveHandler(ctx)
132 go s.processOutbox()
133
134 var wg sync.WaitGroup
135
136 wg.Add(1)
137 go func() {
138 ln, err := net.Listen("tcp", fmt.Sprintf("%s:%v", s.NetInterface, s.Port))
139 if err != nil {
140 log.Fatal(err)
141 }
142
143 log.Fatal(s.Serve(ctx, ln))
144 }()
145
146 wg.Add(1)
147 go func() {
148 ln, err := net.Listen("tcp", fmt.Sprintf("%s:%v", s.NetInterface, s.Port+1))
149 if err != nil {
150 log.Fatal(err)
151 }
152
153 log.Fatal(s.ServeFileTransfers(ctx, ln))
154 }()
155
156 wg.Wait()
157
158 return nil
159 }
160
161 func (s *Server) ServeFileTransfers(ctx context.Context, ln net.Listener) error {
162 for {
163 conn, err := ln.Accept()
164 if err != nil {
165 return err
166 }
167
168 go func() {
169 defer func() { _ = conn.Close() }()
170
171 err = s.handleFileTransfer(
172 context.WithValue(ctx, contextKeyReq, requestCtx{remoteAddr: conn.RemoteAddr().String()}),
173 conn,
174 )
175
176 if err != nil {
177 s.Logger.Error("file transfer error", "err", err)
178 }
179 }()
180 }
181 }
182
183 func (s *Server) sendTransaction(t Transaction) error {
184 client := s.ClientMgr.Get(t.ClientID)
185
186 if client == nil {
187 return nil
188 }
189
190 _, err := io.Copy(client.Connection, &t)
191 if err != nil {
192 return fmt.Errorf("failed to send transaction to client %v: %v", t.ClientID, err)
193 }
194
195 return nil
196 }
197
198 func (s *Server) processOutbox() {
199 for {
200 t := <-s.outbox
201 go func() {
202 if err := s.sendTransaction(t); err != nil {
203 s.Logger.Error("error sending transaction", "err", err)
204 }
205 }()
206 }
207 }
208
209 // perIPRateLimit controls how frequently an IP address can connect before being throttled.
210 // 0.5 = 1 connection every 2 seconds
211 const perIPRateLimit = rate.Limit(0.5)
212
213 func (s *Server) Serve(ctx context.Context, ln net.Listener) error {
214 for {
215 select {
216 case <-ctx.Done():
217 s.Logger.Info("Server shutting down")
218 return ctx.Err()
219 default:
220 conn, err := ln.Accept()
221 if err != nil {
222 s.Logger.Error("Error accepting connection", "err", err)
223 continue
224 }
225
226 go func() {
227 ipAddr := strings.Split(conn.RemoteAddr().(*net.TCPAddr).String(), ":")[0]
228
229 connCtx := context.WithValue(ctx, contextKeyReq, requestCtx{
230 remoteAddr: conn.RemoteAddr().String(),
231 })
232
233 s.Logger.Info("Connection established", "ip", ipAddr)
234 defer conn.Close()
235
236 // Check if we have an existing rate limit for the IP and create one if we do not.
237 rl, ok := s.rateLimiters[ipAddr]
238 if !ok {
239 rl = rate.NewLimiter(perIPRateLimit, 1)
240 s.rateLimiters[ipAddr] = rl
241 }
242
243 // Check if the rate limit is exceeded and close the connection if so.
244 if !rl.Allow() {
245 s.Logger.Info("Rate limit exceeded", "RemoteAddr", conn.RemoteAddr())
246 conn.Close()
247 return
248 }
249
250 if err := s.handleNewConnection(connCtx, conn, conn.RemoteAddr().String()); err != nil {
251 if err == io.EOF {
252 s.Logger.Info("Client disconnected", "RemoteAddr", conn.RemoteAddr())
253 } else {
254 s.Logger.Error("Error serving request", "RemoteAddr", conn.RemoteAddr(), "err", err)
255 }
256 }
257 }()
258 }
259 }
260 }
261
262 // time in seconds between tracker re-registration
263 const trackerUpdateFrequency = 300
264
265 // registerWithTrackers runs every trackerUpdateFrequency seconds to update the server's tracker entry on all configured
266 // trackers.
267 func (s *Server) registerWithTrackers(ctx context.Context) {
268 if s.Config.EnableTrackerRegistration {
269 s.Logger.Info("Tracker registration enabled", "trackers", s.Config.Trackers)
270 }
271
272 for {
273 if s.Config.EnableTrackerRegistration {
274 for _, t := range s.Config.Trackers {
275 tr := &TrackerRegistration{
276 UserCount: len(s.ClientMgr.List()),
277 PassID: s.TrackerPassID,
278 Name: s.Config.Name,
279 Description: s.Config.Description,
280 }
281 binary.BigEndian.PutUint16(tr.Port[:], uint16(s.Port))
282
283 // Check the tracker string for a password. This is janky but avoids a breaking change to the Config
284 // Trackers field.
285 splitAddr := strings.Split(":", t)
286 if len(splitAddr) == 3 {
287 tr.Password = splitAddr[2]
288 }
289
290 if err := register(&RealDialer{}, t, tr); err != nil {
291 s.Logger.Error(fmt.Sprintf("Unable to register with tracker %v", t), "error", err)
292 }
293 }
294 }
295 // Using time.Ticker with for/select would be more idiomatic, but it's super annoying that it doesn't tick on
296 // first pass. Revist, maybe.
297 // https://github.com/golang/go/issues/17601
298 time.Sleep(trackerUpdateFrequency * time.Second)
299 }
300 }
301
302 const (
303 userIdleSeconds = 300 // time in seconds before an inactive user is marked idle
304 idleCheckInterval = 10 // time in seconds to check for idle users
305 )
306
307 // keepaliveHandler runs every idleCheckInterval seconds and increments a user's idle time by idleCheckInterval seconds.
308 // If the updated idle time exceeds userIdleSeconds and the user was not previously idle, we notify all connected clients
309 // that the user has gone idle. For most clients, this turns the user grey in the user list.
310 func (s *Server) keepaliveHandler(ctx context.Context) {
311 ticker := time.NewTicker(idleCheckInterval * time.Second)
312 defer ticker.Stop()
313
314 for {
315 select {
316 case <-ctx.Done():
317 return
318 case <-ticker.C:
319 for _, c := range s.ClientMgr.List() {
320 c.mu.Lock()
321 c.IdleTime += idleCheckInterval
322
323 // Check if the user
324 if c.IdleTime > userIdleSeconds && !c.Flags.IsSet(UserFlagAway) {
325 c.Flags.Set(UserFlagAway, 1)
326
327 c.SendAll(
328 TranNotifyChangeUser,
329 NewField(FieldUserID, c.ID[:]),
330 NewField(FieldUserFlags, c.Flags[:]),
331 NewField(FieldUserName, c.UserName),
332 NewField(FieldUserIconID, c.Icon),
333 )
334 }
335 c.mu.Unlock()
336 }
337 }
338 }
339 }
340
341 func (s *Server) NewClientConn(conn io.ReadWriteCloser, remoteAddr string) *ClientConn {
342 clientConn := &ClientConn{
343 Icon: []byte{0, 0}, // TODO: make array type
344 Connection: conn,
345 Server: s,
346 RemoteAddr: remoteAddr,
347
348 ClientFileTransferMgr: NewClientFileTransferMgr(),
349 }
350
351 s.ClientMgr.Add(clientConn)
352
353 return clientConn
354 }
355
356 func sendBanMessage(rwc io.Writer, message string) {
357 t := NewTransaction(
358 TranServerMsg,
359 [2]byte{0, 0},
360 NewField(FieldData, []byte(message)),
361 NewField(FieldChatOptions, []byte{0, 0}),
362 )
363 _, _ = io.Copy(rwc, &t)
364 time.Sleep(1 * time.Second)
365 }
366
367 // handleNewConnection takes a new net.Conn and performs the initial login sequence
368 func (s *Server) handleNewConnection(ctx context.Context, rwc io.ReadWriteCloser, remoteAddr string) error {
369 defer dontPanic(s.Logger)
370
371 if err := performHandshake(rwc); err != nil {
372 return fmt.Errorf("perform handshake: %w", err)
373 }
374
375 // Check if remoteAddr is present in the ban list
376 ipAddr := strings.Split(remoteAddr, ":")[0]
377 if isBanned, banUntil := s.BanList.IsBanned(ipAddr); isBanned {
378 // permaban
379 if banUntil == nil {
380 sendBanMessage(rwc, "You are permanently banned on this server")
381 s.Logger.Debug("Disconnecting permanently banned IP", "remoteAddr", ipAddr)
382 return nil
383 }
384
385 // temporary ban
386 if time.Now().Before(*banUntil) {
387 sendBanMessage(rwc, "You are temporarily banned on this server")
388 s.Logger.Debug("Disconnecting temporarily banned IP", "remoteAddr", ipAddr)
389 return nil
390 }
391 }
392
393 // Create a new scanner for parsing incoming bytes into transaction tokens
394 scanner := bufio.NewScanner(rwc)
395 scanner.Split(transactionScanner)
396
397 scanner.Scan()
398
399 // Make a new []byte slice and copy the scanner bytes to it. This is critical to avoid a data race as the
400 // scanner re-uses the buffer for subsequent scans.
401 buf := make([]byte, len(scanner.Bytes()))
402 copy(buf, scanner.Bytes())
403
404 var clientLogin Transaction
405 if _, err := clientLogin.Write(buf); err != nil {
406 return fmt.Errorf("error writing login transaction: %w", err)
407 }
408
409 c := s.NewClientConn(rwc, remoteAddr)
410 defer c.Disconnect()
411
412 encodedPassword := clientLogin.GetField(FieldUserPassword).Data
413 c.Version = clientLogin.GetField(FieldVersion).Data
414
415 login := clientLogin.GetField(FieldUserLogin).DecodeObfuscatedString()
416 if login == "" {
417 login = GuestAccount
418 }
419
420 c.Logger = s.Logger.With("ip", ipAddr, "login", login)
421
422 // If authentication fails, send error reply and close connection
423 if !c.Authenticate(login, encodedPassword) {
424 t := c.NewErrReply(&clientLogin, "Incorrect login.")[0]
425
426 _, err := io.Copy(rwc, &t)
427 if err != nil {
428 return err
429 }
430
431 c.Logger.Info("Incorrect login")
432
433 return nil
434 }
435
436 if clientLogin.GetField(FieldUserIconID).Data != nil {
437 c.Icon = clientLogin.GetField(FieldUserIconID).Data
438 }
439
440 c.Account = c.Server.AccountManager.Get(login)
441 if c.Account == nil {
442 return nil
443 }
444
445 if clientLogin.GetField(FieldUserName).Data != nil {
446 if c.Authorize(AccessAnyName) {
447 c.UserName = clientLogin.GetField(FieldUserName).Data
448 } else {
449 c.UserName = []byte(c.Account.Name)
450 }
451 }
452
453 if c.Authorize(AccessDisconUser) {
454 c.Flags.Set(UserFlagAdmin, 1)
455 }
456
457 s.outbox <- c.NewReply(&clientLogin,
458 NewField(FieldVersion, []byte{0x00, 0xbe}),
459 NewField(FieldCommunityBannerID, []byte{0, 0}),
460 NewField(FieldServerName, []byte(s.Config.Name)),
461 )
462
463 // Send user access privs so client UI knows how to behave
464 c.Server.outbox <- NewTransaction(TranUserAccess, c.ID, NewField(FieldUserAccess, c.Account.Access[:]))
465
466 // Accounts with AccessNoAgreement do not receive the server agreement on login. The behavior is different between
467 // client versions. For 1.2.3 client, we do not send TranShowAgreement. For other client versions, we send
468 // TranShowAgreement but with the NoServerAgreement field set to 1.
469 if c.Authorize(AccessNoAgreement) {
470 // If client version is nil, then the client uses the 1.2.3 login behavior
471 if c.Version != nil {
472 c.Server.outbox <- NewTransaction(TranShowAgreement, c.ID, NewField(FieldNoServerAgreement, []byte{1}))
473 }
474 } else {
475 _, _ = c.Server.Agreement.Seek(0, 0)
476 data, _ := io.ReadAll(c.Server.Agreement)
477
478 c.Server.outbox <- NewTransaction(TranShowAgreement, c.ID, NewField(FieldData, data))
479 }
480
481 // If the client has provided a username as part of the login, we can infer that it is using the 1.2.3 login
482 // flow and not the 1.5+ flow.
483 if len(c.UserName) != 0 {
484 // Add the client username to the logger. For 1.5+ clients, we don't have this information yet as it comes as
485 // part of TranAgreed
486 c.Logger = c.Logger.With("name", string(c.UserName))
487 c.Logger.Info("Login successful")
488
489 // Notify other clients on the server that the new user has logged in. For 1.5+ clients we don't have this
490 // information yet, so we do it in TranAgreed instead
491 for _, t := range c.NotifyOthers(
492 NewTransaction(
493 TranNotifyChangeUser, [2]byte{0, 0},
494 NewField(FieldUserName, c.UserName),
495 NewField(FieldUserID, c.ID[:]),
496 NewField(FieldUserIconID, c.Icon),
497 NewField(FieldUserFlags, c.Flags[:]),
498 ),
499 ) {
500 c.Server.outbox <- t
501 }
502 }
503
504 c.Server.Stats.Increment(StatConnectionCounter, StatCurrentlyConnected)
505 defer c.Server.Stats.Decrement(StatCurrentlyConnected)
506
507 if len(s.ClientMgr.List()) > c.Server.Stats.Get(StatConnectionPeak) {
508 c.Server.Stats.Set(StatConnectionPeak, len(s.ClientMgr.List()))
509 }
510
511 // Scan for new transactions and handle them as they come in.
512 for scanner.Scan() {
513 // Copy the scanner bytes to a new slice to it to avoid a data race when the scanner re-uses the buffer.
514 tmpBuf := make([]byte, len(scanner.Bytes()))
515 copy(tmpBuf, scanner.Bytes())
516
517 var t Transaction
518 if _, err := t.Write(tmpBuf); err != nil {
519 return err
520 }
521
522 c.handleTransaction(t)
523 }
524 return nil
525 }
526
527 // handleFileTransfer receives a client net.Conn from the file transfer server, performs the requested transfer type, then closes the connection
528 func (s *Server) handleFileTransfer(ctx context.Context, rwc io.ReadWriter) error {
529 defer dontPanic(s.Logger)
530
531 // The first 16 bytes contain the file transfer.
532 var t transfer
533 if _, err := io.CopyN(&t, rwc, 16); err != nil {
534 return fmt.Errorf("error reading file transfer: %w", err)
535 }
536
537 fileTransfer := s.FileTransferMgr.Get(t.ReferenceNumber)
538 if fileTransfer == nil {
539 return errors.New("invalid transaction ID")
540 }
541
542 defer func() {
543 s.FileTransferMgr.Delete(t.ReferenceNumber)
544
545 // Wait a few seconds before closing the connection: this is a workaround for problems
546 // observed with Windows clients where the client must initiate close of the TCP connection before
547 // the server does. This is gross and seems unnecessary. TODO: Revisit?
548 time.Sleep(3 * time.Second)
549 }()
550
551 rLogger := s.Logger.With(
552 "remoteAddr", ctx.Value(contextKeyReq).(requestCtx).remoteAddr,
553 "login", fileTransfer.ClientConn.Account.Login,
554 "Name", string(fileTransfer.ClientConn.UserName),
555 )
556
557 fullPath, err := ReadPath(fileTransfer.FileRoot, fileTransfer.FilePath, fileTransfer.FileName)
558 if err != nil {
559 return err
560 }
561
562 switch fileTransfer.Type {
563 case BannerDownload:
564 if _, err := io.Copy(rwc, bytes.NewBuffer(s.Banner)); err != nil {
565 return fmt.Errorf("banner download: %w", err)
566 }
567 case FileDownload:
568 s.Stats.Increment(StatDownloadCounter, StatDownloadsInProgress)
569 defer func() {
570 s.Stats.Decrement(StatDownloadsInProgress)
571 }()
572
573 err = DownloadHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, true)
574 if err != nil {
575 return fmt.Errorf("file download: %w", err)
576 }
577
578 case FileUpload:
579 s.Stats.Increment(StatUploadCounter, StatUploadsInProgress)
580 defer func() {
581 s.Stats.Decrement(StatUploadsInProgress)
582 }()
583
584 err = UploadHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
585 if err != nil {
586 return fmt.Errorf("file upload: %w", err)
587 }
588
589 case FolderDownload:
590 s.Stats.Increment(StatDownloadCounter, StatDownloadsInProgress)
591 defer func() {
592 s.Stats.Decrement(StatDownloadsInProgress)
593 }()
594
595 err = DownloadFolderHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
596 if err != nil {
597 return fmt.Errorf("folder download: %w", err)
598 }
599
600 case FolderUpload:
601 s.Stats.Increment(StatUploadCounter, StatUploadsInProgress)
602 defer func() {
603 s.Stats.Decrement(StatUploadsInProgress)
604 }()
605
606 rLogger.Info(
607 "Folder upload started",
608 "dstPath", fullPath,
609 "TransferSize", binary.BigEndian.Uint32(fileTransfer.TransferSize),
610 "FolderItemCount", fileTransfer.FolderItemCount,
611 )
612
613 err = UploadFolderHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
614 if err != nil {
615 return fmt.Errorf("folder upload: %w", err)
616 }
617 }
618 return nil
619 }
620
621 func (s *Server) SendAll(t TranType, fields ...Field) {
622 for _, c := range s.ClientMgr.List() {
623 s.outbox <- NewTransaction(t, c.ID, fields...)
624 }
625 }
626
627 func (s *Server) Shutdown(msg []byte) {
628 s.Logger.Info("Shutdown signal received")
629 s.SendAll(TranDisconnectMsg, NewField(FieldData, msg))
630
631 time.Sleep(3 * time.Second)
632
633 os.Exit(0)
634 }