Skip to content

Commit

Permalink
Feature/relay5 (#82)
Browse files Browse the repository at this point in the history
* Remove write syncs to improve performance. Remove test cases from dsapp

* Change log level
  • Loading branch information
dpunish3r authored Nov 13, 2023
1 parent 013c6c6 commit 63349f7
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 41 deletions.
25 changes: 0 additions & 25 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ func runServer(ctx *cli.Context) error {
dataTx = append(dataTx, 1) // nolint:gomnd
dataTx = binary.LittleEndian.AppendUint32(dataTx, 5) // nolint:gomnd
dataTx = append(dataTx, []byte{1, 2, 3, 4, 5}...) // nolint:gomnd
updatedData := []byte{6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}

dataBlockEnd := make([]byte, 0)
dataBlockEnd = binary.LittleEndian.AppendUint64(dataBlockEnd, 1337) // nolint:gomnd
Expand All @@ -231,30 +230,6 @@ func runServer(ctx *cli.Context) error {

rand.Seed(time.Now().UnixNano())

// Get Header
header := s.GetHeader()
log.Infof(">> GetHeader test: nextEntryNumber[%d]", header.TotalEntries)

// Get Entry
if header.TotalEntries > 10 { // nolint:gomnd
entry, err := s.GetEntry(10) // nolint:gomnd
if err != nil {
log.Errorf(">> GetEntry test: error %v", err)
} else {
log.Infof(">> GetEntry test: num[%d] type[%d] length[%d] data[%v]", entry.Number, entry.Type, entry.Length, entry.Data)
}

// Update entry data
_ = s.UpdateEntryData(10, EtL2Tx, updatedData) // nolint:gomnd
// Get Entry again
entry, err = s.GetEntry(10) // nolint:gomnd
if err != nil {
log.Errorf(">> GetEntry test: error %v", err)
} else {
log.Infof(">> GetEntry test: num[%d] type[%d] length[%d] data[%v]", entry.Number, entry.Type, entry.Length, entry.Data)
}
}

// Atomic Operations loop
for n := 1; n <= int(numOpersLoop); n++ {
// Start atomic operation
Expand Down
18 changes: 2 additions & 16 deletions datastreamer/streamfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,6 @@ func (f *StreamFile) writeHeaderEntry() error {
log.Errorf("Error writing the header: %v", err)
return err
}
err = f.fileHeader.Sync()
if err != nil {
log.Errorf("Error flushing header data to disk: %v", err)
return err
}

// Update the written header
f.mutexHeader.Lock()
Expand Down Expand Up @@ -519,13 +514,13 @@ func (f *StreamFile) AddFileEntry(e FileEntry) error {
// Check if file is full
if f.header.TotalLength == f.maxLength {
// Add new data pages to the file
log.Warnf(">> FULL FILE (TotalLength: %d) -> extending!", f.header.TotalLength)
log.Infof(">> FULL FILE (TotalLength: %d) -> extending!", f.header.TotalLength)
err = f.extendFile()
if err != nil {
return err
}

log.Warnf(">> New file max length: %d", f.maxLength)
log.Infof(">> New file max length: %d", f.maxLength)

// Re-set the file position to write
_, err = f.file.Seek(int64(f.header.TotalLength), io.SeekStart)
Expand All @@ -543,13 +538,6 @@ func (f *StreamFile) AddFileEntry(e FileEntry) error {
return err
}

// Flush data to disk
err = f.file.Sync()
if err != nil {
log.Errorf("Error flushing new entry to disk: %v", err)
return err
}

// Update the current header in memory (on disk later when the commit arrives)
f.mutexHeader.Lock()
f.header.TotalLength = f.header.TotalLength + entryLength
Expand Down Expand Up @@ -591,8 +579,6 @@ func (f *StreamFile) fillPagePadEntries() error {
return err
}

// Sync/flush to disk will be done outside this function

// Update the current header in memory (on disk later when the commit arrives)
f.mutexHeader.Lock()
f.header.TotalLength = f.header.TotalLength + pageRemaining
Expand Down

0 comments on commit 63349f7

Please sign in to comment.