Skip to content

Commit

Permalink
feat: unaligned wal (numaproj#1511)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Mar 13, 2024
1 parent 1844575 commit dc69b29
Show file tree
Hide file tree
Showing 83 changed files with 3,702 additions and 527 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ jobs:
max-parallel: 13
matrix:
driver: [jetstream]
case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, udsource-e2e, api-e2e, sideinputs-e2e, idle-source-e2e, reduce-sdk-e2e]
case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-one-e2e, reduce-two-e2e, udsource-e2e, api-e2e, sideinputs-e2e, idle-source-e2e]
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ test-kafka-e2e:
test-http-e2e:
test-nats-e2e:
test-sdks-e2e:
test-reduce-e2e:
test-reduce-one-e2e:
test-reduce-two-e2e:
test-api-e2e:
test-udsource-e2e:
test-transformer-e2e:
Expand Down
22 changes: 18 additions & 4 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,24 @@ const (
// PVC mount path for PBQ
PathPBQMount = "/var/numaflow/pbq"

// Default persistent store options
DefaultStoreSyncDuration = 2 * time.Second // Default sync duration for pbq
DefaultStoreMaxBufferSize = 100000 // Default buffer size for pbq in bytes
DefaultStorePath = PathPBQMount + "/wals" // Default store path
// Default WAL options
DefaultWALSyncDuration = 30 * time.Second // Default sync duration for pbq
DefaultWALMaxSyncSize = 5 * 1024 * 1024 // Default size to wait for an explicit sync
DefaultSegmentWALPath = PathPBQMount + "/wals" // Default segment wal path
DefaultWALSegmentRotationDuration = 60 * time.Second // Default segment rotation duration
DefaultWALSegmentSize = 30 * 1024 * 1024 // Default segment size

// Default GC-events WAL options
DefaultGCEventsWALRotationDuration = 60 * time.Second // Default rotation duration for the GC tracker
DefaultGCEventsWALEventsPath = PathPBQMount + "/events" // Default store path for operations
DefaultGCEventsWALSyncDuration = 30 * time.Second // Default sync duration for the GC tracker
DefaultGCEventsWALRotationEventsCount = 3000 // Default rotation events count for the GC tracker

// Default WAL Compactor options
DefaultWALCompactorSyncDuration = 30 * time.Second // Default sync duration for the compactor
DefaultWALCompactorMaxFileSize = 30 * 1024 * 1024 // Default max file size for the compactor
DefaultWALCompactionDuration = 60 * time.Second // Default compaction duration
DefaultCompactWALPath = PathPBQMount + "/compact-wals" // Default compaction wal path

// DefaultKeyForNonKeyedData Default key for non keyed stream
DefaultKeyForNonKeyedData = "NON_KEYED_STREAM"
Expand Down
2 changes: 1 addition & 1 deletion pkg/isb/stores/jetstream/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestJetStreamBufferRead(t *testing.T) {
defer jw.Close()
// Add some data
startTime := time.Unix(1636470000, 0)
messages := testutils.BuildTestWriteMessages(int64(20), startTime)
messages := testutils.BuildTestWriteMessages(int64(20), startTime, nil)
// Verify if buffer is full.
for jw.isFull.Load() {
select {
Expand Down
10 changes: 5 additions & 5 deletions pkg/isb/stores/jetstream/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestForwarderJetStreamBuffer(t *testing.T) {
defer jw.Close()
// Add some data
startTime := time.Unix(1636470000, 0)
messages := testutils.BuildTestWriteMessages(int64(10), startTime)
messages := testutils.BuildTestWriteMessages(int64(10), startTime, nil)
// Verify if buffer is not full.
for jw.isFull.Load() {
select {
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestJetStreamBufferWriterBufferFull(t *testing.T) {
}
// Add some data
startTime := time.Unix(1636470000, 0)
messages := testutils.BuildTestWriteMessages(int64(2), startTime)
messages := testutils.BuildTestWriteMessages(int64(2), startTime, nil)
// Add some data to buffer using write and verify no writes are performed when buffer is full
_, errs := jw.Write(ctx, messages)
assert.Equal(t, len(errs), 2)
Expand All @@ -239,7 +239,7 @@ func TestJetStreamBufferWriterBufferFull(t *testing.T) {
time.Sleep(500 * time.Millisecond)
}
}
messages = testutils.BuildTestWriteMessages(int64(2), time.Unix(1636470001, 0))
messages = testutils.BuildTestWriteMessages(int64(2), time.Unix(1636470001, 0), nil)
_, errs = jw.Write(ctx, messages)
assert.Equal(t, len(errs), 2)
for _, errMsg := range errs {
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestJetStreamBufferWriterBufferFull_DiscardLatest(t *testing.T) {
}
// Add some data
startTime := time.Unix(1636470000, 0)
messages := testutils.BuildTestWriteMessages(int64(2), startTime)
messages := testutils.BuildTestWriteMessages(int64(2), startTime, nil)
// Add some data to buffer using write and verify no writes are performed when buffer is full
_, errs := jw.Write(ctx, messages)
assert.Equal(t, len(errs), 2)
Expand All @@ -296,7 +296,7 @@ func TestJetStreamBufferWriterBufferFull_DiscardLatest(t *testing.T) {
time.Sleep(500 * time.Millisecond)
}
}
messages = testutils.BuildTestWriteMessages(int64(2), time.Unix(1636470001, 0))
messages = testutils.BuildTestWriteMessages(int64(2), time.Unix(1636470001, 0), nil)
_, errs = jw.Write(ctx, messages)
assert.Equal(t, len(errs), 2)
for _, errMsg := range errs {
Expand Down
8 changes: 4 additions & 4 deletions pkg/isb/stores/redis/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestRedisQRead_Read(t *testing.T) {

// Add some data
startTime := time.Unix(1636470000, 0)
messages := testutils.BuildTestWriteMessages(count, startTime)
messages := testutils.BuildTestWriteMessages(count, startTime, nil)
for _, msg := range messages {
err := client.Client.XAdd(ctx, &redis.XAddArgs{
Stream: rqr.GetStreamName(),
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestRedisCheckBacklog(t *testing.T) {

// Add some data
startTime := time.Unix(1636470000, 0)
messages := testutils.BuildTestWriteMessages(count, startTime)
messages := testutils.BuildTestWriteMessages(count, startTime, nil)
for _, msg := range messages {
err := client.Client.XAdd(ctx, &redis.XAddArgs{
Stream: rqr.GetStreamName(),
Expand Down Expand Up @@ -392,7 +392,7 @@ func (suite *ReadWritePerformance) TestReadWriteLatency() {
suite.False(suite.rqw.IsFull())
var writeMessages = make([]isb.Message, 0, suite.count)

writeMessages = append(writeMessages, testutils.BuildTestWriteMessages(suite.count, testStartTime)...)
writeMessages = append(writeMessages, testutils.BuildTestWriteMessages(suite.count, testStartTime, nil)...)

stopped := suite.isdf.Start()

Expand Down Expand Up @@ -443,7 +443,7 @@ func (suite *ReadWritePerformance) TestReadWriteLatencyPipelining() {
suite.False(suite.rqw.IsFull())
var writeMessages = make([]isb.Message, 0, suite.count)

writeMessages = append(writeMessages, testutils.BuildTestWriteMessages(suite.count, testStartTime)...)
writeMessages = append(writeMessages, testutils.BuildTestWriteMessages(suite.count, testStartTime, nil)...)

stopped := suite.isdf.Start()

Expand Down
7 changes: 4 additions & 3 deletions pkg/isb/stores/redis/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,10 @@ func buildTestWriteMessages(rqw *BufferWrite, count int64, startTime time.Time)
var messages = make([]isb.Message, 0, count)
var internalHashKeysMap map[string]bool
var internalHashKeys = make([]string, 0)
messages = append(messages, testutils.BuildTestWriteMessages(count, startTime)...)
messages = append(messages, testutils.BuildTestWriteMessages(count, startTime, nil)...)
for i := int64(0); i < count; i++ {
tmpTime := startTime.Add(time.Duration(i) * time.Minute)
messages[i].EventTime = tmpTime
hashKeyName := rqw.GetHashKeyName(tmpTime)
if ok := internalHashKeysMap[hashKeyName]; !ok {
internalHashKeys = append(internalHashKeys, hashKeyName)
Expand Down Expand Up @@ -486,7 +487,7 @@ func TestXTrimOnIsFull(t *testing.T) {

// Add some data
startTime := time.Unix(1636470000, 0)
messages := testutils.BuildTestWriteMessages(int64(10), startTime)
messages := testutils.BuildTestWriteMessages(int64(10), startTime, nil)
// Add 10 messages
for _, msg := range messages {
err := client.Client.XAdd(ctx, &redis.XAddArgs{
Expand Down Expand Up @@ -551,7 +552,7 @@ func TestSetWriteInfo(t *testing.T) {

// Add some data
startTime := time.Unix(1636470000, 0)
messages := testutils.BuildTestWriteMessages(int64(10), startTime)
messages := testutils.BuildTestWriteMessages(int64(10), startTime, nil)
// Add 10 messages
for _, msg := range messages {
err := client.Client.XAdd(ctx, &redis.XAddArgs{
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/simplebuffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestNewSimpleBuffer(t *testing.T) {
assert.Equal(t, sb.IsEmpty(), true)

startTime := time.Unix(1636470000, 0)
writeMessages := testutils.BuildTestWriteMessages(count, startTime)
writeMessages := testutils.BuildTestWriteMessages(count, startTime, nil)
sb.Write(ctx, writeMessages[0:5])
assert.Equal(t, int64(5), sb.writeIdx)
assert.Equal(t, int64(0), sb.readIdx)
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestNewSimpleBuffer_BufferFullWritingStrategyIsDiscard(t *testing.T) {
assert.Equal(t, sb.IsEmpty(), true)

startTime := time.Unix(1636470000, 0)
writeMessages := testutils.BuildTestWriteMessages(count, startTime)
writeMessages := testutils.BuildTestWriteMessages(count, startTime, nil)

// try to write 3 messages; it should fail (we only have space for 2).
// the first 2 messages should be written; the last one should be discarded, returning a NoRetryableError.
Expand Down
22 changes: 13 additions & 9 deletions pkg/isb/testutils/rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@ type PayloadForTest struct {
}

// BuildTestWriteMessages builds test isb.Message which can be used for testing.
func BuildTestWriteMessages(count int64, startTime time.Time) []isb.Message {
func BuildTestWriteMessages(count int64, startTime time.Time, keys []string) []isb.Message {
if keys == nil {
keys = []string{}
}
var messages = make([]isb.Message, 0, count)
for i := int64(0); i < count; i++ {
tmpTime := startTime.Add(time.Duration(i) * time.Minute)
tmpTime := startTime.Add(time.Duration(i) * time.Second)
result, _ := json.Marshal(PayloadForTest{
Key: fmt.Sprintf("paydload_%d", i),
Value: i,
Expand All @@ -53,7 +56,7 @@ func BuildTestWriteMessages(count int64, startTime time.Time) []isb.Message {
EventTime: tmpTime,
},
ID: fmt.Sprintf("%d-testVertex-0-0", i), // TODO: hard coded ID suffix ATM, make configurable if needed
Keys: []string{},
Keys: keys,
},
Body: isb.Body{Payload: result},
},
Expand All @@ -65,7 +68,7 @@ func BuildTestWriteMessages(count int64, startTime time.Time) []isb.Message {

// BuildTestWindowRequests builds test window.TimedWindowRequest which can be used for testing.
func BuildTestWindowRequests(count int64, startTime time.Time, windowOp window.Operation) []window.TimedWindowRequest {
var readMessages = BuildTestReadMessages(count, startTime)
var readMessages = BuildTestReadMessages(count, startTime, nil)
var windowRequests = make([]window.TimedWindowRequest, count)

for idx, readMessage := range readMessages {
Expand All @@ -78,23 +81,23 @@ func BuildTestWindowRequests(count int64, startTime time.Time, windowOp window.O
}

// BuildTestReadMessages builds test isb.ReadMessage which can be used for testing.
func BuildTestReadMessages(count int64, startTime time.Time) []isb.ReadMessage {
writeMessages := BuildTestWriteMessages(count, startTime)
func BuildTestReadMessages(count int64, startTime time.Time, keys []string) []isb.ReadMessage {
writeMessages := BuildTestWriteMessages(count, startTime, keys)
var readMessages = make([]isb.ReadMessage, count)

for idx, writeMessage := range writeMessages {
readMessages[idx] = isb.ReadMessage{
Message: writeMessage,
ReadOffset: isb.NewSimpleStringPartitionOffset(fmt.Sprintf("read_%s", writeMessage.Header.ID), 0),
ReadOffset: isb.NewSimpleStringPartitionOffset(fmt.Sprintf("%d", idx), 0),
}
}

return readMessages
}

// BuildTestReadMessagesIntOffset builds test isb.ReadMessage which can be used for testing.
func BuildTestReadMessagesIntOffset(count int64, startTime time.Time) []isb.ReadMessage {
writeMessages := BuildTestWriteMessages(count, startTime)
func BuildTestReadMessagesIntOffset(count int64, startTime time.Time, keys []string) []isb.ReadMessage {
writeMessages := BuildTestWriteMessages(count, startTime, keys)
var readMessages = make([]isb.ReadMessage, count)

for idx, writeMessage := range writeMessages {
Expand All @@ -103,6 +106,7 @@ func BuildTestReadMessagesIntOffset(count int64, startTime time.Time) []isb.Read
readMessages[idx] = isb.ReadMessage{
Message: writeMessage,
ReadOffset: isb.NewSimpleIntPartitionOffset(int64(offset), 0),
Watermark: writeMessage.EventTime,
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/isbsvc/redis_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"testing"
"time"

"github.com/numaproj/numaflow/pkg/isb/stores/redis"
goredis "github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"

"github.com/numaproj/numaflow/pkg/isb/stores/redis"

"github.com/numaproj/numaflow/pkg/isb/testutils"
redisclient "github.com/numaproj/numaflow/pkg/shared/clients/redis"
)
Expand All @@ -50,7 +51,7 @@ func TestIsbsRedisSvc_Buffers(t *testing.T) {
// Verify
// Add some data
startTime := time.Unix(1636470000, 0)
messages := testutils.BuildTestWriteMessages(int64(10), startTime)
messages := testutils.BuildTestWriteMessages(int64(10), startTime, nil)
// Add 10 messages
for _, msg := range messages {
err := redisClient.Client.XAdd(ctx, &goredis.XAddArgs{
Expand Down
Loading

0 comments on commit dc69b29

Please sign in to comment.