Skip to content

Commit

Permalink
chore: increase idle source e2e test timeout (numaproj#1577)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Mar 19, 2024
1 parent b243c6c commit d50c531
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/isb/stores/jetstream/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (jr *jetStreamReader) Read(ctx context.Context, count int64) ([]*isb.ReadMe
rctx, cancel := context.WithTimeout(ctx, jr.opts.readTimeOut)
defer cancel()
msgs, err := jr.sub.Fetch(int(count), nats.Context(rctx))
if err != nil && !errors.Is(err, nats.ErrTimeout) {
if err != nil && !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
isbReadErrors.With(map[string]string{"buffer": jr.GetName()}).Inc()
return nil, fmt.Errorf("failed to fetch messages from jet stream subject %q, %w", jr.subject, err)
}
Expand Down
6 changes: 2 additions & 4 deletions test/idle-source-e2e/idle_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,15 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipelineWithHttpSource() {
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("1")).WithHeader("X-Numaflow-Event-Time", eventTime)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("2")).WithHeader("X-Numaflow-Event-Time", eventTime)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("3")).WithHeader("X-Numaflow-Event-Time", eventTime))
time.Sleep(10 * time.Millisecond)
}
}
}()

// since the key can be even or odd and the window duration is 10s
// the sum should be 20(for even) and 40(for odd)
w.Expect().
SinkContains("sink", "20").
SinkContains("sink", "40")
SinkContains("sink", "20", WithTimeout(300*time.Second)).
SinkContains("sink", "40", WithTimeout(300*time.Second))
done <- struct{}{}
}

Expand Down Expand Up @@ -115,7 +114,6 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipelineWithKafkaSource() {
if i < 2000 {
SendMessage(topic, "data", generateMsg("2", startTime), 1)
}
time.Sleep(10 * time.Millisecond)
startTime = startTime.Add(1 * time.Second)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ spec:
limits:
readBatchSize: 50
watermark:
maxDelay: 10s
idleSource:
threshold: 5s # The pipeline will be considered idle if the source has not emitted any data for given threshold value.
incrementBy: 2s # If source is found to be idle then increment the watermark by given incrementBy value.
Expand Down

0 comments on commit d50c531

Please sign in to comment.