Skip to content

Commit

Permalink
ingest/ledgerbackend: fix to ensure that the ledger buffer properly q…
Browse files Browse the repository at this point in the history
…ueues the last batch of ledgers (LCM) within the specified range, preventing get_ledger from blocking indefinitely.
  • Loading branch information
urvisavla committed Jan 2, 2025
1 parent 9c71a45 commit 1c84c6f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 23 deletions.
40 changes: 19 additions & 21 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ func createBufferedStorageBackendForTesting() BufferedStorageBackend {
func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32) *datastore.MockDataStore {
mockDataStore := new(datastore.MockDataStore)
partition := count*partitionSize - 1

schema := datastore.DataStoreSchema{
LedgersPerFile: count,
FilesPerPartition: partitionSize,
}

start = schema.GetSequenceNumberStartBoundary(start)
end = schema.GetSequenceNumberEndBoundary(end)
for i := start; i <= end; i = i + count {
var objectName string
var readCloser io.ReadCloser
Expand All @@ -78,10 +86,7 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32)
}
mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil).Times(1)
}
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: count,
FilesPerPartition: partitionSize,
})
mockDataStore.On("GetSchema").Return(schema)

t.Cleanup(func() {
mockDataStore.AssertExpectations(t)
Expand Down Expand Up @@ -248,31 +253,24 @@ func TestBSBGetLedger_SingleLedgerPerFile(t *testing.T) {
}

func TestCloudStorageGetLedger_MultipleLedgerPerFile(t *testing.T) {
startLedger := uint32(2)
endLedger := uint32(5)
startLedger := uint32(6)
endLedger := uint32(17)
lcmArray := createLCMForTesting(startLedger, endLedger)
bsb := createBufferedStorageBackendForTesting()
ctx := context.Background()
ledgerRange := BoundedRange(startLedger, endLedger)

mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 2)
mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 4)
bsb.dataStore = mockDataStore
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: uint32(2),
FilesPerPartition: partitionSize,
})

assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange))
assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50)
assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 4 }, time.Second*5, time.Millisecond*50)

lcm, err := bsb.GetLedger(ctx, uint32(2))
assert.NoError(t, err)
assert.Equal(t, lcmArray[0], lcm)
lcm, err = bsb.GetLedger(ctx, uint32(3))
assert.NoError(t, err)
assert.Equal(t, lcmArray[1], lcm)
lcm, err = bsb.GetLedger(ctx, uint32(4))
assert.NoError(t, err)
assert.Equal(t, lcmArray[2], lcm)
for i := 0; i <= int(endLedger-startLedger); i++ {
lcm, err := bsb.GetLedger(ctx, startLedger+uint32(i))
assert.NoError(t, err)
assert.Equal(t, lcmArray[i], lcm)
}
}

func TestBSBGetLedger_ErrorPreceedingLedger(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions ingest/ledgerbackend/ledger_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu
}

func (lb *ledgerBuffer) pushTaskQueue() {
// In bounded mode, don't queue past the end ledger
if lb.nextTaskLedger > lb.ledgerRange.to && lb.ledgerRange.bounded {
// In bounded mode, don't queue past the end boundary ledger for the specified range.
if lb.ledgerRange.bounded && lb.nextTaskLedger > lb.dataStore.GetSchema().GetSequenceNumberEndBoundary(lb.ledgerRange.to) {
return
}
lb.taskQueue <- lb.nextTaskLedger
Expand Down

0 comments on commit 1c84c6f

Please sign in to comment.