Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: verify processing rate matches across vertices when using udsource #102

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions test/udsource-e2e/testdata/simple-source-go.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ spec:
image: quay.io/numaio/numaflow-go/source-simple-source:stable
limits:
readBatchSize: 500
scale:
min: 2
max: 2
- name: out
sink:
log: {}
Expand Down
3 changes: 3 additions & 0 deletions test/udsource-e2e/testdata/simple-source-java.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ spec:
image: quay.io/numaio/numaflow-java/source-simple-source:stable
limits:
readBatchSize: 500
scale:
min: 2
max: 2
- name: out
sink:
log: {}
Expand Down
3 changes: 3 additions & 0 deletions test/udsource-e2e/testdata/simple-source-python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ spec:
imagePullPolicy: Always
limits:
readBatchSize: 500
scale:
min: 2
max: 2
- name: out
sink:
log: {}
Expand Down
91 changes: 83 additions & 8 deletions test/udsource-e2e/udsource_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build test

/*
Copyright 2022 The Numaproj Authors.

Expand All @@ -19,12 +17,18 @@ limitations under the License.
package e2e

import (
"context"
"fmt"
"math"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
daemonclient "github.com/numaproj/numaflow/pkg/daemon/client"
. "github.com/numaproj/numaflow/test/fixtures"
)

Expand All @@ -33,15 +37,15 @@ type UserDefinedSourceSuite struct {
}

func (s *UserDefinedSourceSuite) testSimpleSourceGo() {
s.testSimpleSource("go")
s.testSimpleSource("go", true)
}

func (s *UserDefinedSourceSuite) testSimpleSourceJava() {
s.testSimpleSource("java")
s.testSimpleSource("java", false)
}

func (s *UserDefinedSourceSuite) testSimpleSourcePython() {
s.testSimpleSource("python")
s.testSimpleSource("python", false)
}

func (s *UserDefinedSourceSuite) TestUDSource() {
Expand All @@ -62,14 +66,24 @@ func (s *UserDefinedSourceSuite) TestUDSource() {
wg.Wait()
}

func (s *UserDefinedSourceSuite) testSimpleSource(lang string) {
func (s *UserDefinedSourceSuite) testSimpleSource(lang string, verifyRate bool) {
w := s.Given().Pipeline(fmt.Sprintf("@testdata/simple-source-%s.yaml", lang)).
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()

// wait for all the pods to come up
w.Expect().VertexPodsRunning()
pipelineName := fmt.Sprintf("simple-source-%s", lang)

if verifyRate {
// wait for all the pods and daemon server to come up
w.Expect().VertexPodsRunning().DaemonPodsRunning().DaemonPodLogContains(pipelineName, LogDaemonStarted)
// port-forward daemon server
defer w.DaemonPodPortForward(pipelineName, 1234, dfv1.DaemonServicePort).
TerminateAllPodPortForwards()
} else {
// wait for all the pods to come up
w.Expect().VertexPodsRunning()
}

// we use the log sink instead of redis to verify the output because the simple user-defined source generates
// such a large amount of data that the redis sink cannot handle it and fails with an OOM error
Expand All @@ -84,6 +98,67 @@ func (s *UserDefinedSourceSuite) testSimpleSource(lang string) {
w.Expect().VertexPodLogContains("out", "630")
w.Expect().VertexPodLogContains("out", "999")

if verifyRate {
// verify that the processing rates of the source and the sink match
client, err := daemonclient.NewDaemonServiceClient("localhost:1234")
assert.NoError(s.T(), err)
defer func() {
_ = client.Close()
}()

// timeout the test if rates don't match within 2 minutes.
timer := time.NewTimer(120 * time.Second)
// we use 10-second windows for rate calculation
// wait for 10 seconds for a new timestamped count entry to be added to the rate calculation windows
waitInterval := 10 * time.Second
succeedChan := make(chan struct{})
go func() {
vertexNames := []string{"in", "out"}
for {
var rates []float64
for _, vertexName := range vertexNames {
m, err := client.GetVertexMetrics(context.Background(), pipelineName, vertexName)
assert.NoError(s.T(), err)
assert.Equal(s.T(), pipelineName, *m[0].Pipeline)
oneMinRate := m[0].ProcessingRates["1m"]
rates = append(rates, oneMinRate)
}
if !ratesMatch(rates) {
time.Sleep(waitInterval)
} else {
succeedChan <- struct{}{}
break
}
}
}()
select {
case <-succeedChan:
time.Sleep(waitInterval)
break
case <-timer.C:
assert.Fail(s.T(), "timed out waiting for processing rate to match across vertices.")
}
timer.Stop()
}
}

// ratesMatch reports whether all processing rates in rates agree with the
// first one.
//
// A slice with zero or one element trivially matches. Otherwise the first
// rate is used as the baseline: it must have reached minStableRate, and every
// other rate must differ from it by at most tolerance*baseline.
//
// NaN and infinite rates never match. Without the explicit check they would be
// reported as matching, because every ordered comparison involving NaN is
// false and Inf-Inf is NaN.
func ratesMatch(rates []float64) bool {
	const (
		// the simple source can reach 8k TPS, we don't compare until the
		// pipeline is stable. using 5000 as a threshold
		minStableRate = 5000
		// maximum allowed deviation from the baseline, as a fraction of it.
		tolerance = 0.1
	)

	if len(rates) <= 1 {
		return true
	}

	// reject non-finite values first so they cannot slip through the
	// comparisons below.
	for _, r := range rates {
		if math.IsNaN(r) || math.IsInf(r, 0) {
			return false
		}
	}

	baseline := rates[0]
	if baseline < minStableRate {
		return false
	}
	for _, r := range rates[1:] {
		if math.Abs(baseline-r) > baseline*tolerance {
			return false
		}
	}
	return true
}

func TestUserDefinedSourceSuite(t *testing.T) {
Expand Down
Loading