diff --git a/test/udsource-e2e/testdata/simple-source-go.yaml b/test/udsource-e2e/testdata/simple-source-go.yaml index b0f184916f..9d1f309552 100644 --- a/test/udsource-e2e/testdata/simple-source-go.yaml +++ b/test/udsource-e2e/testdata/simple-source-go.yaml @@ -13,6 +13,9 @@ spec: image: quay.io/numaio/numaflow-go/source-simple-source:stable limits: readBatchSize: 500 + scale: + min: 2 + max: 2 - name: out sink: log: {} diff --git a/test/udsource-e2e/testdata/simple-source-java.yaml b/test/udsource-e2e/testdata/simple-source-java.yaml index 9d30d9413c..14e517a74f 100644 --- a/test/udsource-e2e/testdata/simple-source-java.yaml +++ b/test/udsource-e2e/testdata/simple-source-java.yaml @@ -13,6 +13,9 @@ spec: image: quay.io/numaio/numaflow-java/source-simple-source:stable limits: readBatchSize: 500 + scale: + min: 2 + max: 2 - name: out sink: log: {} diff --git a/test/udsource-e2e/testdata/simple-source-python.yaml b/test/udsource-e2e/testdata/simple-source-python.yaml index fc3d9f5c60..3f869ea098 100644 --- a/test/udsource-e2e/testdata/simple-source-python.yaml +++ b/test/udsource-e2e/testdata/simple-source-python.yaml @@ -14,6 +14,9 @@ spec: imagePullPolicy: Always limits: readBatchSize: 500 + scale: + min: 2 + max: 2 - name: out sink: log: {} diff --git a/test/udsource-e2e/udsource_test.go b/test/udsource-e2e/udsource_test.go index 57b9f30e93..f9e5dd48c5 100644 --- a/test/udsource-e2e/udsource_test.go +++ b/test/udsource-e2e/udsource_test.go @@ -1,5 +1,3 @@ -//go:build test - /* Copyright 2022 The Numaproj Authors. @@ -19,12 +17,18 @@ limitations under the License. package e2e import ( + "context" "fmt" + "math" "sync" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + daemonclient "github.com/numaproj/numaflow/pkg/daemon/client" . "github.com/numaproj/numaflow/test/fixtures" ) @@ -33,15 +37,15 @@ type UserDefinedSourceSuite struct { } func (s *UserDefinedSourceSuite) testSimpleSourceGo() { - s.testSimpleSource("go") + s.testSimpleSource("go", true) } func (s *UserDefinedSourceSuite) testSimpleSourceJava() { - s.testSimpleSource("java") + s.testSimpleSource("java", false) } func (s *UserDefinedSourceSuite) testSimpleSourcePython() { - s.testSimpleSource("python") + s.testSimpleSource("python", false) } func (s *UserDefinedSourceSuite) TestUDSource() { @@ -62,14 +66,24 @@ func (s *UserDefinedSourceSuite) TestUDSource() { wg.Wait() } -func (s *UserDefinedSourceSuite) testSimpleSource(lang string) { +func (s *UserDefinedSourceSuite) testSimpleSource(lang string, verifyRate bool) { w := s.Given().Pipeline(fmt.Sprintf("@testdata/simple-source-%s.yaml", lang)). When(). CreatePipelineAndWait() defer w.DeletePipelineAndWait() - // wait for all the pods to come up - w.Expect().VertexPodsRunning() + pipelineName := fmt.Sprintf("simple-source-%s", lang) + + if verifyRate { + // wait for all the pods and daemon server to come up + w.Expect().VertexPodsRunning().DaemonPodsRunning().DaemonPodLogContains(pipelineName, LogDaemonStarted) + // port-forward daemon server + defer w.DaemonPodPortForward(pipelineName, 1234, dfv1.DaemonServicePort). + TerminateAllPodPortForwards() + } else { + // wait for all the pods to come up + w.Expect().VertexPodsRunning() + } // we use the log sink instead of redis to verify the output because the simple user-defined source generates // such a large amount of data that the redis sink is not able to handle it, it breaks with OOM error @@ -84,6 +98,67 @@ func (s *UserDefinedSourceSuite) testSimpleSource(lang string) { w.Expect().VertexPodLogContains("out", "630") w.Expect().VertexPodLogContains("out", "999") + if verifyRate { + // verify the processing rate match between source and sink + client, err := daemonclient.NewDaemonServiceClient("localhost:1234") + assert.NoError(s.T(), err) + defer func() { + _ = client.Close() + }() + + // timeout the test if rates don't match within 2 minutes. + timer := time.NewTimer(120 * time.Second) + // we use 10-second windows for rate calculation + // wait for 10 seconds for a new timestamped count entry to be added to the rate calculation windows + waitInterval := 10 * time.Second + succeedChan := make(chan struct{}) + go func() { + vertexNames := []string{"in", "out"} + for { + var rates []float64 + for _, vertexName := range vertexNames { + m, err := client.GetVertexMetrics(context.Background(), pipelineName, vertexName) + assert.NoError(s.T(), err) + assert.Equal(s.T(), pipelineName, *m[0].Pipeline) + oneMinRate := m[0].ProcessingRates["1m"] + rates = append(rates, oneMinRate) + } + if !ratesMatch(rates) { + time.Sleep(waitInterval) + } else { + succeedChan <- struct{}{} + break + } + } + }() + select { + case <-succeedChan: + time.Sleep(waitInterval) + break + case <-timer.C: + assert.Fail(s.T(), "timed out waiting for processing rate to match across vertices.") + } + timer.Stop() + } +} + +func ratesMatch(rates []float64) bool { + if len(rates) <= 1 { + return true + } + firstVal := rates[0] + // the simple source can reach 8k TPS, we don't compare until the pipeline is stable. + // using 5000 as a threshold + if firstVal < 5000 { + return false + } + for i := 1; i < len(rates); i++ { + diff := math.Abs(firstVal - rates[i]) + if diff > (firstVal * 0.1) { + return false + } + } + return true } func TestUserDefinedSourceSuite(t *testing.T) {