Skip to content

Commit

Permalink
Merge branch 'KeranYang-session-e2e'
Browse files Browse the repository at this point in the history
  • Loading branch information
KeranYang committed Mar 18, 2024
2 parents b68525b + 4f3fb31 commit d7dee97
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 7 deletions.
20 changes: 14 additions & 6 deletions test/reduce-two-e2e/reduce_two_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,22 @@ func (r *ReduceSuite) TestSimpleSessionPipeline() {
done <- struct{}{}
}

func (r *ReduceSuite) TestSimpleSessionKeyedPipeline() {
func (r *ReduceSuite) TestSimpleSessionKeyedPipelineGo() {
r.testSimpleSessionKeyedPipeline("go")
}

func (r *ReduceSuite) TestSimpleSessionKeyedPipelineJava() {
r.testSimpleSessionKeyedPipeline("java")
}

func (r *ReduceSuite) testSimpleSessionKeyedPipeline(lang string) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
w := r.Given().Pipeline("@testdata/session-reduce/simple-session-keyed-counter-pipeline.yaml").
w := r.Given().Pipeline(fmt.Sprintf("@testdata/session-reduce/simple-session-keyed-counter-pipeline-%s.yaml", lang)).
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "simple-session-counter"
pipelineName := fmt.Sprintf("simple-session-counter-%s", lang)

// wait for all the pods to come up
w.Expect().VertexPodsRunning()
Expand Down Expand Up @@ -169,17 +177,17 @@ func (r *ReduceSuite) TestSimpleSessionKeyedPipeline() {
func (r *ReduceSuite) TestSimpleSessionPipelineFailOverUsingWAL() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
w := r.Given().Pipeline("@testdata/session-reduce/simple-session-keyed-counter-pipeline.yaml").
w := r.Given().Pipeline("@testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "simple-session-counter"
pipelineName := "simple-session-counter-go"

// wait for all the pods to come up
w.Expect().VertexPodsRunning()

args := "kubectl delete po -n numaflow-system -l " +
"numaflow.numaproj.io/pipeline-name=simple-session-counter,numaflow.numaproj.io/vertex-name=compute-count"
"numaflow.numaproj.io/pipeline-name=simple-session-counter-go,numaflow.numaproj.io/vertex-name=compute-count"

// Kill the reducer pods before processing to trigger failover.
w.Exec("/bin/sh", []string{"-c", args}, CheckPodKillSucceeded)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-session-counter
name: simple-session-counter-go
spec:
watermark:
maxDelay: 60s
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-session-counter-java
spec:
vertices:
- name: in
scale:
min: 1
source:
http: {}
- name: even-odd
scale:
min: 1
udf:
container:
image: quay.io/numaio/numaflow-go/map-even-odd:v0.6.1
- name: compute-count
partitions: 1
udf:
container:
# see https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reducesession/counter
image: quay.io/numaio/numaflow-java/session-reduce-count:stable
groupBy:
window:
session:
timeout: 10s
keyed: true
storage:
persistentVolumeClaim:
volumeSize: 2Gi
accessMode: ReadWriteOnce
- name: sink
scale:
min: 1
sink:
udsink:
container:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink
image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:latest
edges:
- from: in
to: even-odd
- from: even-odd
to: compute-count
- from: compute-count
to: sink

0 comments on commit d7dee97

Please sign in to comment.