Skip to content

Commit

Permalink
test: Add e2e test case for Kafka idle source (numaproj#1412)
Browse files Browse the repository at this point in the history
Signed-off-by: chandankumar4 <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Antonino Fugazzotto <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
Co-authored-by:  Antonino Fugazzotto <[email protected]>
  • Loading branch information
3 people authored Mar 19, 2024
1 parent ea55a92 commit b243c6c
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 23 deletions.
2 changes: 1 addition & 1 deletion config/apps/kafka/kafka-minimal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ spec:
- name: KAFKA_BROKER_ID
value: "0"
- name: KAFKA_CREATE_TOPICS
value: "input-topic:1:1,middle-topic:1:1,output-topic:1:1"
value: "kafka-topic:2:1,input-topic:1:1,middle-topic:1:1,output-topic:1:1"
readinessProbe:
tcpSocket:
port: 9092
Expand Down
7 changes: 2 additions & 5 deletions pkg/shared/idlehandler/source_idlehandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,8 @@ func TestSourceIdleHandler_IsSourceIdling(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
iw := &SourceIdleHandler{
config: tt.fields.config,
lastIdleWmPublishedTime: tt.fields.lastIdleWmPublishedTime,
updatedTS: tt.fields.updatedTS,
}
iw := NewSourceIdleHandler(tt.fields.config, nil, nil)
iw.lastIdleWmPublishedTime = tt.fields.lastIdleWmPublishedTime
time.Sleep(tt.sleep)
if got := iw.IsSourceIdling(); got != tt.want {
t.Errorf("IsSourceIdling() = %v, want %v", got, tt.want)
Expand Down
30 changes: 24 additions & 6 deletions test/e2e-api/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,35 @@ func init() {
})

http.HandleFunc("/kafka/produce-topic", func(w http.ResponseWriter, r *http.Request) {
var (
partition int
err error
)
topic := r.URL.Query().Get("topic")
key := r.URL.Query().Get("key")
queryPartition := r.URL.Query().Get("partition")

config := sarama.NewConfig()
config.Producer.Return.Successes = true
if queryPartition == "" {
partition = 0
} else {
partition, err = strconv.Atoi(queryPartition)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// if partition is specified, use manual partitioner
config.Producer.Partitioner = sarama.NewManualPartitioner
}
buf, err := io.ReadAll(r.Body)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

config := sarama.NewConfig()
config.Producer.Return.Successes = true

syncProducer, err := sarama.NewSyncProducer(brokers, config)

if err != nil {
Expand All @@ -135,9 +152,10 @@ func init() {
}
defer syncProducer.Close()
message := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(buf),
Key: sarama.ByteEncoder([]byte(key)),
Topic: topic,
Value: sarama.ByteEncoder(buf),
Key: sarama.ByteEncoder([]byte(key)),
Partition: int32(partition),
}
if _, _, err := syncProducer.SendMessage(message); err != nil {
_, _ = fmt.Fprintf(w, "ERROR: %v\n", err)
Expand Down
4 changes: 2 additions & 2 deletions test/fixtures/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func GetKafkaCount(topic string, count int) int {
return count
}

func SendMessage(topic string, key string, message string) {
InvokeE2EAPIPOST("/kafka/produce-topic?topic=%s&key=%s", message, topic, key)
func SendMessage(topic string, key string, message string, partition int) {
InvokeE2EAPIPOST("/kafka/produce-topic?topic=%s&key=%s&partition=%d", message, topic, key, partition)

}

Expand Down
77 changes: 74 additions & 3 deletions test/idle-source-e2e/idle_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,23 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

/* Test the functionality of progressing watermark in case of idling.
for example: publishing the data to only one replica instead of multiples.
Once "threshold" reached to 5s(configurable) and if source is found as idle, then it will increment the watermark by
3s(configurable) after waiting for stepInterval of 2s(configurable).
*/

//go:generate kubectl -n numaflow-system delete statefulset zookeeper kafka-broker --ignore-not-found=true
//go:generate kubectl apply -k ../../config/apps/kafka -n numaflow-system
// Wait for zookeeper to come up
//go:generate sleep 60

package idle_source_e2e

import (
"context"
"encoding/json"
"log"
"strconv"
"testing"
"time"
Expand All @@ -32,14 +45,14 @@ type IdleSourceSuite struct {
E2ESuite
}

func (is *IdleSourceSuite) TestIdleKeyedReducePipeline() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
func (is *IdleSourceSuite) TestIdleKeyedReducePipelineWithHttpSource() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
w := is.Given().Pipeline("@testdata/idle-source-reduce-pipeline.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "idle-source"
pipelineName := "http-idle-source"

// wait for all the pods to come up
w.Expect().VertexPodsRunning()
Expand All @@ -60,6 +73,7 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipeline() {
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("1")).WithHeader("X-Numaflow-Event-Time", eventTime)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("2")).WithHeader("X-Numaflow-Event-Time", eventTime)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("3")).WithHeader("X-Numaflow-Event-Time", eventTime))
time.Sleep(10 * time.Millisecond)
}
}
}()
Expand All @@ -72,6 +86,63 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipeline() {
done <- struct{}{}
}

func (is *IdleSourceSuite) TestIdleKeyedReducePipelineWithKafkaSource() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

topic := "kafka-topic"

w := is.Given().Pipeline("@testdata/kafka-pipeline.yaml").When().CreatePipelineAndWait()
// wait for all the pods to come up
w.Expect().VertexPodsRunning()

defer w.DeletePipelineAndWait()
defer DeleteKafkaTopic(topic)

done := make(chan struct{})
go func() {
startTime := time.Now().Add(-10000 * time.Hour)
for i := 0; true; i++ {
select {
case <-ctx.Done():
return
case <-done:
return
default:
// send message to both partition for first 1000 messages for overcome the kafka source lazy loading wm publisher.
// after that send message to only one partition. so that idle source will be detected and wm will be progressed.
SendMessage(topic, "data", generateMsg("1", startTime), 0)
if i < 2000 {
SendMessage(topic, "data", generateMsg("2", startTime), 1)
}
time.Sleep(10 * time.Millisecond)
startTime = startTime.Add(1 * time.Second)
}
}
}()

// since the window duration is 10 second, so the count of event will be 20, when sending data to only both partition.
w.Expect().SinkContains("sink", "20", WithTimeout(300*time.Second))
// since the window duration is 10 second, so the count of event will be 10, when sending data to only one partition.
w.Expect().SinkContains("sink", "10", WithTimeout(300*time.Second))

done <- struct{}{}
}

type data struct {
Value string `json:"value"`
Time time.Time `json:"time"`
}

func generateMsg(msg string, t time.Time) string {
testMsg := data{Value: msg, Time: t}
jsonBytes, err := json.Marshal(testMsg)
if err != nil {
log.Fatalf("failed to marshal test message: %v", err)
}
return string(jsonBytes)
}

func TestReduceSuite(t *testing.T) {
suite.Run(t, new(IdleSourceSuite))
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: idle-source
name: http-idle-source
spec:
limits:
readBatchSize: 50
Expand Down
54 changes: 54 additions & 0 deletions test/idle-source-e2e/testdata/kafka-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: kafka-idle-source
spec:
limits:
readBatchSize: 50
watermark:
idleSource:
threshold: 5s # The pipeline will be considered idle if the source has not emitted any data for given threshold value.
incrementBy: 3s # If source is found to be idle then increment the watermark by given incrementBy value.
stepInterval: 2s # If source is idling then publish the watermark only when step interval has passed.
vertices:
- name: kafka-in
source:
kafka:
brokers:
- kafka-broker:9092
topic: kafka-topic
consumerGroup: test-group
transformer:
builtin:
name: eventTimeExtractor
kwargs:
expression: json(payload).time
format: 2006-01-02T15:04:05Z07:00
scale:
min: 2
- name: count-event
udf:
container:
image: quay.io/numaio/numaflow-go/reduce-counter:v0.6.1
groupBy:
window:
fixed:
length: 10s
keyed: true
storage:
persistentVolumeClaim:
volumeSize: 10Gi
accessMode: ReadWriteOnce
- name: sink
scale:
min: 1
sink:
udsink:
container:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink
image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:latest
edges:
- from: kafka-in
to: count-event
- from: count-event
to: sink
10 changes: 5 additions & 5 deletions test/reduce-one-e2e/reduce_one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ReduceSuite struct {

// one reduce vertex (keyed)
func (r *ReduceSuite) TestSimpleKeyedReducePipeline() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
w := r.Given().Pipeline("@testdata/simple-keyed-reduce-pipeline.yaml").
When().
Expand Down Expand Up @@ -74,7 +74,7 @@ func (r *ReduceSuite) TestSimpleKeyedReducePipeline() {

// one reduce vertex(non keyed)
func (r *ReduceSuite) TestSimpleNonKeyedReducePipeline() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
w := r.Given().Pipeline("@testdata/simple-non-keyed-reduce-pipeline.yaml").
When().
Expand Down Expand Up @@ -112,7 +112,7 @@ func (r *ReduceSuite) TestSimpleNonKeyedReducePipeline() {

// two reduce vertex(keyed and non keyed)
func (r *ReduceSuite) TestComplexReducePipelineKeyedNonKeyed() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
w := r.Given().Pipeline("@testdata/complex-reduce-pipeline.yaml").
When().
Expand Down Expand Up @@ -149,7 +149,7 @@ func (r *ReduceSuite) TestComplexReducePipelineKeyedNonKeyed() {
}

func (r *ReduceSuite) TestSimpleReducePipelineFailOverUsingWAL() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
w := r.Given().Pipeline("@testdata/simple-reduce-pipeline-wal.yaml").
When().
Expand Down Expand Up @@ -199,7 +199,7 @@ func (r *ReduceSuite) TestSimpleReducePipelineFailOverUsingWAL() {

// two reduce vertices (keyed and non-keyed) followed by a sliding window vertex
func (r *ReduceSuite) TestComplexSlidingWindowPipeline() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
w := r.Given().Pipeline("@testdata/complex-sliding-window-pipeline.yaml").
When().
Expand Down

0 comments on commit b243c6c

Please sign in to comment.