]> git.r.bdr.sh - rbdr/mobius/blob - hotline/server.go
Add initial HTTP API endpoints
[rbdr/mobius] / hotline / server.go
1 package hotline
2
3 import (
4 "bufio"
5 "bytes"
6 "context"
7 "crypto/rand"
8 "encoding/binary"
9 "errors"
10 "fmt"
11 "golang.org/x/text/encoding/charmap"
12 "io"
13 "log"
14 "log/slog"
15 "net"
16 "os"
17 "strings"
18 "sync"
19 "time"
20 )
21
22 type contextKey string
23
24 var contextKeyReq = contextKey("req")
25
26 type requestCtx struct {
27 remoteAddr string
28 }
29
30 // Converts bytes from Mac Roman encoding to UTF-8
31 var txtDecoder = charmap.Macintosh.NewDecoder()
32
33 // Converts bytes from UTF-8 to Mac Roman encoding
34 var txtEncoder = charmap.Macintosh.NewEncoder()
35
36 type Server struct {
37 NetInterface string
38 Port int
39
40 handlers map[TranType]HandlerFunc
41
42 Config Config
43 Logger *slog.Logger
44
45 TrackerPassID [4]byte
46
47 Stats Counter
48
49 FS FileStore // Storage backend to use for File storage
50
51 outbox chan Transaction
52
53 Agreement io.ReadSeeker
54 Banner []byte
55
56 FileTransferMgr FileTransferMgr
57 ChatMgr ChatManager
58 ClientMgr ClientManager
59 AccountManager AccountManager
60 ThreadedNewsMgr ThreadedNewsMgr
61 BanList BanMgr
62
63 MessageBoard io.ReadWriteSeeker
64 }
65
66 type Option = func(s *Server)
67
68 func WithConfig(config Config) func(s *Server) {
69 return func(s *Server) {
70 s.Config = config
71 }
72 }
73
74 func WithLogger(logger *slog.Logger) func(s *Server) {
75 return func(s *Server) {
76 s.Logger = logger
77 }
78 }
79
80 // WithPort optionally overrides the default TCP port.
81 func WithPort(port int) func(s *Server) {
82 return func(s *Server) {
83 s.Port = port
84 }
85 }
86
87 // WithInterface optionally sets a specific interface to listen on.
88 func WithInterface(netInterface string) func(s *Server) {
89 return func(s *Server) {
90 s.NetInterface = netInterface
91 }
92 }
93
94 type ServerConfig struct {
95 }
96
97 func NewServer(options ...Option) (*Server, error) {
98 server := Server{
99 handlers: make(map[TranType]HandlerFunc),
100 outbox: make(chan Transaction),
101 FS: &OSFileStore{},
102 ChatMgr: NewMemChatManager(),
103 ClientMgr: NewMemClientMgr(),
104 FileTransferMgr: NewMemFileTransferMgr(),
105 Stats: NewStats(),
106 }
107
108 for _, opt := range options {
109 opt(&server)
110 }
111
112 // generate a new random passID for tracker registration
113 _, err := rand.Read(server.TrackerPassID[:])
114 if err != nil {
115 return nil, err
116 }
117
118 return &server, nil
119 }
120
121 func (s *Server) CurrentStats() map[string]interface{} {
122 return s.Stats.Values()
123 }
124
125 func (s *Server) ListenAndServe(ctx context.Context) error {
126 go s.registerWithTrackers(ctx)
127 go s.keepaliveHandler(ctx)
128 go s.processOutbox()
129
130 var wg sync.WaitGroup
131
132 wg.Add(1)
133 go func() {
134 ln, err := net.Listen("tcp", fmt.Sprintf("%s:%v", s.NetInterface, s.Port))
135 if err != nil {
136 log.Fatal(err)
137 }
138
139 log.Fatal(s.Serve(ctx, ln))
140 }()
141
142 wg.Add(1)
143 go func() {
144 ln, err := net.Listen("tcp", fmt.Sprintf("%s:%v", s.NetInterface, s.Port+1))
145 if err != nil {
146 log.Fatal(err)
147 }
148
149 log.Fatal(s.ServeFileTransfers(ctx, ln))
150 }()
151
152 wg.Wait()
153
154 return nil
155 }
156
157 func (s *Server) ServeFileTransfers(ctx context.Context, ln net.Listener) error {
158 for {
159 conn, err := ln.Accept()
160 if err != nil {
161 return err
162 }
163
164 go func() {
165 defer func() { _ = conn.Close() }()
166
167 err = s.handleFileTransfer(
168 context.WithValue(ctx, contextKeyReq, requestCtx{remoteAddr: conn.RemoteAddr().String()}),
169 conn,
170 )
171
172 if err != nil {
173 s.Logger.Error("file transfer error", "reason", err)
174 }
175 }()
176 }
177 }
178
179 func (s *Server) sendTransaction(t Transaction) error {
180 client := s.ClientMgr.Get(t.ClientID)
181
182 if client == nil {
183 return nil
184 }
185
186 _, err := io.Copy(client.Connection, &t)
187 if err != nil {
188 return fmt.Errorf("failed to send transaction to client %v: %v", t.ClientID, err)
189 }
190
191 return nil
192 }
193
194 func (s *Server) processOutbox() {
195 for {
196 t := <-s.outbox
197 go func() {
198 if err := s.sendTransaction(t); err != nil {
199 s.Logger.Error("error sending transaction", "err", err)
200 }
201 }()
202 }
203 }
204
205 func (s *Server) Serve(ctx context.Context, ln net.Listener) error {
206 for {
207 select {
208 case <-ctx.Done():
209 s.Logger.Info("Server shutting down")
210 return ctx.Err()
211 default:
212 conn, err := ln.Accept()
213 if err != nil {
214 s.Logger.Error("Error accepting connection", "err", err)
215 continue
216 }
217
218 go func() {
219 connCtx := context.WithValue(ctx, contextKeyReq, requestCtx{
220 remoteAddr: conn.RemoteAddr().String(),
221 })
222
223 s.Logger.Info("Connection established", "addr", conn.RemoteAddr())
224 defer conn.Close()
225
226 if err := s.handleNewConnection(connCtx, conn, conn.RemoteAddr().String()); err != nil {
227 if err == io.EOF {
228 s.Logger.Info("Client disconnected", "RemoteAddr", conn.RemoteAddr())
229 } else {
230 s.Logger.Error("Error serving request", "RemoteAddr", conn.RemoteAddr(), "err", err)
231 }
232 }
233 }()
234 }
235 }
236 }
237
238 // time in seconds between tracker re-registration
239 const trackerUpdateFrequency = 300
240
241 // registerWithTrackers runs every trackerUpdateFrequency seconds to update the server's tracker entry on all configured
242 // trackers.
243 func (s *Server) registerWithTrackers(ctx context.Context) {
244 ticker := time.NewTicker(trackerUpdateFrequency * time.Second)
245 defer ticker.Stop()
246
247 for {
248 select {
249 case <-ctx.Done():
250 return
251 case <-ticker.C:
252 if s.Config.EnableTrackerRegistration {
253 tr := &TrackerRegistration{
254 UserCount: len(s.ClientMgr.List()),
255 PassID: s.TrackerPassID,
256 Name: s.Config.Name,
257 Description: s.Config.Description,
258 }
259 binary.BigEndian.PutUint16(tr.Port[:], uint16(s.Port))
260
261 for _, t := range s.Config.Trackers {
262 if err := register(&RealDialer{}, t, tr); err != nil {
263 s.Logger.Error(fmt.Sprintf("Unable to register with tracker %v", t), "error", err)
264 }
265 }
266 }
267 }
268 }
269
270 }
271
272 const (
273 userIdleSeconds = 300 // time in seconds before an inactive user is marked idle
274 idleCheckInterval = 10 // time in seconds to check for idle users
275 )
276
277 // keepaliveHandler runs every idleCheckInterval seconds and increments a user's idle time by idleCheckInterval seconds.
278 // If the updated idle time exceeds userIdleSeconds and the user was not previously idle, we notify all connected clients
279 // that the user has gone idle. For most clients, this turns the user grey in the user list.
280 func (s *Server) keepaliveHandler(ctx context.Context) {
281 ticker := time.NewTicker(idleCheckInterval * time.Second)
282 defer ticker.Stop()
283
284 for {
285 select {
286 case <-ctx.Done():
287 return
288 case <-ticker.C:
289 for _, c := range s.ClientMgr.List() {
290 c.mu.Lock()
291 c.IdleTime += idleCheckInterval
292
293 // Check if the user
294 if c.IdleTime > userIdleSeconds && !c.Flags.IsSet(UserFlagAway) {
295 c.Flags.Set(UserFlagAway, 1)
296
297 c.SendAll(
298 TranNotifyChangeUser,
299 NewField(FieldUserID, c.ID[:]),
300 NewField(FieldUserFlags, c.Flags[:]),
301 NewField(FieldUserName, c.UserName),
302 NewField(FieldUserIconID, c.Icon),
303 )
304 }
305 c.mu.Unlock()
306 }
307 }
308 }
309 }
310
311 func (s *Server) NewClientConn(conn io.ReadWriteCloser, remoteAddr string) *ClientConn {
312 clientConn := &ClientConn{
313 Icon: []byte{0, 0}, // TODO: make array type
314 Connection: conn,
315 Server: s,
316 RemoteAddr: remoteAddr,
317
318 ClientFileTransferMgr: NewClientFileTransferMgr(),
319 }
320
321 s.ClientMgr.Add(clientConn)
322
323 return clientConn
324 }
325
326 func sendBanMessage(rwc io.Writer, message string) {
327 t := NewTransaction(
328 TranServerMsg,
329 [2]byte{0, 0},
330 NewField(FieldData, []byte(message)),
331 NewField(FieldChatOptions, []byte{0, 0}),
332 )
333 _, _ = io.Copy(rwc, &t)
334 time.Sleep(1 * time.Second)
335 }
336
337 // handleNewConnection takes a new net.Conn and performs the initial login sequence
338 func (s *Server) handleNewConnection(ctx context.Context, rwc io.ReadWriteCloser, remoteAddr string) error {
339 defer dontPanic(s.Logger)
340
341 if err := performHandshake(rwc); err != nil {
342 return fmt.Errorf("perform handshake: %w", err)
343 }
344
345 // Check if remoteAddr is present in the ban list
346 ipAddr := strings.Split(remoteAddr, ":")[0]
347 if isBanned, banUntil := s.BanList.IsBanned(ipAddr); isBanned {
348 // permaban
349 if banUntil == nil {
350 sendBanMessage(rwc, "You are permanently banned on this server")
351 s.Logger.Debug("Disconnecting permanently banned IP", "remoteAddr", ipAddr)
352 return nil
353 }
354
355 // temporary ban
356 if time.Now().Before(*banUntil) {
357 sendBanMessage(rwc, "You are temporarily banned on this server")
358 s.Logger.Debug("Disconnecting temporarily banned IP", "remoteAddr", ipAddr)
359 return nil
360 }
361 }
362
363 // Create a new scanner for parsing incoming bytes into transaction tokens
364 scanner := bufio.NewScanner(rwc)
365 scanner.Split(transactionScanner)
366
367 scanner.Scan()
368
369 // Make a new []byte slice and copy the scanner bytes to it. This is critical to avoid a data race as the
370 // scanner re-uses the buffer for subsequent scans.
371 buf := make([]byte, len(scanner.Bytes()))
372 copy(buf, scanner.Bytes())
373
374 var clientLogin Transaction
375 if _, err := clientLogin.Write(buf); err != nil {
376 return fmt.Errorf("error writing login transaction: %w", err)
377 }
378
379 c := s.NewClientConn(rwc, remoteAddr)
380 defer c.Disconnect()
381
382 encodedPassword := clientLogin.GetField(FieldUserPassword).Data
383 c.Version = clientLogin.GetField(FieldVersion).Data
384
385 login := clientLogin.GetField(FieldUserLogin).DecodeObfuscatedString()
386 if login == "" {
387 login = GuestAccount
388 }
389
390 c.Logger = s.Logger.With("ip", ipAddr, "login", login)
391
392 // If authentication fails, send error reply and close connection
393 if !c.Authenticate(login, encodedPassword) {
394 t := c.NewErrReply(&clientLogin, "Incorrect login.")[0]
395
396 _, err := io.Copy(rwc, &t)
397 if err != nil {
398 return err
399 }
400
401 c.Logger.Info("Login failed", "clientVersion", fmt.Sprintf("%x", c.Version))
402
403 return nil
404 }
405
406 if clientLogin.GetField(FieldUserIconID).Data != nil {
407 c.Icon = clientLogin.GetField(FieldUserIconID).Data
408 }
409
410 c.Account = c.Server.AccountManager.Get(login)
411 if c.Account == nil {
412 return nil
413 }
414
415 if clientLogin.GetField(FieldUserName).Data != nil {
416 if c.Authorize(AccessAnyName) {
417 c.UserName = clientLogin.GetField(FieldUserName).Data
418 } else {
419 c.UserName = []byte(c.Account.Name)
420 }
421 }
422
423 if c.Authorize(AccessDisconUser) {
424 c.Flags.Set(UserFlagAdmin, 1)
425 }
426
427 s.outbox <- c.NewReply(&clientLogin,
428 NewField(FieldVersion, []byte{0x00, 0xbe}),
429 NewField(FieldCommunityBannerID, []byte{0, 0}),
430 NewField(FieldServerName, []byte(s.Config.Name)),
431 )
432
433 // Send user access privs so client UI knows how to behave
434 c.Server.outbox <- NewTransaction(TranUserAccess, c.ID, NewField(FieldUserAccess, c.Account.Access[:]))
435
436 // Accounts with AccessNoAgreement do not receive the server agreement on login. The behavior is different between
437 // client versions. For 1.2.3 client, we do not send TranShowAgreement. For other client versions, we send
438 // TranShowAgreement but with the NoServerAgreement field set to 1.
439 if c.Authorize(AccessNoAgreement) {
440 // If client version is nil, then the client uses the 1.2.3 login behavior
441 if c.Version != nil {
442 c.Server.outbox <- NewTransaction(TranShowAgreement, c.ID, NewField(FieldNoServerAgreement, []byte{1}))
443 }
444 } else {
445 _, _ = c.Server.Agreement.Seek(0, 0)
446 data, _ := io.ReadAll(c.Server.Agreement)
447
448 c.Server.outbox <- NewTransaction(TranShowAgreement, c.ID, NewField(FieldData, data))
449 }
450
451 // If the client has provided a username as part of the login, we can infer that it is using the 1.2.3 login
452 // flow and not the 1.5+ flow.
453 if len(c.UserName) != 0 {
454 // Add the client username to the logger. For 1.5+ clients, we don't have this information yet as it comes as
455 // part of TranAgreed
456 c.Logger = c.Logger.With("name", string(c.UserName))
457 c.Logger.Info("Login successful")
458
459 // Notify other clients on the server that the new user has logged in. For 1.5+ clients we don't have this
460 // information yet, so we do it in TranAgreed instead
461 for _, t := range c.NotifyOthers(
462 NewTransaction(
463 TranNotifyChangeUser, [2]byte{0, 0},
464 NewField(FieldUserName, c.UserName),
465 NewField(FieldUserID, c.ID[:]),
466 NewField(FieldUserIconID, c.Icon),
467 NewField(FieldUserFlags, c.Flags[:]),
468 ),
469 ) {
470 c.Server.outbox <- t
471 }
472 }
473
474 c.Server.Stats.Increment(StatConnectionCounter, StatCurrentlyConnected)
475 defer c.Server.Stats.Decrement(StatCurrentlyConnected)
476
477 if len(s.ClientMgr.List()) > c.Server.Stats.Get(StatConnectionPeak) {
478 c.Server.Stats.Set(StatConnectionPeak, len(s.ClientMgr.List()))
479 }
480
481 // Scan for new transactions and handle them as they come in.
482 for scanner.Scan() {
483 // Copy the scanner bytes to a new slice to it to avoid a data race when the scanner re-uses the buffer.
484 tmpBuf := make([]byte, len(scanner.Bytes()))
485 copy(tmpBuf, scanner.Bytes())
486
487 var t Transaction
488 if _, err := t.Write(tmpBuf); err != nil {
489 return err
490 }
491
492 c.handleTransaction(t)
493 }
494 return nil
495 }
496
497 // handleFileTransfer receives a client net.Conn from the file transfer server, performs the requested transfer type, then closes the connection
498 func (s *Server) handleFileTransfer(ctx context.Context, rwc io.ReadWriter) error {
499 defer dontPanic(s.Logger)
500
501 // The first 16 bytes contain the file transfer.
502 var t transfer
503 if _, err := io.CopyN(&t, rwc, 16); err != nil {
504 return fmt.Errorf("error reading file transfer: %w", err)
505 }
506
507 fileTransfer := s.FileTransferMgr.Get(t.ReferenceNumber)
508 if fileTransfer == nil {
509 return errors.New("invalid transaction ID")
510 }
511
512 defer func() {
513 s.FileTransferMgr.Delete(t.ReferenceNumber)
514
515 // Wait a few seconds before closing the connection: this is a workaround for problems
516 // observed with Windows clients where the client must initiate close of the TCP connection before
517 // the server does. This is gross and seems unnecessary. TODO: Revisit?
518 time.Sleep(3 * time.Second)
519 }()
520
521 rLogger := s.Logger.With(
522 "remoteAddr", ctx.Value(contextKeyReq).(requestCtx).remoteAddr,
523 "login", fileTransfer.ClientConn.Account.Login,
524 "Name", string(fileTransfer.ClientConn.UserName),
525 )
526
527 fullPath, err := ReadPath(s.Config.FileRoot, fileTransfer.FilePath, fileTransfer.FileName)
528 if err != nil {
529 return err
530 }
531
532 switch fileTransfer.Type {
533 case BannerDownload:
534 if _, err := io.Copy(rwc, bytes.NewBuffer(s.Banner)); err != nil {
535 return fmt.Errorf("error sending Banner: %w", err)
536 }
537 case FileDownload:
538 s.Stats.Increment(StatDownloadCounter, StatDownloadsInProgress)
539 defer func() {
540 s.Stats.Decrement(StatDownloadsInProgress)
541 }()
542
543 err = DownloadHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, true)
544 if err != nil {
545 return fmt.Errorf("file download: %w", err)
546 }
547
548 case FileUpload:
549 s.Stats.Increment(StatUploadCounter, StatUploadsInProgress)
550 defer func() {
551 s.Stats.Decrement(StatUploadsInProgress)
552 }()
553
554 err = UploadHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
555 if err != nil {
556 return fmt.Errorf("file upload error: %w", err)
557 }
558
559 case FolderDownload:
560 s.Stats.Increment(StatDownloadCounter, StatDownloadsInProgress)
561 defer func() {
562 s.Stats.Decrement(StatDownloadsInProgress)
563 }()
564
565 err = DownloadFolderHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
566 if err != nil {
567 return fmt.Errorf("file upload error: %w", err)
568 }
569
570 case FolderUpload:
571 s.Stats.Increment(StatUploadCounter, StatUploadsInProgress)
572 defer func() {
573 s.Stats.Decrement(StatUploadsInProgress)
574 }()
575
576 rLogger.Info(
577 "Folder upload started",
578 "dstPath", fullPath,
579 "TransferSize", binary.BigEndian.Uint32(fileTransfer.TransferSize),
580 "FolderItemCount", fileTransfer.FolderItemCount,
581 )
582
583 err = UploadFolderHandler(rwc, fullPath, fileTransfer, s.FS, rLogger, s.Config.PreserveResourceForks)
584 if err != nil {
585 return fmt.Errorf("file upload error: %w", err)
586 }
587 }
588 return nil
589 }
590
591 func (s *Server) SendAll(t TranType, fields ...Field) {
592 for _, c := range s.ClientMgr.List() {
593 s.outbox <- NewTransaction(t, c.ID, fields...)
594 }
595 }
596
597 func (s *Server) Shutdown(msg []byte) {
598 s.Logger.Info("Shutdown signal received")
599 s.SendAll(TranDisconnectMsg, NewField(FieldData, msg))
600
601 time.Sleep(3 * time.Second)
602
603 os.Exit(0)
604 }