diff --git a/.gitignore b/.gitignore
index 0e4eb24..23372a4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,7 +8,7 @@
 *.so
 *.dylib
 main
-dsdemo
+dsapp
 dsrelay
 
 # Test binary, built with `go test -c`
diff --git a/Makefile b/Makefile
index 0484038..7c9283a 100644
--- a/Makefile
+++ b/Makefile
@@ -17,9 +17,9 @@ install-linter: ## Installs the linter
 lint: ## Runs the linter
 	export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/golangci-lint run
 
-.PHONY: build-dsdemo
-build-dsdemo: ## Builds datastream demo cli app (server, client, relay)
-	go build -o dsdemo cmd/main.go
+.PHONY: build-dsapp
+build-dsapp: ## Builds datastream demo cli app (server, client, relay)
+	go build -o dsapp cmd/main.go
 
 .PHONY: build-dsrelay
 build-dsrelay: ## Builds datastream relay cli app
diff --git a/README.md b/README.md
index cb4b40a..aa6e7bd 100644
--- a/README.md
+++ b/README.md
@@ -176,20 +176,20 @@ Stream relay server included in the datastream library allows scaling the number
 - ExecCommand(datastreamer.CmdBookmark) -> gets entry data pointed by bookmark and fills the `.Entry` field
 
 ## DATASTREAM CLI DEMO APP
-Build the binary datastream demo app (`dsdemo`):
+Build the binary datastream demo app (`dsapp`):
 ```
-make build
+make build-dsapp
 ```
 Run the app without parameters to see the available commands:
 ```
-./dsdemo
+./dsapp
 ```
 ```
 NAME:
-   dsdemo - Run a datastream server/client/relay demo cli app
+   dsapp - Run a datastream server/client/relay demo cli app
 
 USAGE:
-   dsdemo [global options] command [command options] [arguments...]
+   dsapp [global options] command [command options] [arguments...]
 
 COMMANDS:
    server   Run datastream server
@@ -203,14 +203,14 @@ GLOBAL OPTIONS:
 ### SERVER
 Use the help option to check available parameters for the server command:
 ```
-./dsdemo help server
+./dsapp help server
 ```
 ```
 NAME:
-   dsdemo server - Run datastream server
+   dsapp server - Run datastream server
 
 USAGE:
-   dsdemo server [command options] [arguments...]
+   dsapp server [command options] [arguments...]
 
 OPTIONS:
    --port value  exposed port for clients to connect (default: 6900)
@@ -222,28 +222,29 @@ OPTIONS:
 ```
 Run a datastream server with default parameters (port: `6900`, file: `datastream.bin`, log: `info`):
 ```
-./dsdemo server
+./dsapp server
 ```
 Or run a datastream server with custom parameters:
 ```
-./dsdemo server --port 6969 --file seqstream.bin --log warn
+./dsapp server --port 6969 --file seqstream.bin --log warn
 ```
 ### CLIENT
 Use the help option to check available parameters for the client command:
 ```
-./dsdemo help client
+./dsapp help client
 ```
 ```
 NAME:
-   dsdemo client - Run datastream client
+   dsapp client - Run datastream client
 
 USAGE:
-   dsdemo client [command options] [arguments...]
+   dsapp client [command options] [arguments...]
 
 OPTIONS:
    --server value        datastream server address to connect (IP:port) (default: 127.0.0.1:6900)
    --from value          entry number to start the sync/streaming from (latest|0..N) (default: latest)
    --frombookmark value  bookmark to start the sync/streaming from (0..N) (has preference over --from parameter)
+   --header              query file header information (default: false)
    --entry value         entry number to query data (0..N)
    --bookmark value      entry bookmark to query entry data pointed by it (0..N)
    --log value           log level (debug|info|warn|error) (default: info)
@@ -251,23 +252,27 @@ OPTIONS:
 ```
 Run a datastream client with default parameters (server: `127.0.0.1:6900`, from: `latest`, log: `info`)
 ```
-./dsdemo client
+./dsapp client
 ```
 Or run a datastream client with custom parameters:
 ```
-./dsdemo client --server 127.0.0.1:6969 --from 0 --log debug
+./dsapp client --server 127.0.0.1:6969 --from 0 --log debug
+```
+Or just get the current stream file header information:
+```
+./dsapp client --server 127.0.0.1:6969 --header
 ```
 ### RELAY
 Use the help option to check available parameters for the relay command:
 ```
-./dsdemo help relay
+./dsapp help relay
 ```
 ```
 NAME:
-   dsdemo relay - Run datastream relay
+   dsapp relay - Run datastream relay
 
 USAGE:
-   dsdemo relay [command options] [arguments...]
+   dsapp relay [command options] [arguments...]
 
 OPTIONS:
    --server value  datastream server address to connect (IP:port) (default: 127.0.0.1:6900)
@@ -278,7 +283,7 @@ OPTIONS:
 ```
 Run a datastream relay with default parameters (server: `127.0.0.1:6900`, port: `7900`, file: `datarelay.bin`, log: `info`)
 ```
-./dsdemo relay
+./dsapp relay
 ```
 
 ## USE CASE: zkEVM SEQUENCER ENTRIES
diff --git a/cmd/main.go b/cmd/main.go
index 678e571..1972b60 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -96,6 +96,11 @@ func main() {
 			Usage: "bookmark to start the sync/streaming from (0..N) (has preference over --from parameter)",
 			Value: "none",
 		},
+		&cli.BoolFlag{
+			Name:  "header",
+			Usage: "query file header information",
+			Value: false,
+		},
 		&cli.StringFlag{
 			Name:  "entry",
 			Usage: "entry number to query data (0..N)",
@@ -340,6 +345,7 @@ func runClient(ctx *cli.Context) error {
 	}
 	from := ctx.String("from")
 	fromBookmark := ctx.String("frombookmark")
+	queryHeader := ctx.Bool("header")
 	queryEntry := ctx.String("entry")
 	queryBookmark := ctx.String("bookmark")
 
@@ -358,6 +364,17 @@ func runClient(ctx *cli.Context) error {
 		return err
 	}
 
+	// Query file header information
+	if queryHeader {
+		err = c.ExecCommand(datastreamer.CmdHeader)
+		if err != nil {
+			log.Infof("Error: %v", err)
+		} else {
+			log.Infof("QUERY HEADER: TotalEntries[%d] TotalLength[%d]", c.Header.TotalEntries, c.Header.TotalLength)
+		}
+		return nil
+	}
+
 	// Query entry option
 	if queryEntry != "none" {
 		qEntry, err := strconv.Atoi(queryEntry)
diff --git a/datastreamer/datastreamer_test.go b/datastreamer/datastreamer_test.go
index 6c2dfce..efbc676 100644
--- a/datastreamer/datastreamer_test.go
+++ b/datastreamer/datastreamer_test.go
@@ -2,6 +2,7 @@ package datastreamer_test
 
 import (
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"os"
 	"strings"
@@ -22,6 +23,13 @@ type TestEntry struct {
 type TestBookmark struct {
 	FieldA []byte
 }
+type TestHeader struct {
+	PacketType   uint8
+	HeadLength   uint32
+	StreamType   uint64
+	TotalLength  uint64
+	TotalEntries uint64
+}
 
 func (t TestEntry) Encode() []byte {
 	bytes := make([]byte, 0)
@@ -52,21 +60,59 @@ var (
 			Outputs:     []string{"stdout"},
 		},
 	}
-	leveldb       = config.Filename[0:strings.IndexRune(config.Filename, '.')] + ".db"
-	streamServer  *datastreamer.StreamServer
-	streamType    = datastreamer.StreamType(1)
-	testEntryType = datastreamer.EntryType(1)
-
-	testEntry = TestEntry{
-		FieldA: 123,
-		FieldB: common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"),
-		FieldC: []byte("test entry 1"),
+	leveldb      = config.Filename[0:strings.IndexRune(config.Filename, '.')] + ".db"
+	streamServer *datastreamer.StreamServer
+	streamType   = datastreamer.StreamType(1)
+	entryType1   = datastreamer.EntryType(1)
+	entryType2   = datastreamer.EntryType(2)
+
+	testEntries = []TestEntry{
+		{
+			FieldA: 0,
+			FieldB: common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"),
+			FieldC: []byte("test entry 0"),
+		},
+		{
+			FieldA: 1,
+			FieldB: common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"),
+			FieldC: []byte("test entry 1"),
+		},
+		{
+			FieldA: 2,
+			FieldB: common.HexToHash("0x2234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"),
+			FieldC: []byte("test entry 2"),
+		},
+		{
+			FieldA: 3,
+			FieldB: common.HexToHash("0x3234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"),
+			FieldC: []byte("test entry 3"),
+		},
+		{
+			FieldA: 4,
+			FieldB: common.HexToHash("0x3234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"),
+			FieldC: []byte("large test entry 4 large test entry 4 large test entry 4 large test entry 4" +
+				"large test entry 4 large test entry 4 large test entry 4 large test entry 4" +
+				"large test entry 4 large test entry 4 large test entry 4 large test entry 4" +
+				"large test entry 4 large test entry 4 large test entry 4 large test entry 4" +
+				"large test entry 4 large test entry 4 large test entry 4 large test entry 4" +
+				"large test entry 4 large test entry 4 large test entry 4 large test entry 4" +
+				"large test entry 4 large test entry 4 large test entry 4 large test entry 4" +
+				"large test entry 4 large test entry 4 large test entry 4 large test entry 4" +
+				"large test entry 4 large test entry 4 large test entry 4 large test entry 4" +
+				"large test entry 4 large test entry 4 large test entry 4 large test entry 4"),
+		},
+	}
+
+	badUpdateEntry = TestEntry{
+		FieldA: 10,
+		FieldB: common.HexToHash("0xa1cdef7890abcdef1234567890abcdef1234567890abcdef1234567890123456"),
+		FieldC: []byte("test entry not updated"),
 	}
 
-	testEntry2 = TestEntry{
-		FieldA: 456,
-		FieldB: common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"),
-		FieldC: []byte("test entry 2"),
+	okUpdateEntry = TestEntry{
+		FieldA: 11,
+		FieldB: common.HexToHash("0xa2cdef7890abcdef1234567890abcdef1234567890abcdef1234567890123456"),
+		FieldC: []byte("update entry"),
 	}
 
 	testBookmark = TestBookmark{
@@ -76,6 +122,14 @@
 	nonAddedBookmark = TestBookmark{
 		FieldA: []byte{0, 2, 0, 0, 0, 0, 0, 0, 0},
 	}
+
+	headerEntry = TestHeader{
+		PacketType:   1,
+		HeadLength:   29,
+		StreamType:   1,
+		TotalLength:  1053479,
+		TotalEntries: 1304,
+	}
 )
 
 func deleteFiles() error {
@@ -103,44 +157,218 @@ func TestServer(t *testing.T) {
 	if err != nil {
 		panic(err)
 	}
-	// Should fail because the start atomic operation has not been called
-	entryNumber, err := streamServer.AddStreamEntry(testEntryType, testEntry.Encode())
+
+	// Case: Add entry without starting atomic operation -> FAIL
+	entryNumber, err := streamServer.AddStreamEntry(entryType1, testEntries[1].Encode())
 	require.Equal(t, datastreamer.ErrAddEntryNotAllowed, err)
 	require.Equal(t, uint64(0), entryNumber)
 
-	// Should fail because server is not started
+	// Case: Start atomic operation without starting the server -> FAIL
 	err = streamServer.StartAtomicOp()
 	require.Equal(t, datastreamer.ErrAtomicOpNotAllowed, err)
 	require.Equal(t, uint64(0), entryNumber)
 
-	// Should succeed
+	// Case: Start server, start atomic operation, add entries, commit -> OK
 	err = streamServer.Start()
 	require.NoError(t, err)
 
-	// Should succeed
 	err = streamServer.StartAtomicOp()
 	require.NoError(t, err)
 
-	// Should succeed
 	entryNumber, err = streamServer.AddStreamBookmark(testBookmark.Encode())
 	require.NoError(t, err)
 	require.Equal(t, uint64(0), entryNumber)
 
-	entryNumber, err = streamServer.AddStreamEntry(testEntryType, testEntry.Encode())
+	entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[1].Encode())
 	require.NoError(t, err)
 	require.Equal(t, uint64(1), entryNumber)
 
-	entryNumber, err = streamServer.AddStreamEntry(testEntryType, testEntry2.Encode())
+	entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[2].Encode())
 	require.NoError(t, err)
 	require.Equal(t, uint64(2), entryNumber)
 
 	err = streamServer.CommitAtomicOp()
 	require.NoError(t, err)
 
-	// Get the second entry
+	// Case: Get entry data of an entry number that exists -> OK
 	entry, err := streamServer.GetEntry(2)
 	require.NoError(t, err)
-	require.Equal(t, testEntry2, TestEntry{}.Decode(entry.Data))
+	require.Equal(t, testEntries[2], TestEntry{}.Decode(entry.Data))
+
+	// Case: Get entry data of an entry number that doesn't exist -> FAIL
+	entry, err = streamServer.GetEntry(3)
+	require.EqualError(t, datastreamer.ErrInvalidEntryNumber, err.Error())
+
+	// Case: Get entry number pointed by a bookmark that exists -> OK
+	entryNumber, err = streamServer.GetBookmark(testBookmark.Encode())
+	require.NoError(t, err)
+	require.Equal(t, uint64(0), entryNumber)
+
+	// Case: Get entry number pointed by a bookmark that doesn't exist -> FAIL
+	_, err = streamServer.GetBookmark(nonAddedBookmark.Encode())
+	require.EqualError(t, errors.New("leveldb: not found"), err.Error())
+
+	// Case: Update entry data of an entry number that doesn't exist -> FAIL
+	err = streamServer.UpdateEntryData(22, entryType1, badUpdateEntry.Encode())
+	require.EqualError(t, datastreamer.ErrInvalidEntryNumber, err.Error())
+
+	// Case: Update entry data present in atomic operation in progress -> FAIL
+	err = streamServer.StartAtomicOp()
+	require.NoError(t, err)
+
+	entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[3].Encode())
+	require.NoError(t, err)
+	require.Equal(t, uint64(3), entryNumber)
+
+	err = streamServer.UpdateEntryData(3, entryType1, badUpdateEntry.Encode())
+	require.EqualError(t, datastreamer.ErrUpdateNotAllowed, err.Error())
+
+	err = streamServer.CommitAtomicOp()
+	require.NoError(t, err)
+
+	// Case: Update entry data changing the entry type -> FAIL
+	err = streamServer.UpdateEntryData(3, entryType2, badUpdateEntry.Encode())
+	require.EqualError(t, datastreamer.ErrUpdateEntryTypeNotAllowed, err.Error())
+
+	// Case: Update entry data changing the data length -> FAIL
+	err = streamServer.UpdateEntryData(3, entryType1, badUpdateEntry.Encode())
+	require.EqualError(t, datastreamer.ErrUpdateEntryDifferentSize, err.Error())
+
+	// Case: Update entry data not in atomic operation, same type, same data length -> OK
+	var entryUpdated uint64 = 3
+	err = streamServer.UpdateEntryData(entryUpdated, entryType1, okUpdateEntry.Encode())
+	require.NoError(t, err)
+
+	// Case: Get entry just updated and check it is modified -> OK
+	entry, err = streamServer.GetEntry(entryUpdated)
+	require.NoError(t, err)
+	require.Equal(t, entryUpdated, entry.Number)
+	require.Equal(t, okUpdateEntry, TestEntry{}.Decode(entry.Data))
+
+	// Case: Get the entry previous to the updated one and check it is not modified -> OK
+	if entryUpdated > 1 {
+		entry, err = streamServer.GetEntry(entryUpdated - 1)
+		require.NoError(t, err)
+		require.Equal(t, entryUpdated-1, entry.Number)
+		require.Equal(t, testEntries[entryUpdated-1], TestEntry{}.Decode(entry.Data))
+	}
+
+	// Case: Add 3 new entries -> OK
+	err = streamServer.StartAtomicOp()
+	require.NoError(t, err)
+
+	entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[1].Encode())
+	require.NoError(t, err)
+	require.Equal(t, uint64(4), entryNumber)
+
+	entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[2].Encode())
+	require.NoError(t, err)
+	require.Equal(t, uint64(5), entryNumber)
+
+	entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[2].Encode())
+	require.NoError(t, err)
+	require.Equal(t, uint64(6), entryNumber)
+
+	err = streamServer.CommitAtomicOp()
+	require.NoError(t, err)
+
+	// Case: Atomic operation finished with rollback -> OK
+	err = streamServer.StartAtomicOp()
+	require.NoError(t, err)
+
+	entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[1].Encode())
+	require.NoError(t, err)
+	require.Equal(t, uint64(7), entryNumber)
+
+	err = streamServer.RollbackAtomicOp()
+	require.NoError(t, err)
+
+	// Case: Get entry data of the rolled back entry number (doesn't exist) -> FAIL
+	entry, err = streamServer.GetEntry(7)
+	require.EqualError(t, datastreamer.ErrInvalidEntryNumber, err.Error())
+
+	// Case: Truncate file with atomic operation in progress -> FAIL
+	err = streamServer.StartAtomicOp()
+	require.NoError(t, err)
+
+	err = streamServer.TruncateFile(5)
+	require.EqualError(t, datastreamer.ErrTruncateNotAllowed, err.Error())
+
+	err = streamServer.RollbackAtomicOp()
+	require.NoError(t, err)
+
+	// Case: Truncate file from an invalid entry number -> FAIL
+	err = streamServer.TruncateFile(7)
+	require.EqualError(t, datastreamer.ErrInvalidEntryNumber, err.Error())
+
+	// Case: Truncate file from a valid entry number, no atomic operation in progress -> OK
+	err = streamServer.TruncateFile(5)
+	require.NoError(t, err)
+
+	// Case: Get entries included in the previous file truncate (don't exist) -> FAIL
+	entry, err = streamServer.GetEntry(6)
+	require.EqualError(t, datastreamer.ErrInvalidEntryNumber, err.Error())
+	entry, err = streamServer.GetEntry(5)
+	require.EqualError(t, datastreamer.ErrInvalidEntryNumber, err.Error())
+
+	// Case: Get entry not included in the previous file truncate -> OK
+	entry, err = streamServer.GetEntry(4)
+	require.NoError(t, err)
+	require.Equal(t, uint64(4), entry.Number)
+
+	// Log file header before filling the first data page
+	datastreamer.PrintHeaderEntry(streamServer.GetHeader(), "before fill page")
+
+	// Case: Fill first data page with entries
+	entryLength := len(testEntries[4].Encode()) + datastreamer.FixedSizeFileEntry
+	bytesAvailable := datastreamer.PageDataSize - (streamServer.GetHeader().TotalLength - datastreamer.PageHeaderSize)
+	numEntries := bytesAvailable / uint64(entryLength)
+	log.Debugf(">>> totalLength: %d | bytesAvailable: %d | entryLength: %d | numEntries: %d", streamServer.GetHeader().TotalLength, bytesAvailable, entryLength, numEntries)
+
+	lastEntry := entryNumber - 2 // 2 entries truncated
+	lastEntry = lastEntry - 1
+	err = streamServer.StartAtomicOp()
+	require.NoError(t, err)
+
+	for i := 1; i <= int(numEntries); i++ {
+		lastEntry++
+		entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[4].Encode())
+		require.NoError(t, err)
+		require.Equal(t, uint64(lastEntry), entryNumber)
+	}
+
+	err = streamServer.CommitAtomicOp()
+	require.NoError(t, err)
+
+	bytesAvailable = datastreamer.PageDataSize - ((streamServer.GetHeader().TotalLength - datastreamer.PageHeaderSize) % datastreamer.PageDataSize)
+	numEntries = bytesAvailable / uint64(entryLength)
+	log.Debugf(">>> totalLength: %d | bytesAvailable: %d | entryLength: %d | numEntries: %d", streamServer.GetHeader().TotalLength, bytesAvailable, entryLength, numEntries)
+
+	// Case: Get latest entry stored in the first data page -> OK
+	entry, err = streamServer.GetEntry(entryNumber)
+	require.NoError(t, err)
+	require.Equal(t, entryNumber, entry.Number)
+	require.Equal(t, testEntries[4], TestEntry{}.Decode(entry.Data))
+
+	// Case: Add a new entry that will be stored in the second data page -> OK
+	err = streamServer.StartAtomicOp()
+	require.NoError(t, err)
+
+	entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[4].Encode())
+	require.NoError(t, err)
+	require.Equal(t, uint64(lastEntry+1), entryNumber)
+
+	err = streamServer.CommitAtomicOp()
+	require.NoError(t, err)
+
+	// Case: Get entry stored in the second data page -> OK
+	entry, err = streamServer.GetEntry(entryNumber)
+	require.NoError(t, err)
+	require.Equal(t, entryNumber, entry.Number)
+	require.Equal(t, testEntries[4], TestEntry{}.Decode(entry.Data))
+
+	// Log final file header
+	datastreamer.PrintHeaderEntry(streamServer.GetHeader(), "final tests")
 }
 
 func TestClient(t *testing.T) {
@@ -150,30 +378,76 @@ func TestClient(t *testing.T) {
 	err = client.Start()
 	require.NoError(t, err)
 
-	// Should succeed
-	client.FromBookmark = testBookmark.Encode()
-	err = client.ExecCommand(datastreamer.CmdBookmark)
-	require.NoError(t, err)
-	require.Equal(t, testEntry.Encode(), client.Entry.Data)
-
-	// Should fail because the bookmark is not added
+	// Case: Query data from a non-existing bookmark -> FAIL
 	client.FromBookmark = nonAddedBookmark.Encode()
 	err = client.ExecCommand(datastreamer.CmdBookmark)
 	require.EqualError(t, datastreamer.ErrBookmarkNotFound, err.Error())
 
-	// Should fail because the entry is not added
-	client.FromEntry = 3
+	// Case: Query data from an existing bookmark -> OK
+	client.FromBookmark = testBookmark.Encode()
+	err = client.ExecCommand(datastreamer.CmdBookmark)
+	require.NoError(t, err)
+
+	// Case: Query data for an entry number that doesn't exist -> FAIL
+	client.FromEntry = 5000
 	err = client.ExecCommand(datastreamer.CmdEntry)
 	require.EqualError(t, datastreamer.ErrEntryNotFound, err.Error())
 
+	// Case: Query data for an entry number that exists -> OK
 	client.FromEntry = 2
 	err = client.ExecCommand(datastreamer.CmdEntry)
 	require.NoError(t, err)
+	require.Equal(t, testEntries[2], TestEntry{}.Decode(client.Entry.Data))
 
-	require.Equal(t, testEntry2, TestEntry{}.Decode(client.Entry.Data))
-
+	// Case: Query data for an entry number that exists -> OK
 	client.FromEntry = 1
 	err = client.ExecCommand(datastreamer.CmdEntry)
 	require.NoError(t, err)
-	require.Equal(t, testEntry, TestEntry{}.Decode(client.Entry.Data))
+	require.Equal(t, testEntries[1], TestEntry{}.Decode(client.Entry.Data))
+
+	// Case: Query header info -> OK
+	err = client.ExecCommand(datastreamer.CmdHeader)
+	require.NoError(t, err)
+	require.Equal(t, headerEntry.TotalEntries, client.Header.TotalEntries)
+	require.Equal(t, headerEntry.TotalLength, client.Header.TotalLength)
+
+	// Case: Start sync from a non-existing entry -> FAIL
+	// client.FromEntry = 22
+	// err = client.ExecCommand(datastreamer.CmdStart)
+	// require.EqualError(t, datastreamer.ErrResultCommandError, err.Error())
+
+	// Case: Start sync from a non-existing bookmark -> FAIL
+	// client.FromBookmark = nonAddedBookmark.Encode()
+	// err = client.ExecCommand(datastreamer.CmdStartBookmark)
+	// require.EqualError(t, datastreamer.ErrResultCommandError, err.Error())
+
+	// Case: Start sync from an existing entry -> OK
+	// client.FromEntry = 0
+	// err = client.ExecCommand(datastreamer.CmdStart)
+	// require.NoError(t, err)
+
+	// Case: Start sync from an existing bookmark -> OK
+	client.FromBookmark = testBookmark.Encode()
+	err = client.ExecCommand(datastreamer.CmdStartBookmark)
+	require.NoError(t, err)
+
+	// Case: Query entry data with streaming started -> FAIL
+	// client.FromEntry = 2
+	// err = client.ExecCommand(datastreamer.CmdEntry)
+	// require.EqualError(t, datastreamer.ErrResultCommandError, err.Error())
+
+	// Case: Query bookmark data with streaming started -> FAIL
+	// client.FromBookmark = testBookmark.Encode()
+	// err = client.ExecCommand(datastreamer.CmdBookmark)
+	// require.EqualError(t, datastreamer.ErrResultCommandError, err.Error())
+
+	// Case: Stop receiving streaming -> OK
+	err = client.ExecCommand(datastreamer.CmdStop)
+	require.NoError(t, err)
+
+	// Case: Query entry data after stopping the streaming -> OK
+	client.FromEntry = 2
+	err = client.ExecCommand(datastreamer.CmdEntry)
+	require.NoError(t, err)
+	require.Equal(t, testEntries[2], TestEntry{}.Decode(client.Entry.Data))
 }
diff --git a/datastreamer/streamclient.go b/datastreamer/streamclient.go
index 6b3b2c2..2f8b481 100644
--- a/datastreamer/streamclient.go
+++ b/datastreamer/streamclient.go
@@ -523,7 +523,7 @@ func (c *StreamClient) getStreaming() {
 		// Process the data entry
 		err := c.processEntry(&e, c, c.relayServer)
 		if err != nil {
-			log.Errorf("%s Error processing entry %d", c.Id, e.Number)
+			log.Fatalf("%s Error processing entry %d: %s. HALTED!", c.Id, e.Number, err.Error())
HALTED!", c.Id, e.Number, err.Error()) } } } diff --git a/datastreamer/streamfile.go b/datastreamer/streamfile.go index 6c52155..93b6634 100644 --- a/datastreamer/streamfile.go +++ b/datastreamer/streamfile.go @@ -19,8 +19,8 @@ const ( fileMode = 0666 // Open file mode magicNumSize = 16 // Magic numbers size headerSize = 29 // Header data size - pageHeaderSize = 4096 // 4K size header page - pageDataSize = 1024 * 1024 // 1 MB size data page + PageHeaderSize = 4096 // PageHeaderSize is the size of header page (4 KB) + PageDataSize = 1024 * 1024 // PageDataSize is the size of one data page (1 MB) initPages = 100 // Initial number of data pages nextPages = 10 // Number of data pages to add when file is full @@ -78,7 +78,7 @@ type iteratorFile struct { func NewStreamFile(fn string, st StreamType) (*StreamFile, error) { sf := StreamFile{ fileName: fn, - pageSize: pageDataSize, + pageSize: PageDataSize, file: nil, streamType: st, maxLength: 0, @@ -209,7 +209,7 @@ func (f *StreamFile) initializeFile() error { // createHeaderPage creates and initilize the header page of the stream file func (f *StreamFile) createHeaderPage() error { // Create the header page (first page) of the file - err := f.createPage(pageHeaderSize) + err := f.createPage(PageHeaderSize) if err != nil { log.Errorf("Error creating the header page: %v", err) return err @@ -217,8 +217,8 @@ func (f *StreamFile) createHeaderPage() error { // Update total data length and max file length f.mutexHeader.Lock() - f.maxLength = f.maxLength + pageHeaderSize - f.header.TotalLength = pageHeaderSize + f.maxLength = f.maxLength + PageHeaderSize + f.header.TotalLength = PageHeaderSize f.mutexHeader.Unlock() // Write magic numbers @@ -336,23 +336,17 @@ func (f *StreamFile) getHeaderEntry() HeaderEntry { } // PrintHeaderEntry prints file header information -func PrintHeaderEntry(e HeaderEntry) { - log.Info("--- HEADER ENTRY -------------------------") +func PrintHeaderEntry(e HeaderEntry, title string) { + log.Infof("--- HEADER ENTRY %s -------------------------", title) log.Infof("packetType: [%d]", e.packetType) log.Infof("headerLength: [%d]", e.headLength) log.Infof("streamType: [%d]", e.streamType) log.Infof("totalLength: [%d]", e.TotalLength) log.Infof("totalEntries: [%d]", e.TotalEntries) - var usedPages uint64 - if e.TotalLength == 0 { - usedPages = 0 - } else if (e.TotalLength-pageHeaderSize)%pageDataSize == 0 { - usedPages = (e.TotalLength - pageHeaderSize) / pageDataSize - } else { - usedPages = (e.TotalLength-pageHeaderSize)/pageDataSize + 1 - } - log.Infof("usedDataPages=[%d]", usedPages) + numPage := (e.TotalLength - PageHeaderSize) / PageDataSize + offPage := (e.TotalLength - PageHeaderSize) % PageDataSize + log.Infof("DataPage num=[%d] off=[%d]", numPage, offPage) } // writeHeaderEntry writes the memory header struct into the file header @@ -435,13 +429,13 @@ func (f *StreamFile) checkFileConsistency() error { } // Check header page is present - if info.Size() < pageHeaderSize { + if info.Size() < PageHeaderSize { log.Error("Invalid file: missing header page") return ErrInvalidFileMissingHeaderPage } // Check data pages are not cut - dataSize := info.Size() - pageHeaderSize + dataSize := info.Size() - PageHeaderSize uncut := dataSize % int64(f.pageSize) if uncut != 0 { log.Error("Inconsistent file size there is a cut data page") @@ -510,10 +504,10 @@ func (f *StreamFile) AddFileEntry(e FileEntry) error { // Check if the entry fits on current page var pageRemaining uint64 entryLength := uint64(len(be)) - if 
+	if (f.header.TotalLength-PageHeaderSize)%PageDataSize == 0 {
 		pageRemaining = 0
 	} else {
-		pageRemaining = pageDataSize - (f.header.TotalLength-pageHeaderSize)%pageDataSize
+		pageRemaining = PageDataSize - (f.header.TotalLength-PageHeaderSize)%PageDataSize
 	}
 	if entryLength > pageRemaining {
 		log.Debugf(">> Fill with pad entries. PageRemaining:%d, EntryLength:%d", pageRemaining, entryLength)
@@ -577,10 +571,10 @@ func (f *StreamFile) fillPagePadEntries() error {
 
 	// Page remaining free space
 	var pageRemaining uint64
-	if (f.header.TotalLength-pageHeaderSize)%pageDataSize == 0 {
+	if (f.header.TotalLength-PageHeaderSize)%PageDataSize == 0 {
 		pageRemaining = 0
 	} else {
-		pageRemaining = pageDataSize - (f.header.TotalLength-pageHeaderSize)%pageDataSize
+		pageRemaining = PageDataSize - (f.header.TotalLength-PageHeaderSize)%PageDataSize
 	}
 
 	if pageRemaining > 0 {
@@ -615,8 +609,8 @@ func printStreamFile(f *StreamFile) {
 	log.Infof("pageSize: [%d]", f.pageSize)
 	log.Infof("streamType: [%d]", f.streamType)
 	log.Infof("maxLength: [%d]", f.maxLength)
-	log.Infof("numDataPages=[%d]", (f.maxLength-pageHeaderSize)/pageDataSize)
-	PrintHeaderEntry(f.header)
+	log.Infof("numDataPages=[%d]", (f.maxLength-PageHeaderSize)/PageDataSize)
+	PrintHeaderEntry(f.header, "")
 }
 
 // DecodeBinaryToFileEntry decodes from binary bytes slice to file entry type
@@ -644,6 +638,12 @@ func DecodeBinaryToFileEntry(b []byte) (FileEntry, error) {
 
 // iteratorFrom initializes iterator to locate a data entry number in the stream file
 func (f *StreamFile) iteratorFrom(entryNum uint64, readOnly bool) (*iteratorFile, error) {
+	// Check starting entry number
+	if entryNum >= f.writtenHead.TotalEntries {
+		log.Infof("Invalid starting entry number for iterator")
+		return nil, ErrInvalidEntryNumber
+	}
+
 	// Iterator mode
 	var flag int
 	if readOnly {
@@ -677,7 +677,7 @@
 
 // iteratorNext gets the next data entry in the file for the iterator, returns the end of entries condition
 func (f *StreamFile) iteratorNext(iterator *iteratorFile) (bool, error) {
 	// Check end of entries condition
-	if iterator.Entry.Number == f.writtenHead.TotalEntries {
+	if iterator.Entry.Number >= f.writtenHead.TotalEntries {
 		return true, nil
 	}
@@ -700,10 +700,10 @@ func (f *StreamFile) iteratorNext(iterator *iteratorFile) (bool, error) {
 
 	// Bytes to forward until next data page
 	var forward int64
-	if (pos-pageHeaderSize)%pageDataSize == 0 {
+	if (pos-PageHeaderSize)%PageDataSize == 0 {
 		forward = 0
 	} else {
-		forward = pageDataSize - ((pos - pageHeaderSize) % pageDataSize)
+		forward = PageDataSize - ((pos - PageHeaderSize) % PageDataSize)
 	}
 
 	// Check end of data pages condition
@@ -780,9 +780,9 @@ func (f *StreamFile) seekEntry(iterator *iteratorFile) error {
 	// Start and end data pages
 	avg := 0
 	beg := 0
-	end := int((f.writtenHead.TotalLength - pageHeaderSize) / pageDataSize)
-	if (f.writtenHead.TotalLength-pageHeaderSize)%pageDataSize != 0 {
-		end = end + 1
+	end := int((f.writtenHead.TotalLength - PageHeaderSize) / PageDataSize)
+	if (f.writtenHead.TotalLength-PageHeaderSize)%PageDataSize == 0 {
+		end = end - 1
 	}
 
 	// Custom binary search
@@ -790,7 +790,7 @@ func (f *StreamFile) seekEntry(iterator *iteratorFile) error {
 		avg = beg + (end-beg)/2 // nolint:gomnd
 
 		// Seek for the start of avg data page
-		newPos := (avg * pageDataSize) + pageHeaderSize
+		newPos := (avg * PageDataSize) + PageHeaderSize
 		_, err := iterator.file.Seek(int64(newPos), io.SeekStart)
 		if err != nil {
 			log.Errorf("Error seeking page for iterator seek entry: %v", err)
@@ -869,17 +869,17 @@ func (f *StreamFile) getFirstEntryOnNextPage(iterator *iteratorFile) (uint64, er
 	}
 
 	// Check if it is valid the current file position
-	if curpos < pageHeaderSize || curpos > int64(f.writtenHead.TotalLength) {
+	if curpos < PageHeaderSize || curpos > int64(f.writtenHead.TotalLength) {
 		log.Errorf("Error current file position outside a data page")
 		return 0, ErrCurrentPositionOutsideDataPage
 	}
 
 	// Check if exists another data page
 	var forward int64
-	if (curpos-pageHeaderSize)%pageDataSize == 0 {
+	if (curpos-PageHeaderSize)%PageDataSize == 0 {
 		forward = 0
 	} else {
-		forward = pageDataSize - (curpos-pageHeaderSize)%pageDataSize
+		forward = PageDataSize - (curpos-PageHeaderSize)%PageDataSize
 	}
 
 	if curpos+forward >= int64(f.writtenHead.TotalLength) {
diff --git a/datastreamer/streamserver.go b/datastreamer/streamserver.go
index d33e1e3..93bcbf5 100644
--- a/datastreamer/streamserver.go
+++ b/datastreamer/streamserver.go
@@ -285,7 +285,7 @@ func (s *StreamServer) handleConnection(conn net.Conn) {
 		err = s.processCommand(Command(command), clientId)
 		if err != nil {
 			// Kill client connection
-			time.Sleep(1 * time.Second)
+			time.Sleep(2 * time.Second) // nolint:gomnd
 			s.killClient(clientId)
 			return
 		}
@@ -457,6 +457,9 @@ func (s *StreamServer) TruncateFile(entryNum uint64) error {
 		return ErrTruncateNotAllowed
 	}
 
+	// Log previous header
+	PrintHeaderEntry(s.streamFile.header, "(before truncate)")
+
 	// Truncate entries in the file
 	err := s.streamFile.truncateFile(entryNum)
 	if err != nil {
@@ -466,6 +469,10 @@ func (s *StreamServer) TruncateFile(entryNum uint64) error {
 	// Update entry number sequence
 	s.nextEntry = s.streamFile.header.TotalEntries
 
+	// Log current header
+	log.Infof("File truncated! Removed entries from %d (included) until end of file", entryNum)
+	PrintHeaderEntry(s.streamFile.header, "(after truncate)")
+
 	return nil
 }
 
@@ -935,7 +942,7 @@ func (s *StreamServer) streamingFromEntry(clientId string, fromEntry uint64) err
 
 	// Send the file data entry
 	binaryEntry := encodeFileEntryToBinary(iterator.Entry)
-	log.Infof("Sending data entry %d (type %d) to %s", iterator.Entry.Number, iterator.Entry.Type, clientId)
+	log.Debugf("Sending data entry %d (type %d) to %s", iterator.Entry.Number, iterator.Entry.Type, clientId)
 	if conn != nil {
 		_, err = conn.Write(binaryEntry)
 	} else {
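
For reference, a minimal sketch of the new header query from a library consumer's point of view, mirroring what `runClient` and `TestClient` exercise in the hunks above. The module import path and the `datastreamer.NewClient` constructor are assumptions taken from the surrounding repository rather than from this patch; `CmdHeader`, `Start`, `ExecCommand`, and the exported `Header` fields are the ones used above.

```
package main

import (
	"fmt"

	// Assumed module path; not shown in this patch
	"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
)

func main() {
	// Connect to a running datastream server, e.g. one started with `./dsapp server`.
	// NewClient(server, streamType) is assumed from the repo's client API.
	client, err := datastreamer.NewClient("127.0.0.1:6900", datastreamer.StreamType(1))
	if err != nil {
		panic(err)
	}
	if err = client.Start(); err != nil {
		panic(err)
	}

	// CmdHeader asks the server for the current stream file header and
	// fills the client's Header field, as TestClient checks above
	if err = client.ExecCommand(datastreamer.CmdHeader); err != nil {
		panic(err)
	}
	fmt.Printf("TotalEntries=%d TotalLength=%d\n", client.Header.TotalEntries, client.Header.TotalLength)
}
```

This is the same flow the `--header` flag triggers in `cmd/main.go`: connect, start the client, run `CmdHeader`, then read `client.Header` instead of streaming entries.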