X-Git-Url: https://git.r.bdr.sh/rbdr/mobius/blobdiff_plain/d005ef04cfaa26943e6dd33807d741577ffb232a..95159e5585762c06c654945070ba54262b7dcec9:/hotline/transaction.go

diff --git a/hotline/transaction.go b/hotline/transaction.go
index b01fdfb..7883bfb 100644
--- a/hotline/transaction.go
+++ b/hotline/transaction.go
@@ -1,12 +1,14 @@
 package hotline
 
 import (
+	"bufio"
 	"bytes"
 	"encoding/binary"
 	"errors"
 	"fmt"
-	"github.com/jhalter/mobius/concat"
+	"io"
 	"math/rand"
+	"slices"
 )
 
 const (
@@ -71,8 +73,6 @@ const (
 )
 
 type Transaction struct {
-	clientID *[]byte
-
 	Flags      byte   // Reserved (should be 0)
 	IsReply    byte   // Request (0) or reply (1)
 	Type       []byte // Requested operation (user defined)
@@ -82,6 +82,9 @@ type Transaction struct {
 	DataSize   []byte // Size of data in this transaction part. This allows splitting large transactions into smaller parts.
 	ParamCount []byte // Number of the parameters for this transaction
 	Fields     []Field
+
+	clientID   *[]byte // Internal identifier for target client
+	readOffset int     // Internal offset to track read progress
 }
 
 func NewTransaction(t int, clientID *[]byte, fields ...Field) *Transaction {
@@ -113,9 +116,19 @@ func (t *Transaction) Write(p []byte) (n int, err error) {
 	if tranLen > len(p) {
 		return n, errors.New("buflen too small for tranLen")
 	}
-	fields, err := ReadFields(p[20:22], p[22:tranLen])
-	if err != nil {
-		return n, err
+
+	// Create a new scanner for parsing incoming bytes into transaction tokens
+	scanner := bufio.NewScanner(bytes.NewReader(p[22:tranLen]))
+	scanner.Split(fieldScanner)
+
+	for i := 0; i < int(binary.BigEndian.Uint16(p[20:22])); i++ {
+		scanner.Scan()
+
+		var field Field
+		if _, err := field.Write(scanner.Bytes()); err != nil {
+			return 0, fmt.Errorf("error reading field: %w", err)
+		}
+		t.Fields = append(t.Fields, field)
 	}
 
 	t.Flags = p[0]
@@ -126,7 +139,6 @@ func (t *Transaction) Write(p []byte) (n int, err error) {
 	t.TotalSize = p[12:16]
 	t.DataSize = p[16:20]
 	t.ParamCount = p[20:22]
-	t.Fields = fields
 
 	return len(p), err
 }
@@ -177,8 +189,8 @@ func ReadFields(paramCount []byte, buf []byte) ([]Field, error) {
 		}
 
 		fields = append(fields, Field{
-			ID:        fieldID,
-			FieldSize: fieldSize,
+			ID:        [2]byte(fieldID),
+			FieldSize: [2]byte(fieldSize),
 			Data:      buf[4 : 4+fieldSizeInt],
 		})
 
@@ -192,18 +204,23 @@ func ReadFields(paramCount []byte, buf []byte) ([]Field, error) {
 	return fields, nil
 }
 
-func (t *Transaction) MarshalBinary() (data []byte, err error) {
+// Read implements the io.Reader interface for Transaction
+func (t *Transaction) Read(p []byte) (int, error) {
 	payloadSize := t.Size()
 
 	fieldCount := make([]byte, 2)
 	binary.BigEndian.PutUint16(fieldCount, uint16(len(t.Fields)))
 
-	var fieldPayload []byte
+	bbuf := new(bytes.Buffer)
+
 	for _, field := range t.Fields {
-		fieldPayload = append(fieldPayload, field.Payload()...)
+		_, err := bbuf.ReadFrom(&field)
+		if err != nil {
+			return 0, fmt.Errorf("error reading field: %w", err)
+		}
 	}
 
-	return concat.Slices(
+	buf := slices.Concat(
 		[]byte{t.Flags, t.IsReply},
 		t.Type,
 		t.ID,
@@ -211,8 +228,17 @@ func (t *Transaction) MarshalBinary() (data []byte, err error) {
 		payloadSize,
 		payloadSize, // this is the dataSize field, but seeming the same as totalSize
 		fieldCount,
-		fieldPayload,
-	), err
+		bbuf.Bytes(),
+	)
+
+	if t.readOffset >= len(buf) {
+		return 0, io.EOF // All bytes have been read
+	}
+
+	n := copy(p, buf[t.readOffset:])
+	t.readOffset += n
+
+	return n, nil
 }
 
 // Size returns the total size of the transaction payload
@@ -231,7 +257,7 @@ func (t *Transaction) Size() []byte {
 
 func (t *Transaction) GetField(id int) Field {
 	for _, field := range t.Fields {
-		if id == int(binary.BigEndian.Uint16(field.ID)) {
+		if id == int(binary.BigEndian.Uint16(field.ID[:])) {
 			return field
 		}
 	}
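
Usage note (not part of the diff above): with this change a Transaction satisfies io.Reader, so its framed bytes can be streamed with the standard library's io helpers instead of calling the removed MarshalBinary. The sketch below is a hypothetical caller-side helper under these assumptions: it lives in the hotline package with "io" and "net" imported, "conn" is an established net.Conn, and TranAgreed stands in for one of the Tran* constants defined earlier in transaction.go.

// sendTransaction is a hypothetical helper, not part of this diff: it streams
// a framed transaction to a connection via the new io.Reader implementation.
func sendTransaction(conn net.Conn) error {
	t := NewTransaction(TranAgreed, nil) // clientID may be nil for this example
	_, err := io.Copy(conn, t)           // io.Copy drains t.Read until it returns io.EOF
	return err
}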