From 54fe2ffdda30cf1abe60c5d631d98bea2cb06ba4 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 13 Mar 2024 20:48:21 -0400 Subject: [PATCH 1/8] test: add simple session reduce e2e for java sdk Signed-off-by: Keran Yang --- test/reduce-two-e2e/reduce_two_test.go | 14 ++++-- ...le-session-keyed-counter-pipeline-go.yaml} | 2 +- ...e-session-keyed-counter-pipeline-java.yaml | 47 +++++++++++++++++++ 3 files changed, 59 insertions(+), 4 deletions(-) rename test/reduce-two-e2e/testdata/session-reduce/{simple-session-keyed-counter-pipeline.yaml => simple-session-keyed-counter-pipeline-go.yaml} (97%) create mode 100644 test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml diff --git a/test/reduce-two-e2e/reduce_two_test.go b/test/reduce-two-e2e/reduce_two_test.go index 91351604a1..2d7e9ae2cf 100644 --- a/test/reduce-two-e2e/reduce_two_test.go +++ b/test/reduce-two-e2e/reduce_two_test.go @@ -120,14 +120,22 @@ func (r *ReduceSuite) TestSimpleSessionPipeline() { done <- struct{}{} } -func (r *ReduceSuite) TestSimpleSessionKeyedPipeline() { +func (r *ReduceSuite) TestSimpleSessionKeyedPipelineGo() { + r.testSimpleSessionKeyedPipeline("go") +} + +func (r *ReduceSuite) TestSimpleSessionKeyedPipelineJava() { + r.testSimpleSessionKeyedPipeline("java") +} + +func (r *ReduceSuite) testSimpleSessionKeyedPipeline(lang string) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() - w := r.Given().Pipeline("@testdata/session-reduce/simple-session-keyed-counter-pipeline.yaml"). + w := r.Given().Pipeline(fmt.Sprintf("@testdata/session-reduce/simple-session-keyed-counter-pipeline-%s.yaml", lang)). When(). CreatePipelineAndWait() defer w.DeletePipelineAndWait() - pipelineName := "simple-session-counter" + fmt.Sprintf("simple-session-counter-%s", lang) // wait for all the pods to come up w.Expect().VertexPodsRunning() diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml similarity index 97% rename from test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline.yaml rename to test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml index 6966bce860..b4f23b44d8 100644 --- a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline.yaml +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml @@ -1,7 +1,7 @@ apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: - name: simple-session-counter + name: simple-session-counter-go spec: vertices: - name: in diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml new file mode 100644 index 0000000000..100031748e --- /dev/null +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml @@ -0,0 +1,47 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: simple-session-counter-java +spec: + vertices: + - name: in + scale: + min: 1 + source: + http: {} + - name: even-odd + scale: + min: 1 + udf: + container: + image: quay.io/numaio/numaflow-go/map-even-odd:v0.6.1 + - name: compute-count + partitions: 1 + udf: + container: + # see https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reducesession/counter + image: quay.io/numaio/numaflow-java/session-reduce-count:stable + groupBy: + window: + session: + timeout: 10s + keyed: true + storage: + persistentVolumeClaim: + volumeSize: 2Gi + accessMode: ReadWriteOnce + - name: sink + scale: + min: 1 + sink: + udsink: + container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink + image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:latest + edges: + - from: in + to: even-odd + - from: even-odd + to: compute-count + - from: compute-count + to: sink From 69a158f134d1d4fdbc68df88d702e5471e04d19b Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 13 Mar 2024 20:50:44 -0400 Subject: [PATCH 2/8] . Signed-off-by: Keran Yang --- test/reduce-two-e2e/reduce_two_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/reduce-two-e2e/reduce_two_test.go b/test/reduce-two-e2e/reduce_two_test.go index 2d7e9ae2cf..346f15c6ef 100644 --- a/test/reduce-two-e2e/reduce_two_test.go +++ b/test/reduce-two-e2e/reduce_two_test.go @@ -135,7 +135,7 @@ func (r *ReduceSuite) testSimpleSessionKeyedPipeline(lang string) { When(). CreatePipelineAndWait() defer w.DeletePipelineAndWait() - fmt.Sprintf("simple-session-counter-%s", lang) + pipelineName := fmt.Sprintf("simple-session-counter-%s", lang) // wait for all the pods to come up w.Expect().VertexPodsRunning() From e5cca0344e8c03d396325e0d1e72df5fe0b4aadb Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 13 Mar 2024 20:55:26 -0400 Subject: [PATCH 3/8] Revert "chore(deps): bump google.golang.org/protobuf from 1.31.0 to 1.33.0 (#1556)" This reverts commit 4b580b148c29ca12776eae134f31ec46563e8bcc. --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index f5617b6b86..2b6d8f175d 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( golang.org/x/sync v0.6.0 google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e google.golang.org/grpc v1.58.3 - google.golang.org/protobuf v1.33.0 + google.golang.org/protobuf v1.31.0 k8s.io/api v0.29.0 k8s.io/apimachinery v0.29.0 k8s.io/client-go v0.29.0 diff --git a/go.sum b/go.sum index c04f65d8b2..ea3a899407 100644 --- a/go.sum +++ b/go.sum @@ -1372,8 +1372,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 7f27a4d526054997d898feb5cfccadae1bab9ad0 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 13 Mar 2024 21:29:41 -0400 Subject: [PATCH 4/8] . Signed-off-by: Keran Yang --- test/reduce-two-e2e/reduce_two_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/reduce-two-e2e/reduce_two_test.go b/test/reduce-two-e2e/reduce_two_test.go index 346f15c6ef..1452860ad0 100644 --- a/test/reduce-two-e2e/reduce_two_test.go +++ b/test/reduce-two-e2e/reduce_two_test.go @@ -177,11 +177,11 @@ func (r *ReduceSuite) testSimpleSessionKeyedPipeline(lang string) { func (r *ReduceSuite) TestSimpleSessionPipelineFailOverUsingWAL() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() - w := r.Given().Pipeline("@testdata/session-reduce/simple-session-keyed-counter-pipeline.yaml"). + w := r.Given().Pipeline("@testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml"). When(). CreatePipelineAndWait() defer w.DeletePipelineAndWait() - pipelineName := "simple-session-counter" + pipelineName := "simple-session-counter-go" // wait for all the pods to come up w.Expect().VertexPodsRunning() From c775aba7c5713f9e30752c5ab4ee622b2389da7e Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 13 Mar 2024 22:10:22 -0400 Subject: [PATCH 5/8] . Signed-off-by: Keran Yang --- test/reduce-two-e2e/reduce_two_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/reduce-two-e2e/reduce_two_test.go b/test/reduce-two-e2e/reduce_two_test.go index 1452860ad0..a37abcb8fe 100644 --- a/test/reduce-two-e2e/reduce_two_test.go +++ b/test/reduce-two-e2e/reduce_two_test.go @@ -187,7 +187,7 @@ func (r *ReduceSuite) TestSimpleSessionPipelineFailOverUsingWAL() { w.Expect().VertexPodsRunning() args := "kubectl delete po -n numaflow-system -l " + - "numaflow.numaproj.io/pipeline-name=simple-session-counter,numaflow.numaproj.io/vertex-name=compute-count" + "numaflow.numaproj.io/pipeline-name=simple-session-counter-go,numaflow.numaproj.io/vertex-name=compute-count" // Kill the reducer pods before processing to trigger failover. w.Exec("/bin/sh", []string{"-c", args}, CheckPodKillSucceeded) From 2183fdebc4b7ce7b063bceeeb16abf5faa322674 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 13 Mar 2024 22:22:14 -0400 Subject: [PATCH 6/8] Revert "Revert "chore(deps): bump google.golang.org/protobuf from 1.31.0 to 1.33.0 (#1556)"" This reverts commit e5cca0344e8c03d396325e0d1e72df5fe0b4aadb. --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 2b6d8f175d..f5617b6b86 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( golang.org/x/sync v0.6.0 google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e google.golang.org/grpc v1.58.3 - google.golang.org/protobuf v1.31.0 + google.golang.org/protobuf v1.33.0 k8s.io/api v0.29.0 k8s.io/apimachinery v0.29.0 k8s.io/client-go v0.29.0 diff --git a/go.sum b/go.sum index ea3a899407..c04f65d8b2 100644 --- a/go.sum +++ b/go.sum @@ -1372,8 +1372,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 7ef0d10bb240c7e26707b5a9ff17e63feb3bea1a Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Thu, 14 Mar 2024 23:20:17 -0400 Subject: [PATCH 7/8] test adding batch size limit Signed-off-by: Keran Yang --- .../simple-session-keyed-counter-pipeline-go.yaml | 2 ++ .../simple-session-keyed-counter-pipeline-java.yaml | 2 ++ .../testdata/session-reduce/simple-session-sum-pipeline.yaml | 2 ++ 3 files changed, 6 insertions(+) diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml index b4f23b44d8..944c10dba9 100644 --- a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml @@ -3,6 +3,8 @@ kind: Pipeline metadata: name: simple-session-counter-go spec: + limits: + readBatchSize: 50 vertices: - name: in scale: diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml index 100031748e..4e160d9ee9 100644 --- a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml @@ -3,6 +3,8 @@ kind: Pipeline metadata: name: simple-session-counter-java spec: + limits: + readBatchSize: 50 vertices: - name: in scale: diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-sum-pipeline.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-sum-pipeline.yaml index 2b493e3e5b..3e19d8da61 100644 --- a/test/reduce-two-e2e/testdata/session-reduce/simple-session-sum-pipeline.yaml +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-sum-pipeline.yaml @@ -3,6 +3,8 @@ kind: Pipeline metadata: name: simple-session-sum spec: + limits: + readBatchSize: 50 vertices: - name: in scale: From 4f3fb3154ba7da2c3cadcc29975f5f49c7292ffc Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Mon, 18 Mar 2024 10:45:28 -0400 Subject: [PATCH 8/8] Revert "test adding batch size limit" This reverts commit 7ef0d10bb240c7e26707b5a9ff17e63feb3bea1a. --- .../simple-session-keyed-counter-pipeline-go.yaml | 2 -- .../simple-session-keyed-counter-pipeline-java.yaml | 2 -- .../testdata/session-reduce/simple-session-sum-pipeline.yaml | 2 -- 3 files changed, 6 deletions(-) diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml index 944c10dba9..b4f23b44d8 100644 --- a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml @@ -3,8 +3,6 @@ kind: Pipeline metadata: name: simple-session-counter-go spec: - limits: - readBatchSize: 50 vertices: - name: in scale: diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml index 4e160d9ee9..100031748e 100644 --- a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml @@ -3,8 +3,6 @@ kind: Pipeline metadata: name: simple-session-counter-java spec: - limits: - readBatchSize: 50 vertices: - name: in scale: diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-sum-pipeline.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-sum-pipeline.yaml index 3e19d8da61..2b493e3e5b 100644 --- a/test/reduce-two-e2e/testdata/session-reduce/simple-session-sum-pipeline.yaml +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-sum-pipeline.yaml @@ -3,8 +3,6 @@ kind: Pipeline metadata: name: simple-session-sum spec: - limits: - readBatchSize: 50 vertices: - name: in scale: