]> git.r.bdr.sh - rbdr/mobius/blob - hotline/server.go
Register server address on local network with Bonjour
[rbdr/mobius] / hotline / server.go
1 package hotline
2
3 import (
4 "bufio"
5 "bytes"
6 "context"
7 "crypto/rand"
8 "encoding/binary"
9 "errors"
10 "fmt"
11 "golang.org/x/text/encoding/charmap"
12 "golang.org/x/time/rate"
13 "io"
14 "log"
15 "log/slog"
16 "net"
17 "os"
18 "strings"
19 "sync"
20 "time"
21 )
22
23 type contextKey string
24
25 var contextKeyReq = contextKey("req")
26
27 type requestCtx struct {
28 remoteAddr string
29 }
30
31 // Converts bytes from Mac Roman encoding to UTF-8
32 var txtDecoder = charmap.Macintosh.NewDecoder()
33
34 // Converts bytes from UTF-8 to Mac Roman encoding
35 var txtEncoder = charmap.Macintosh.NewEncoder()
36
37 type Server struct {
38 NetInterface string
39 Port int
40
41 rateLimiters map[string]*rate.Limiter
42
43 handlers map[TranType]HandlerFunc
44
45 Config Config
46 Logger *slog.Logger
47
48 TrackerPassID [4]byte
49
50 Stats Counter
51
52 FS FileStore // Storage backend to use for File storage
53
54 outbox chan Transaction
55
56 Agreement io.ReadSeeker
57 Banner []byte
58
59 FileTransferMgr FileTransferMgr
60 ChatMgr ChatManager
61 ClientMgr ClientManager
62 AccountManager AccountManager
63 ThreadedNewsMgr ThreadedNewsMgr
64 BanList BanMgr
65
66 MessageBoard io.ReadWriteSeeker
67 }
68
69 type Option = func(s *Server)
70
71 func WithConfig(config Config) func(s *Server) {
72 return func(s *Server) {
73 s.Config = config
74 }
75 }
76
77 func WithLogger(logger *slog.Logger) func(s *Server) {
78 return func(s *Server) {
79 s.Logger = logger
80 }
81 }
82
83 // WithPort optionally overrides the default TCP port.
84 func WithPort(port int) func(s *Server) {
85 return func(s *Server) {
86 s.Port = port
87 }
88 }
89
90 // WithInterface optionally sets a specific interface to listen on.
91 func WithInterface(netInterface string) func(s *Server) {
92 return func(s *Server) {
93 s.NetInterface = netInterface
94 }
95 }
96
97 type ServerConfig struct {
98 }
99
100 func NewServer(options ...Option) (*Server, error) {
101 server := Server{
102 handlers: make(map[TranType]HandlerFunc),
103 outbox: make(chan Transaction),
104 rateLimiters: make(map[string]*rate.Limiter),
105 FS: &OSFileStore{},
106 ChatMgr: NewMemChatManager(),
107 ClientMgr: NewMemClientMgr(),
108 FileTransferMgr: NewMemFileTransferMgr(),
109 Stats: NewStats(),
110 }
111
112 for _, opt := range options {
113 opt(&server)
114 }
115
116 // generate a new random passID for tracker registration
117 _, err := rand.Read(server.TrackerPassID[:])
118 if err != nil {
119 return nil, err
120 }
121
122 return &server, nil
123 }
124
125 func (s *Server) CurrentStats() map[string]interface{} {
126 return s.Stats.Values()
127 }
128
129 func (s *Server) ListenAndServe(ctx context.Context) error {
130 go s.registerWithTrackers(ctx)
131 go s.keepaliveHandler(ctx)
132 go s.processOutbox()
133
134 var wg sync.WaitGroup
135
136 wg.Add(1)
137 go func() {
138 ln, err := net.Listen("tcp", fmt.Sprintf("%s:%v", s.NetInterface, s.Port))
139 if err != nil {
140 log.Fatal(err)
141 }
142
143 log.Fatal(s.Serve(ctx, ln))
144 }()
145
146 wg.Add(1)
147 go func() {
148 ln, err := net.Listen("tcp", fmt.Sprintf("%s:%v", s.NetInterface, s.Port+1))
149 if err != nil {
150 log.Fatal(err)
151 }
152
153 log.Fatal(s.ServeFileTransfers(ctx, ln))
154 }()
155
156 wg.Wait()
157
158 return nil
159 }
160
161 func (s *Server) ServeFileTransfers(ctx context.Context, ln net.Listener) error {
162 for {
163 conn, err := ln.Accept()
164 if err != nil {
165 return err
166 }
167
168 go func() {
169 defer func() { _ = conn.Close() }()
170
171 err = s.handleFileTransfer(
172 context.WithValue(ctx, contextKeyReq, requestCtx{remoteAddr: conn.RemoteAddr().String()}),
173 conn,
174 )
175
176 if err != nil {
177 s.Logger.Error("file transfer error", "err", err)
178 }
179 }()
180 }
181 }
182
183 func (s *Server) sendTransaction(t Transaction) error {
184 client := s.ClientMgr.Get(t.ClientID)
185
186 if client == nil {
187 return nil
188 }
189
190 _, err := io.Copy(client.Connection, &t)
191 if err != nil {
192 return fmt.Errorf("failed to send transaction to client %v: %v", t.ClientID, err)
193 }
194
195 return nil
196 }
197
198 func (s *Server) processOutbox() {
199 for {
200 t := <-s.outbox
201 go func() {
202 if err := s.sendTransaction(t); err != nil {
203 s.Logger.Error("error sending transaction", "err", err)
204 }
205 }()
206 }
207 }
208
209 // perIPRateLimit controls how frequently an IP address can connect before being throttled.
210 // 0.5 = 1 connection every 2 seconds
211 const perIPRateLimit = rate.Limit(0.5)
212
213 func (s *Server) Serve(ctx context.Context, ln net.Listener) error {
214 for {
215 select {
216 case <-ctx.Done():
217 s.Logger.Info("Server shutting down")
218 return ctx.Err()
219 default:
220 conn, err := ln.Accept()
221 if err != nil {
222 s.Logger.Error("Error accepting connection", "err", err)
223 continue
224 }
225
226 go func() {
227 ipAddr := strings.Split(conn.RemoteAddr().(*net.TCPAddr).String(), ":")[0]
228
229 connCtx := context.WithValue(ctx, contextKeyReq, requestCtx{
230 remoteAddr: conn.RemoteAddr().String(),
231 })
232
233 s.Logger.Info("Connection established", "ip", ipAddr)
234 defer conn.Close()
235
236 // Check if we have an existing rate limit for the IP and create one if we do not.
237 rl, ok := s.rateLimiters[ipAddr]
238 if !ok {
239 rl = rate.NewLimiter(perIPRateLimit, 1)
240 s.rateLimiters[ipAddr] = rl
241 }
242
243 // Check if the rate limit is exceeded and close the connection if so.
244 if !rl.Allow() {
245 s.Logger.Info("Rate limit exceeded", "RemoteAddr", conn.RemoteAddr())
246 conn.Close()
247 return
248 }
249
250 if err := s.handleNewConnection(connCtx, conn, conn.RemoteAddr().String()); err != nil {
251 if err == io.EOF {
252 s.Logger.Info("Client disconnected", "RemoteAddr", conn.RemoteAddr())
253 } else {
254 s.Logger.Error("Error serving request", "RemoteAddr", conn.RemoteAddr(), "err", err)
255 }
256 }
257 }()
258 }
259 }
260 }
261
262 // time in seconds between tracker re-registration
263 const trackerUpdateFrequency = 300
264
265 // registerWithTrackers runs every trackerUpdateFrequency seconds to update the server's tracker entry on all configured
266 // trackers.
267 func (s *Server) registerWithTrackers(ctx context.Context) {
268 for {
269 if s.Config.EnableTrackerRegistration {
270 for _, t := range s.Config.Trackers {
271 tr := &TrackerRegistration{
272 UserCount: len(s.ClientMgr.List()),
273 PassID: s.TrackerPassID,
274 Name: s.Config.Name,
275 Description: s.Config.Description,
276 }
277 binary.BigEndian.PutUint16(tr.Port[:], uint16(s.Port))
278
279 // Check the tracker string for a password. This is janky but avoids a breaking change to the Config
280 // Trackers field.
281 splitAddr := strings.Split(":", t)
282 if len(splitAddr) == 3 {
283 tr.Password = splitAddr[2]
284 }
285
286 if err := register(&RealDialer{}, t, tr); err != nil {
287 s.Logger.Error(fmt.Sprintf("Unable to register with tracker %v", t), "error", err)
288 }
289 }
290 }
291 // Using time.Ticker with for/select would be more idiomatic, but it's super annoying that it doesn't tick on
292 // first pass. Revist, maybe.
293 // https://github.com/golang/go/issues/17601
294 time.Sleep(trackerUpdateFrequency * time.Second)
295 }
296 }
297
298 const (
299 userIdleSeconds = 300 // time in seconds before an inactive user is marked idle
300 idleCheckInterval = 10 // time in seconds to check for idle users
301 )
302
303 // keepaliveHandler runs every idleCheckInterval seconds and increments a user's idle time by idleCheckInterval seconds.
304 // If the updated idle time exceeds userIdleSeconds and the user was not previously idle, we notify all connected clients
305 // that the user has gone idle. For most clients, this turns the user grey in the user list.
306 func (s *Server) keepaliveHandler(ctx context.Context) {
307 ticker := time.NewTicker(idleCheckInterval * time.Second)
308 defer ticker.Stop()
309
310 for {
311 select {
312 case <-ctx.Done():
313 return
314 case <-ticker.C:
315 for _, c := range s.ClientMgr.List() {
316 c.mu.Lock()
317 c.IdleTime += idleCheckInterval
318
319 // Check if the user
320 if c.IdleTime > userIdleSeconds && !c.Flags.IsSet(UserFlagAway) {
321 c.Flags.Set(UserFlagAway, 1)
322
323 c.SendAll(
324 TranNotifyChangeUser,
325 NewField(FieldUserID, c.ID[:]),
326 NewField(FieldUserFlags, c.Flags[:]),
327 NewField(FieldUserName, c.UserName),
328 NewField(FieldUserIconID, c.Icon),
329 )
330 }
331 c.mu.Unlock()
332 }
333 }
334 }
335 }
336
337 func (s *Server) NewClientConn(conn io.ReadWriteCloser, remoteAddr string) *ClientConn {
338 clientConn := &ClientConn{
339 Icon: []byte{0, 0}, // TODO: make array type
340 Connection: conn,
341 Server: s,
342 RemoteAddr: remoteAddr,
343
344 ClientFileTransferMgr: NewClientFileTransferMgr(),
345 }
346
347 s.ClientMgr.Add(clientConn)
348
349 return clientConn
350 }
351
352 func sendBanMessage(rwc io.Writer, message string) {
353 t := NewTransaction(
354 TranServerMsg,
355 [2]byte{0, 0},
356 NewField(FieldData, []byte(message)),
357 NewField(FieldChatOptions, []byte{0, 0}),
358 )
359 _, _ = io.Copy(rwc, &t)
360 time.Sleep(1 * time.Second)
361 }
362
363 // handleNewConnection takes a new net.Conn and performs the initial login sequence
364 func (s *Server) handleNewConnection(ctx context.Context, rwc io.ReadWriteCloser, remoteAddr string) error {
365 defer dontPanic(s.Logger)
366
367 if err := performHandshake(rwc); err != nil {
368 return fmt.Errorf("perform handshake: %w", err)
369 }
370
371 // Check if remoteAddr is present in the ban list
372 ipAddr := strings.Split(remoteAddr, ":")[0]
373 if isBanned, banUntil := s.BanList.IsBanned(ipAddr); isBanned {
374 // permaban
375 if banUntil == nil {
376 sendBanMessage(rwc, "You are permanently banned on this server")
377 s.Logger.Debug("Disconnecting permanently banned IP", "remoteAddr", ipAddr)
378 return nil
379 }
380
381 // temporary ban
382 if time.Now().Before(*banUntil) {
383 sendBanMessage(rwc, "You are temporarily banned on this server")
384 s.Logger.Debug("Disconnecting temporarily banned IP", "remoteAddr", ipAddr)
385 return nil
386 }
387 }
388
389 // Create a new scanner for parsing incoming bytes into transaction tokens
390 scanner := bufio.NewScanner(rwc)
391 scanner.Split(transactionScanner)
392
393 scanner.Scan()
394
395 // Make a new []byte slice and copy the scanner bytes to it. This is critical to avoid a data race as the
396 // scanner re-uses the buffer for subsequent scans.
397 buf := make([]byte, len(scanner.Bytes()))
398 copy(buf, scanner.Bytes())
399
400 var clientLogin Transaction
401 if _, err := clientLogin.Write(buf); err != nil {
402 return fmt.Errorf("error writing login transaction: %w", err)
403 }
404
405 c := s.NewClientConn(rwc, remoteAddr)
406 defer c.Disconnect()
407
408 encodedPassword := clientLogin.GetField(FieldUserPassword).Data
409 c.Version = clientLogin.GetField(FieldVersion).Data
410
411 login := clientLogin.GetField(FieldUserLogin).DecodeObfuscatedString()
412 if login == "" {
413 login = GuestAccount
414 }
415
416 c.Logger = s.Logger.With("ip", ipAddr, "login", login)
417
418 // If authentication fails, send error reply and close connection
419 if !c.Authenticate(login, encodedPassword) {
420 t := c.NewErrReply(&clientLogin, "Incorrect login.")[0]
421
422 _, err := io.Copy(rwc, &t)
423 if err != nil {
424 return err
425 }
426
427 c.Logger.Info("Incorrect login")
428
429 return nil
430 }
431
432 if clientLogin.GetField(FieldUserIconID).Data != nil {
433 c.Icon = clientLogin.GetField(FieldUserIconID).Data
434 }
435
436 c.Account = c.Server.AccountManager.Get(login)
437 if c.Account == nil {
438 return nil
439 }
440
441 if clientLogin.GetField(FieldUserName).Data != nil {
442 if c.Authorize(AccessAnyName) {
443 c.UserName = clientLogin.GetField(FieldUserName).Data
444 } else {
445 c.UserName = []byte(c.Account.Name)
446 }
447 }
448
449 if c.Authorize(AccessDisconUser) {
450 c.Flags.Set(UserFlagAdmin, 1)
451 }
452
453 s.outbox <- c.NewReply(&clientLogin,
454 NewField(FieldVersion, []byte{0x00, 0xbe}),
455 NewField(FieldCommunityBannerID, []byte{0, 0}),
456 NewField(FieldServerName, []byte(s.Config.Name)),
457 )
458
459 // Send user access privs so client UI knows how to behave
460 c.Server.outbox <- NewTransaction(TranUserAccess, c.ID, NewField(FieldUserAccess, c.Account.Access[:]))
461
462 // Accounts with AccessNoAgreement do not receive the server agreement on login. The behavior is different between
463 // client versions. For 1.2.3 client, we do not send TranShowAgreement. For other client versions, we send
464 // TranShowAgreement but with the NoServerAgreement field set to 1.
465 if c.Authorize(AccessNoAgreement) {
466 // If client version is nil, then the client uses the 1.2.3 login behavior
467 if c.Version != nil {
468 c.Server.outbox <- NewTransaction(TranShowAgreement, c.ID, NewField(FieldNoServerAgreement, []byte{1}))
469 }
470 } else {
471 _, _ = c.Server.Agreement.Seek(0, 0)
472 data, _ := io.ReadAll(c.Server.Agreement)
473
474 c.Server.outbox <- NewTransaction(TranShowAgreement, c.ID, NewField(FieldData, data))
475 }
476
477 // If the client has provided a username as part of the login, we can infer that it is using the 1.2.3 login
478 // flow and not the 1.5+ flow.
479 if len(c.UserName) != 0 {
480 // Add the client username to the logger. For 1.5+ clients, we don't have this information yet as it comes as
481 // part of TranAgreed
482 c.Logger = c.Logger.With("name", string(c.UserName))
483 c.Logger.Info("Login successful")
484
485 // Notify other clients on the server that the new user has logged in. For 1.5+ clients we don't have this
486 // information yet, so we do it in TranAgreed instead
487 for _, t := range c.NotifyOthers(
488 NewTransaction(
489 TranNotifyChangeUser, [2]byte{0, 0},
490 NewField(FieldUserName, c.UserName),
491 NewField(FieldUserID, c.ID[:]),
492 NewField(FieldUserIconID, c.Icon),
493 NewField(FieldUserFlags, c.Flags[:]),
494 ),
495 ) {
496 c.Server.outbox <- t
497 }
498 }
499
500 c.Server.Stats.Increment(StatConnectionCounter, StatCurrentlyConnected)
501 defer c.Server.Stats.Decrement(StatCurrentlyConnected)
502
503 if len(s.ClientMgr.List()) > c.Server.Stats.Get(StatConnectionPeak) {
504 c.Server.Stats.Set(StatConnectionPeak, len(s.ClientMgr.List()))
505 }
506
507 // Scan for new transactions and handle them as they come in.
508 for scanner.Scan() {
509 // Copy the scanner bytes to a new slice to it to avoid a data race when the scanner re-uses the buffer.
510 tmpBuf := make([]byte, len(scanner.Bytes()))
511 copy(tmpBuf, scanner.Bytes())
512
513 var t Transaction
514 if _, err := t.Write(tmpBuf); err != nil {
515 return err
516 }
517
518 c.handleTransaction(t)
519 }
520 return nil
521 }
522
523 // handleFileTransfer receives a client net.Conn from the file transfer server, performs the requested transfer type, then closes the connection
524 func (s *Server) handleFileTransfer(ctx context.Context, rwc io.ReadWriter) error {
525 defer dontPanic(s.Logger)
526
527 // The first 16 bytes contain the file transfer.
528 var t transfer
529 if _, err := io.CopyN(&t, rwc, 16); err != nil {
530 return fmt.Errorf("error reading file transfer: %w", err)
531 }
532
533 fileTransfer := s.FileTransferMgr.Get(t.ReferenceNumber)
534 if fileTransfer == nil {
535 return errors.New("invalid transaction ID")
536 }
537
538 defer func() {
539 s.FileTransferMgr.Delete(t.ReferenceNumber)
540
541 // Wait a few seconds before closing the connection: this is a workaround for problems
542 // observed with Windows clients where the client must initiate close of the TCP connection before
543 // the server does. This is gross and seems unnecessary. TODO: Revisit?
544 time.Sleep(3 * time.Second)
545 }()
546
547 rLogger := s.Logger.With(
548 "remoteAddr", ctx.Value(contextKeyReq).(requestCtx).remoteAddr,
549 "login", fileTransfer.ClientConn.Account.Login,
550 "Name", string(fileTransfer.ClientConn.UserName),
551 )
552
553 fullPath, err := ReadPath(fileTransfer.FileRoot, fileTransfer.FilePath, fileTransfer.FileName)
554 if err != nil {
555 return err
556 }
557
558 switch fileTransfer.Type {
559 case BannerDownload:
560 if _, err := io.Copy(rwc, bytes.NewBuffer(s.Banner)); err != nil {
561 return fmt.Errorf("banner download: %w", err)
562 }
563 case FileDownload:
564 s.Stats.Increment(StatDownloadCounter, StatDownloadsInProgress)
565 defer func() {
566 s.Stats.Decrement(StatDownloadsInProgress)
567 }()
568
569 err = DownloadHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, true)
570 if err != nil {
571 return fmt.Errorf("file download: %w", err)
572 }
573
574 case FileUpload:
575 s.Stats.Increment(StatUploadCounter, StatUploadsInProgress)
576 defer func() {
577 s.Stats.Decrement(StatUploadsInProgress)
578 }()
579
580 err = UploadHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
581 if err != nil {
582 return fmt.Errorf("file upload: %w", err)
583 }
584
585 case FolderDownload:
586 s.Stats.Increment(StatDownloadCounter, StatDownloadsInProgress)
587 defer func() {
588 s.Stats.Decrement(StatDownloadsInProgress)
589 }()
590
591 err = DownloadFolderHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
592 if err != nil {
593 return fmt.Errorf("folder download: %w", err)
594 }
595
596 case FolderUpload:
597 s.Stats.Increment(StatUploadCounter, StatUploadsInProgress)
598 defer func() {
599 s.Stats.Decrement(StatUploadsInProgress)
600 }()
601
602 rLogger.Info(
603 "Folder upload started",
604 "dstPath", fullPath,
605 "TransferSize", binary.BigEndian.Uint32(fileTransfer.TransferSize),
606 "FolderItemCount", fileTransfer.FolderItemCount,
607 )
608
609 err = UploadFolderHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
610 if err != nil {
611 return fmt.Errorf("folder upload: %w", err)
612 }
613 }
614 return nil
615 }
616
617 func (s *Server) SendAll(t TranType, fields ...Field) {
618 for _, c := range s.ClientMgr.List() {
619 s.outbox <- NewTransaction(t, c.ID, fields...)
620 }
621 }
622
623 func (s *Server) Shutdown(msg []byte) {
624 s.Logger.Info("Shutdown signal received")
625 s.SendAll(TranDisconnectMsg, NewField(FieldData, msg))
626
627 time.Sleep(3 * time.Second)
628
629 os.Exit(0)
630 }