]> git.r.bdr.sh - rbdr/mobius/blob - hotline/server.go
Improve human readability of account config files
[rbdr/mobius] / hotline / server.go
1 package hotline
2
3 import (
4 "bufio"
5 "bytes"
6 "context"
7 "crypto/rand"
8 "encoding/binary"
9 "errors"
10 "fmt"
11 "golang.org/x/text/encoding/charmap"
12 "golang.org/x/time/rate"
13 "io"
14 "log"
15 "log/slog"
16 "net"
17 "os"
18 "strings"
19 "sync"
20 "time"
21 )
22
23 type contextKey string
24
25 var contextKeyReq = contextKey("req")
26
27 type requestCtx struct {
28 remoteAddr string
29 }
30
31 // Converts bytes from Mac Roman encoding to UTF-8
32 var txtDecoder = charmap.Macintosh.NewDecoder()
33
34 // Converts bytes from UTF-8 to Mac Roman encoding
35 var txtEncoder = charmap.Macintosh.NewEncoder()
36
37 type Server struct {
38 NetInterface string
39 Port int
40
41 rateLimiters map[string]*rate.Limiter
42
43 handlers map[TranType]HandlerFunc
44
45 Config Config
46 Logger *slog.Logger
47
48 TrackerPassID [4]byte
49
50 Stats Counter
51
52 FS FileStore // Storage backend to use for File storage
53
54 outbox chan Transaction
55
56 Agreement io.ReadSeeker
57 Banner []byte
58
59 FileTransferMgr FileTransferMgr
60 ChatMgr ChatManager
61 ClientMgr ClientManager
62 AccountManager AccountManager
63 ThreadedNewsMgr ThreadedNewsMgr
64 BanList BanMgr
65
66 MessageBoard io.ReadWriteSeeker
67 }
68
69 type Option = func(s *Server)
70
71 func WithConfig(config Config) func(s *Server) {
72 return func(s *Server) {
73 s.Config = config
74 }
75 }
76
77 func WithLogger(logger *slog.Logger) func(s *Server) {
78 return func(s *Server) {
79 s.Logger = logger
80 }
81 }
82
83 // WithPort optionally overrides the default TCP port.
84 func WithPort(port int) func(s *Server) {
85 return func(s *Server) {
86 s.Port = port
87 }
88 }
89
90 // WithInterface optionally sets a specific interface to listen on.
91 func WithInterface(netInterface string) func(s *Server) {
92 return func(s *Server) {
93 s.NetInterface = netInterface
94 }
95 }
96
97 type ServerConfig struct {
98 }
99
100 func NewServer(options ...Option) (*Server, error) {
101 server := Server{
102 handlers: make(map[TranType]HandlerFunc),
103 outbox: make(chan Transaction),
104 rateLimiters: make(map[string]*rate.Limiter),
105 FS: &OSFileStore{},
106 ChatMgr: NewMemChatManager(),
107 ClientMgr: NewMemClientMgr(),
108 FileTransferMgr: NewMemFileTransferMgr(),
109 Stats: NewStats(),
110 }
111
112 for _, opt := range options {
113 opt(&server)
114 }
115
116 // generate a new random passID for tracker registration
117 _, err := rand.Read(server.TrackerPassID[:])
118 if err != nil {
119 return nil, err
120 }
121
122 return &server, nil
123 }
124
125 func (s *Server) CurrentStats() map[string]interface{} {
126 return s.Stats.Values()
127 }
128
129 func (s *Server) ListenAndServe(ctx context.Context) error {
130 go s.registerWithTrackers(ctx)
131 go s.keepaliveHandler(ctx)
132 go s.processOutbox()
133
134 var wg sync.WaitGroup
135
136 wg.Add(1)
137 go func() {
138 ln, err := net.Listen("tcp", fmt.Sprintf("%s:%v", s.NetInterface, s.Port))
139 if err != nil {
140 log.Fatal(err)
141 }
142
143 log.Fatal(s.Serve(ctx, ln))
144 }()
145
146 wg.Add(1)
147 go func() {
148 ln, err := net.Listen("tcp", fmt.Sprintf("%s:%v", s.NetInterface, s.Port+1))
149 if err != nil {
150 log.Fatal(err)
151 }
152
153 log.Fatal(s.ServeFileTransfers(ctx, ln))
154 }()
155
156 wg.Wait()
157
158 return nil
159 }
160
161 func (s *Server) ServeFileTransfers(ctx context.Context, ln net.Listener) error {
162 for {
163 conn, err := ln.Accept()
164 if err != nil {
165 return err
166 }
167
168 go func() {
169 defer func() { _ = conn.Close() }()
170
171 err = s.handleFileTransfer(
172 context.WithValue(ctx, contextKeyReq, requestCtx{remoteAddr: conn.RemoteAddr().String()}),
173 conn,
174 )
175
176 if err != nil {
177 s.Logger.Error("file transfer error", "err", err)
178 }
179 }()
180 }
181 }
182
183 func (s *Server) sendTransaction(t Transaction) error {
184 client := s.ClientMgr.Get(t.ClientID)
185
186 if client == nil {
187 return nil
188 }
189
190 _, err := io.Copy(client.Connection, &t)
191 if err != nil {
192 return fmt.Errorf("failed to send transaction to client %v: %v", t.ClientID, err)
193 }
194
195 return nil
196 }
197
198 func (s *Server) processOutbox() {
199 for {
200 t := <-s.outbox
201 go func() {
202 if err := s.sendTransaction(t); err != nil {
203 s.Logger.Error("error sending transaction", "err", err)
204 }
205 }()
206 }
207 }
208
209 // perIPRateLimit controls how frequently an IP address can connect before being throttled.
210 // 0.5 = 1 connection every 2 seconds
211 const perIPRateLimit = rate.Limit(0.5)
212
213 func (s *Server) Serve(ctx context.Context, ln net.Listener) error {
214 for {
215 select {
216 case <-ctx.Done():
217 s.Logger.Info("Server shutting down")
218 return ctx.Err()
219 default:
220 conn, err := ln.Accept()
221 if err != nil {
222 s.Logger.Error("Error accepting connection", "err", err)
223 continue
224 }
225
226 go func() {
227 ipAddr := strings.Split(conn.RemoteAddr().(*net.TCPAddr).String(), ":")[0]
228
229 connCtx := context.WithValue(ctx, contextKeyReq, requestCtx{
230 remoteAddr: conn.RemoteAddr().String(),
231 })
232
233 s.Logger.Info("Connection established", "ip", ipAddr)
234 defer conn.Close()
235
236 // Check if we have an existing rate limit for the IP and create one if we do not.
237 rl, ok := s.rateLimiters[ipAddr]
238 if !ok {
239 rl = rate.NewLimiter(perIPRateLimit, 1)
240 s.rateLimiters[ipAddr] = rl
241 }
242
243 // Check if the rate limit is exceeded and close the connection if so.
244 if !rl.Allow() {
245 s.Logger.Info("Rate limit exceeded", "RemoteAddr", conn.RemoteAddr())
246 conn.Close()
247 return
248 }
249
250 if err := s.handleNewConnection(connCtx, conn, conn.RemoteAddr().String()); err != nil {
251 if err == io.EOF {
252 s.Logger.Info("Client disconnected", "RemoteAddr", conn.RemoteAddr())
253 } else {
254 s.Logger.Error("Error serving request", "RemoteAddr", conn.RemoteAddr(), "err", err)
255 }
256 }
257 }()
258 }
259 }
260 }
261
262 // time in seconds between tracker re-registration
263 const trackerUpdateFrequency = 300
264
265 // registerWithTrackers runs every trackerUpdateFrequency seconds to update the server's tracker entry on all configured
266 // trackers.
267 func (s *Server) registerWithTrackers(ctx context.Context) {
268 s.Logger.Info("Tracker registration enabled", "trackers", s.Config.Trackers)
269
270 for {
271 if s.Config.EnableTrackerRegistration {
272 for _, t := range s.Config.Trackers {
273 tr := &TrackerRegistration{
274 UserCount: len(s.ClientMgr.List()),
275 PassID: s.TrackerPassID,
276 Name: s.Config.Name,
277 Description: s.Config.Description,
278 }
279 binary.BigEndian.PutUint16(tr.Port[:], uint16(s.Port))
280
281 // Check the tracker string for a password. This is janky but avoids a breaking change to the Config
282 // Trackers field.
283 splitAddr := strings.Split(":", t)
284 if len(splitAddr) == 3 {
285 tr.Password = splitAddr[2]
286 }
287
288 if err := register(&RealDialer{}, t, tr); err != nil {
289 s.Logger.Error(fmt.Sprintf("Unable to register with tracker %v", t), "error", err)
290 }
291 }
292 }
293 // Using time.Ticker with for/select would be more idiomatic, but it's super annoying that it doesn't tick on
294 // first pass. Revist, maybe.
295 // https://github.com/golang/go/issues/17601
296 time.Sleep(trackerUpdateFrequency * time.Second)
297 }
298 }
299
300 const (
301 userIdleSeconds = 300 // time in seconds before an inactive user is marked idle
302 idleCheckInterval = 10 // time in seconds to check for idle users
303 )
304
305 // keepaliveHandler runs every idleCheckInterval seconds and increments a user's idle time by idleCheckInterval seconds.
306 // If the updated idle time exceeds userIdleSeconds and the user was not previously idle, we notify all connected clients
307 // that the user has gone idle. For most clients, this turns the user grey in the user list.
308 func (s *Server) keepaliveHandler(ctx context.Context) {
309 ticker := time.NewTicker(idleCheckInterval * time.Second)
310 defer ticker.Stop()
311
312 for {
313 select {
314 case <-ctx.Done():
315 return
316 case <-ticker.C:
317 for _, c := range s.ClientMgr.List() {
318 c.mu.Lock()
319 c.IdleTime += idleCheckInterval
320
321 // Check if the user
322 if c.IdleTime > userIdleSeconds && !c.Flags.IsSet(UserFlagAway) {
323 c.Flags.Set(UserFlagAway, 1)
324
325 c.SendAll(
326 TranNotifyChangeUser,
327 NewField(FieldUserID, c.ID[:]),
328 NewField(FieldUserFlags, c.Flags[:]),
329 NewField(FieldUserName, c.UserName),
330 NewField(FieldUserIconID, c.Icon),
331 )
332 }
333 c.mu.Unlock()
334 }
335 }
336 }
337 }
338
339 func (s *Server) NewClientConn(conn io.ReadWriteCloser, remoteAddr string) *ClientConn {
340 clientConn := &ClientConn{
341 Icon: []byte{0, 0}, // TODO: make array type
342 Connection: conn,
343 Server: s,
344 RemoteAddr: remoteAddr,
345
346 ClientFileTransferMgr: NewClientFileTransferMgr(),
347 }
348
349 s.ClientMgr.Add(clientConn)
350
351 return clientConn
352 }
353
354 func sendBanMessage(rwc io.Writer, message string) {
355 t := NewTransaction(
356 TranServerMsg,
357 [2]byte{0, 0},
358 NewField(FieldData, []byte(message)),
359 NewField(FieldChatOptions, []byte{0, 0}),
360 )
361 _, _ = io.Copy(rwc, &t)
362 time.Sleep(1 * time.Second)
363 }
364
365 // handleNewConnection takes a new net.Conn and performs the initial login sequence
366 func (s *Server) handleNewConnection(ctx context.Context, rwc io.ReadWriteCloser, remoteAddr string) error {
367 defer dontPanic(s.Logger)
368
369 if err := performHandshake(rwc); err != nil {
370 return fmt.Errorf("perform handshake: %w", err)
371 }
372
373 // Check if remoteAddr is present in the ban list
374 ipAddr := strings.Split(remoteAddr, ":")[0]
375 if isBanned, banUntil := s.BanList.IsBanned(ipAddr); isBanned {
376 // permaban
377 if banUntil == nil {
378 sendBanMessage(rwc, "You are permanently banned on this server")
379 s.Logger.Debug("Disconnecting permanently banned IP", "remoteAddr", ipAddr)
380 return nil
381 }
382
383 // temporary ban
384 if time.Now().Before(*banUntil) {
385 sendBanMessage(rwc, "You are temporarily banned on this server")
386 s.Logger.Debug("Disconnecting temporarily banned IP", "remoteAddr", ipAddr)
387 return nil
388 }
389 }
390
391 // Create a new scanner for parsing incoming bytes into transaction tokens
392 scanner := bufio.NewScanner(rwc)
393 scanner.Split(transactionScanner)
394
395 scanner.Scan()
396
397 // Make a new []byte slice and copy the scanner bytes to it. This is critical to avoid a data race as the
398 // scanner re-uses the buffer for subsequent scans.
399 buf := make([]byte, len(scanner.Bytes()))
400 copy(buf, scanner.Bytes())
401
402 var clientLogin Transaction
403 if _, err := clientLogin.Write(buf); err != nil {
404 return fmt.Errorf("error writing login transaction: %w", err)
405 }
406
407 c := s.NewClientConn(rwc, remoteAddr)
408 defer c.Disconnect()
409
410 encodedPassword := clientLogin.GetField(FieldUserPassword).Data
411 c.Version = clientLogin.GetField(FieldVersion).Data
412
413 login := clientLogin.GetField(FieldUserLogin).DecodeObfuscatedString()
414 if login == "" {
415 login = GuestAccount
416 }
417
418 c.Logger = s.Logger.With("ip", ipAddr, "login", login)
419
420 // If authentication fails, send error reply and close connection
421 if !c.Authenticate(login, encodedPassword) {
422 t := c.NewErrReply(&clientLogin, "Incorrect login.")[0]
423
424 _, err := io.Copy(rwc, &t)
425 if err != nil {
426 return err
427 }
428
429 c.Logger.Info("Incorrect login")
430
431 return nil
432 }
433
434 if clientLogin.GetField(FieldUserIconID).Data != nil {
435 c.Icon = clientLogin.GetField(FieldUserIconID).Data
436 }
437
438 c.Account = c.Server.AccountManager.Get(login)
439 if c.Account == nil {
440 return nil
441 }
442
443 if clientLogin.GetField(FieldUserName).Data != nil {
444 if c.Authorize(AccessAnyName) {
445 c.UserName = clientLogin.GetField(FieldUserName).Data
446 } else {
447 c.UserName = []byte(c.Account.Name)
448 }
449 }
450
451 if c.Authorize(AccessDisconUser) {
452 c.Flags.Set(UserFlagAdmin, 1)
453 }
454
455 s.outbox <- c.NewReply(&clientLogin,
456 NewField(FieldVersion, []byte{0x00, 0xbe}),
457 NewField(FieldCommunityBannerID, []byte{0, 0}),
458 NewField(FieldServerName, []byte(s.Config.Name)),
459 )
460
461 // Send user access privs so client UI knows how to behave
462 c.Server.outbox <- NewTransaction(TranUserAccess, c.ID, NewField(FieldUserAccess, c.Account.Access[:]))
463
464 // Accounts with AccessNoAgreement do not receive the server agreement on login. The behavior is different between
465 // client versions. For 1.2.3 client, we do not send TranShowAgreement. For other client versions, we send
466 // TranShowAgreement but with the NoServerAgreement field set to 1.
467 if c.Authorize(AccessNoAgreement) {
468 // If client version is nil, then the client uses the 1.2.3 login behavior
469 if c.Version != nil {
470 c.Server.outbox <- NewTransaction(TranShowAgreement, c.ID, NewField(FieldNoServerAgreement, []byte{1}))
471 }
472 } else {
473 _, _ = c.Server.Agreement.Seek(0, 0)
474 data, _ := io.ReadAll(c.Server.Agreement)
475
476 c.Server.outbox <- NewTransaction(TranShowAgreement, c.ID, NewField(FieldData, data))
477 }
478
479 // If the client has provided a username as part of the login, we can infer that it is using the 1.2.3 login
480 // flow and not the 1.5+ flow.
481 if len(c.UserName) != 0 {
482 // Add the client username to the logger. For 1.5+ clients, we don't have this information yet as it comes as
483 // part of TranAgreed
484 c.Logger = c.Logger.With("name", string(c.UserName))
485 c.Logger.Info("Login successful")
486
487 // Notify other clients on the server that the new user has logged in. For 1.5+ clients we don't have this
488 // information yet, so we do it in TranAgreed instead
489 for _, t := range c.NotifyOthers(
490 NewTransaction(
491 TranNotifyChangeUser, [2]byte{0, 0},
492 NewField(FieldUserName, c.UserName),
493 NewField(FieldUserID, c.ID[:]),
494 NewField(FieldUserIconID, c.Icon),
495 NewField(FieldUserFlags, c.Flags[:]),
496 ),
497 ) {
498 c.Server.outbox <- t
499 }
500 }
501
502 c.Server.Stats.Increment(StatConnectionCounter, StatCurrentlyConnected)
503 defer c.Server.Stats.Decrement(StatCurrentlyConnected)
504
505 if len(s.ClientMgr.List()) > c.Server.Stats.Get(StatConnectionPeak) {
506 c.Server.Stats.Set(StatConnectionPeak, len(s.ClientMgr.List()))
507 }
508
509 // Scan for new transactions and handle them as they come in.
510 for scanner.Scan() {
511 // Copy the scanner bytes to a new slice to it to avoid a data race when the scanner re-uses the buffer.
512 tmpBuf := make([]byte, len(scanner.Bytes()))
513 copy(tmpBuf, scanner.Bytes())
514
515 var t Transaction
516 if _, err := t.Write(tmpBuf); err != nil {
517 return err
518 }
519
520 c.handleTransaction(t)
521 }
522 return nil
523 }
524
525 // handleFileTransfer receives a client net.Conn from the file transfer server, performs the requested transfer type, then closes the connection
526 func (s *Server) handleFileTransfer(ctx context.Context, rwc io.ReadWriter) error {
527 defer dontPanic(s.Logger)
528
529 // The first 16 bytes contain the file transfer.
530 var t transfer
531 if _, err := io.CopyN(&t, rwc, 16); err != nil {
532 return fmt.Errorf("error reading file transfer: %w", err)
533 }
534
535 fileTransfer := s.FileTransferMgr.Get(t.ReferenceNumber)
536 if fileTransfer == nil {
537 return errors.New("invalid transaction ID")
538 }
539
540 defer func() {
541 s.FileTransferMgr.Delete(t.ReferenceNumber)
542
543 // Wait a few seconds before closing the connection: this is a workaround for problems
544 // observed with Windows clients where the client must initiate close of the TCP connection before
545 // the server does. This is gross and seems unnecessary. TODO: Revisit?
546 time.Sleep(3 * time.Second)
547 }()
548
549 rLogger := s.Logger.With(
550 "remoteAddr", ctx.Value(contextKeyReq).(requestCtx).remoteAddr,
551 "login", fileTransfer.ClientConn.Account.Login,
552 "Name", string(fileTransfer.ClientConn.UserName),
553 )
554
555 fullPath, err := ReadPath(fileTransfer.FileRoot, fileTransfer.FilePath, fileTransfer.FileName)
556 if err != nil {
557 return err
558 }
559
560 switch fileTransfer.Type {
561 case BannerDownload:
562 if _, err := io.Copy(rwc, bytes.NewBuffer(s.Banner)); err != nil {
563 return fmt.Errorf("banner download: %w", err)
564 }
565 case FileDownload:
566 s.Stats.Increment(StatDownloadCounter, StatDownloadsInProgress)
567 defer func() {
568 s.Stats.Decrement(StatDownloadsInProgress)
569 }()
570
571 err = DownloadHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, true)
572 if err != nil {
573 return fmt.Errorf("file download: %w", err)
574 }
575
576 case FileUpload:
577 s.Stats.Increment(StatUploadCounter, StatUploadsInProgress)
578 defer func() {
579 s.Stats.Decrement(StatUploadsInProgress)
580 }()
581
582 err = UploadHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
583 if err != nil {
584 return fmt.Errorf("file upload: %w", err)
585 }
586
587 case FolderDownload:
588 s.Stats.Increment(StatDownloadCounter, StatDownloadsInProgress)
589 defer func() {
590 s.Stats.Decrement(StatDownloadsInProgress)
591 }()
592
593 err = DownloadFolderHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
594 if err != nil {
595 return fmt.Errorf("folder download: %w", err)
596 }
597
598 case FolderUpload:
599 s.Stats.Increment(StatUploadCounter, StatUploadsInProgress)
600 defer func() {
601 s.Stats.Decrement(StatUploadsInProgress)
602 }()
603
604 rLogger.Info(
605 "Folder upload started",
606 "dstPath", fullPath,
607 "TransferSize", binary.BigEndian.Uint32(fileTransfer.TransferSize),
608 "FolderItemCount", fileTransfer.FolderItemCount,
609 )
610
611 err = UploadFolderHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
612 if err != nil {
613 return fmt.Errorf("folder upload: %w", err)
614 }
615 }
616 return nil
617 }
618
619 func (s *Server) SendAll(t TranType, fields ...Field) {
620 for _, c := range s.ClientMgr.List() {
621 s.outbox <- NewTransaction(t, c.ID, fields...)
622 }
623 }
624
625 func (s *Server) Shutdown(msg []byte) {
626 s.Logger.Info("Shutdown signal received")
627 s.SendAll(TranDisconnectMsg, NewField(FieldData, msg))
628
629 time.Sleep(3 * time.Second)
630
631 os.Exit(0)
632 }