diff --git a/test/reduce-two-e2e/reduce_two_test.go b/test/reduce-two-e2e/reduce_two_test.go index 91351604a1..a37abcb8fe 100644 --- a/test/reduce-two-e2e/reduce_two_test.go +++ b/test/reduce-two-e2e/reduce_two_test.go @@ -120,14 +120,22 @@ func (r *ReduceSuite) TestSimpleSessionPipeline() { done <- struct{}{} } -func (r *ReduceSuite) TestSimpleSessionKeyedPipeline() { +func (r *ReduceSuite) TestSimpleSessionKeyedPipelineGo() { + r.testSimpleSessionKeyedPipeline("go") +} + +func (r *ReduceSuite) TestSimpleSessionKeyedPipelineJava() { + r.testSimpleSessionKeyedPipeline("java") +} + +func (r *ReduceSuite) testSimpleSessionKeyedPipeline(lang string) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() - w := r.Given().Pipeline("@testdata/session-reduce/simple-session-keyed-counter-pipeline.yaml"). + w := r.Given().Pipeline(fmt.Sprintf("@testdata/session-reduce/simple-session-keyed-counter-pipeline-%s.yaml", lang)). When(). CreatePipelineAndWait() defer w.DeletePipelineAndWait() - pipelineName := "simple-session-counter" + pipelineName := fmt.Sprintf("simple-session-counter-%s", lang) // wait for all the pods to come up w.Expect().VertexPodsRunning() @@ -169,17 +177,17 @@ func (r *ReduceSuite) TestSimpleSessionKeyedPipeline() { func (r *ReduceSuite) TestSimpleSessionPipelineFailOverUsingWAL() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() - w := r.Given().Pipeline("@testdata/session-reduce/simple-session-keyed-counter-pipeline.yaml"). + w := r.Given().Pipeline("@testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml"). When(). CreatePipelineAndWait() defer w.DeletePipelineAndWait() - pipelineName := "simple-session-counter" + pipelineName := "simple-session-counter-go" // wait for all the pods to come up w.Expect().VertexPodsRunning() args := "kubectl delete po -n numaflow-system -l " + - "numaflow.numaproj.io/pipeline-name=simple-session-counter,numaflow.numaproj.io/vertex-name=compute-count" + "numaflow.numaproj.io/pipeline-name=simple-session-counter-go,numaflow.numaproj.io/vertex-name=compute-count" // Kill the reducer pods before processing to trigger failover. w.Exec("/bin/sh", []string{"-c", args}, CheckPodKillSucceeded) diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml similarity index 97% rename from test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline.yaml rename to test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml index 7868194690..413977cf86 100644 --- a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline.yaml +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml @@ -1,7 +1,7 @@ apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: - name: simple-session-counter + name: simple-session-counter-go spec: watermark: maxDelay: 60s diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml new file mode 100644 index 0000000000..100031748e --- /dev/null +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml @@ -0,0 +1,47 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: simple-session-counter-java +spec: + vertices: + - name: in + scale: + min: 1 + source: + http: {} + - name: even-odd + scale: + min: 1 + udf: + container: + image: quay.io/numaio/numaflow-go/map-even-odd:v0.6.1 + - name: compute-count + partitions: 1 + udf: + container: + # see https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reducesession/counter + image: quay.io/numaio/numaflow-java/session-reduce-count:stable + groupBy: + window: + session: + timeout: 10s + keyed: true + storage: + persistentVolumeClaim: + volumeSize: 2Gi + accessMode: ReadWriteOnce + - name: sink + scale: + min: 1 + sink: + udsink: + container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink + image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:latest + edges: + - from: in + to: even-odd + - from: even-odd + to: compute-count + - from: compute-count + to: sink