Skip to content

Commit

Permalink
chore: Batch ack requests and sink responses for better performance n…
Browse files Browse the repository at this point in the history
…umaproj#163 (numaproj#2194)

Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Oct 31, 2024
1 parent 00a74df commit f6dbe86
Show file tree
Hide file tree
Showing 27 changed files with 152 additions and 188 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,4 @@ RUN chmod +x /bin/e2eapi
####################################################################################################
FROM scratch AS e2eapi
COPY --from=testbase /bin/e2eapi .
ENTRYPOINT ["/e2eapi"]
ENTRYPOINT ["/e2eapi"]
6 changes: 2 additions & 4 deletions examples/21-simple-mono-vertex.yaml
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: MonoVertex
metadata:
name: simple-mono-vertex
spec:
source:
udsource:
container:
image: quay.io/numaio/numaflow-java/source-simple-source:stable
image: quay.io/numaio/numaflow-rs/simple-source:stable
# transformer is an optional container to do any transformation to the incoming data before passing to the sink
transformer:
container:
image: quay.io/numaio/numaflow-rs/source-transformer-now:stable
sink:
udsink:
container:
image: quay.io/numaio/numaflow-java/simple-sink:stable
image: quay.io/numaio/numaflow-rs/sink-log:stable
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.20
github.com/nats-io/nats.go v1.37.0
github.com/numaproj/numaflow-go v0.8.2-0.20241014112709-e12c1b5176bd
github.com/numaproj/numaflow-go v0.8.2-0.20241030023053-f6819383aa7b
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.55.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,8 @@ github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDm
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.8.2-0.20241014112709-e12c1b5176bd h1:yL7sbAaeCw2rWar1CF19N69KEHmcJpL1YjtqOWEG41c=
github.com/numaproj/numaflow-go v0.8.2-0.20241014112709-e12c1b5176bd/go.mod h1:FaCMeV0V9SiLcVf2fwT+GeTJHNaK2gdQsTAIqQ4x7oc=
github.com/numaproj/numaflow-go v0.8.2-0.20241030023053-f6819383aa7b h1:UEhFHfBwe2DwtnYzdFteTZ2tKwMX739llzfebfEMGg4=
github.com/numaproj/numaflow-go v0.8.2-0.20241030023053-f6819383aa7b/go.mod h1:FaCMeV0V9SiLcVf2fwT+GeTJHNaK2gdQsTAIqQ4x7oc=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/proto/sink/v1/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ message SinkResponse {
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
Result result = 1;
repeated Result results = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
2 changes: 1 addition & 1 deletion pkg/apis/proto/source/v1/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ message ReadResponse {
message AckRequest {
message Request {
// Required field holding the offset to be acked
Offset offset = 1;
repeated Offset offsets = 1;
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
Expand Down
10 changes: 7 additions & 3 deletions pkg/sdkclient/sinker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,21 @@ func (c *client) SinkFn(ctx context.Context, requests []*sinkpb.SinkRequest) ([]

// Wait for the corresponding responses
var responses []*sinkpb.SinkResponse
for i := 0; i < len(requests)+1; i++ {
responsesCount := 0
for {
resp, err := c.sinkStream.Recv()
if err != nil {
return nil, fmt.Errorf("failed to receive sink response: %v", err)
}
if resp.GetStatus() != nil && resp.GetStatus().GetEot() {
if i != len(requests) {
c.log.Errorw("Received EOT message before all responses are received, we will wait indefinitely for the remaining responses", zap.Int("received", i), zap.Int("expected", len(requests)))
if responsesCount != len(requests) {
c.log.Errorw("Received EOT message before all responses are received, we will wait indefinitely for the remaining responses", zap.Int("received", responsesCount), zap.Int("expected", len(requests)))
} else {
break
}
continue
}
responsesCount += len(resp.GetResults())
responses = append(responses, resp)
}

Expand Down
16 changes: 10 additions & 6 deletions pkg/sdkclient/sinker/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ func TestClient_SinkFn(t *testing.T) {
mockSinkClient := sinkmock.NewMockSink_SinkFnClient(ctrl)
mockSinkClient.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()
mockSinkClient.EXPECT().Recv().Return(&sinkpb.SinkResponse{
Result: &sinkpb.SinkResponse_Result{
Id: "temp-id",
Status: sinkpb.Status_SUCCESS,
Results: []*sinkpb.SinkResponse_Result{
{
Id: "temp-id",
Status: sinkpb.Status_SUCCESS,
},
},
}, nil)
mockSinkClient.EXPECT().Recv().Return(&sinkpb.SinkResponse{
Expand All @@ -94,9 +96,11 @@ func TestClient_SinkFn(t *testing.T) {
})
assert.Equal(t, []*sinkpb.SinkResponse{
{
Result: &sinkpb.SinkResponse_Result{
Id: "temp-id",
Status: sinkpb.Status_SUCCESS,
Results: []*sinkpb.SinkResponse_Result{
{
Id: "temp-id",
Status: sinkpb.Status_SUCCESS,
},
},
},
}, response)
Expand Down
26 changes: 9 additions & 17 deletions pkg/sdkclient/source/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,27 +190,19 @@ func (c *client) ReadFn(_ context.Context, req *sourcepb.ReadRequest, datumCh ch
}

// AckFn acknowledges the data from the source.
func (c *client) AckFn(_ context.Context, reqs []*sourcepb.AckRequest) ([]*sourcepb.AckResponse, error) {
func (c *client) AckFn(_ context.Context, req *sourcepb.AckRequest) (*sourcepb.AckResponse, error) {
// Send the ack request
for _, req := range reqs {
err := c.ackStream.Send(req)
if err != nil {
return nil, fmt.Errorf("failed to send ack request: %v", err)
}
err := c.ackStream.Send(req)
if err != nil {
return nil, fmt.Errorf("failed to send ack request: %v", err)
}

responses := make([]*sourcepb.AckResponse, len(reqs))
for i := 0; i < len(reqs); i++ {
// Wait for the ack response
resp, err := c.ackStream.Recv()
// we don't need an EOF check because we only close the stream during shutdown.
if err != nil {
return nil, fmt.Errorf("failed to receive ack response: %v", err)
}
responses[i] = resp
// Wait for the ack response
resp, err := c.ackStream.Recv()
if err != nil {
return nil, fmt.Errorf("failed to receive ack response: %v", err)
}

return responses, nil
return resp, nil
}

// PendingFn returns the number of pending data from the source.
Expand Down
4 changes: 2 additions & 2 deletions pkg/sdkclient/source/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ func TestAckFn(t *testing.T) {
assert.True(t, ackHandshakeResponse.GetHandshake().GetSot())

// Test AckFn
ack, err := testClient.AckFn(ctx, []*sourcepb.AckRequest{{}})
ack, err := testClient.AckFn(ctx, &sourcepb.AckRequest{})
assert.NoError(t, err)
assert.Equal(t, []*sourcepb.AckResponse{{}}, ack)
assert.Equal(t, &sourcepb.AckResponse{}, ack)
}

func TestPendingFn(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sdkclient/source/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Client interface {
// ReadFn reads messages from the udsource.
ReadFn(ctx context.Context, req *sourcepb.ReadRequest, datumCh chan<- *sourcepb.ReadResponse) error
// AckFn acknowledges messages from the udsource.
AckFn(ctx context.Context, req []*sourcepb.AckRequest) ([]*sourcepb.AckResponse, error)
AckFn(ctx context.Context, req *sourcepb.AckRequest) (*sourcepb.AckResponse, error)
// PendingFn returns the number of pending messages from the udsource.
PendingFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PendingResponse, error)
// PartitionsFn returns the list of partitions from the udsource.
Expand Down
14 changes: 8 additions & 6 deletions pkg/sinks/udsink/udsink_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,24 +106,26 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S
return errs
}
// Use ID to map the response messages, so that there's no strict requirement for the user-defined sink to return the response in order.
resMap := make(map[string]*sinkpb.SinkResponse)
resMap := make(map[string]*sinkpb.SinkResponse_Result)
for _, res := range responses {
resMap[res.Result.GetId()] = res
for _, result := range res.Results {
resMap[result.GetId()] = result
}
}
for i, m := range requests {
if r, existing := resMap[m.Request.GetId()]; !existing {
errs[i] = &NotFoundErr
} else {
if r.Result.GetStatus() == sinkpb.Status_FAILURE {
if r.Result.GetErrMsg() != "" {
if r.GetStatus() == sinkpb.Status_FAILURE {
if r.GetErrMsg() != "" {
errs[i] = &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: r.Result.GetErrMsg(),
Message: r.GetErrMsg(),
}
} else {
errs[i] = &UnknownUDSinkErr
}
} else if r.Result.GetStatus() == sinkpb.Status_FALLBACK {
} else if r.GetStatus() == sinkpb.Status_FALLBACK {
errs[i] = &WriteToFallbackErr
} else {
errs[i] = nil
Expand Down
5 changes: 1 addition & 4 deletions pkg/sinks/udsink/udsink_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ func Test_gRPCBasedUDSink_ApplyWithMockClient(t *testing.T) {
mockSinkClient := sinkmock.NewMockSink_SinkFnClient(ctrl)
mockSinkClient.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()
mockSinkClient.EXPECT().Recv().Return(&sinkpb.SinkResponse{
Result: testResponseList[0],
}, nil)
mockSinkClient.EXPECT().Recv().Return(&sinkpb.SinkResponse{
Result: testResponseList[1],
Results: testResponseList,
}, nil)
mockSinkClient.EXPECT().Recv().Return(&sinkpb.SinkResponse{Status: &sinkpb.TransmissionStatus{
Eot: true,
Expand Down
14 changes: 5 additions & 9 deletions pkg/sources/udsource/grpc_udsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,12 @@ func (u *GRPCBasedUDSource) ApplyAckFn(ctx context.Context, offsets []isb.Offset
for i, offset := range offsets {
rOffsets[i] = ConvertToUserDefinedSourceOffset(offset)
}
ackRequests := make([]*sourcepb.AckRequest, len(rOffsets))
for i, offset := range rOffsets {
var r = &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Offset: offset,
},
}
ackRequests[i] = r
var ackRequest = &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Offsets: rOffsets,
},
}
_, err := u.client.AckFn(ctx, ackRequests)
_, err := u.client.AckFn(ctx, ackRequest)
return err
}

Expand Down
17 changes: 5 additions & 12 deletions pkg/sources/udsource/grpc_udsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,21 +274,14 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) {
mockClient.EXPECT().ReadFn(gomock.Any(), gomock.Any()).Return(nil, nil)
mockClient.EXPECT().AckFn(gomock.Any(), gomock.Any()).Return(mockAckClient, nil)

req1 := &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Offset: offset1,
},
}

req2 := &sourcepb.AckRequest{
req := &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Offset: offset2,
Offsets: []*sourcepb.Offset{offset1, offset2},
},
}

mockAckClient.EXPECT().Send(req1).Return(nil).Times(1)
mockAckClient.EXPECT().Send(req2).Return(nil).Times(1)
mockAckClient.EXPECT().Recv().Return(&sourcepb.AckResponse{}, nil).Times(2)
mockAckClient.EXPECT().Send(req).Return(nil).Times(1)
mockAckClient.EXPECT().Recv().Return(&sourcepb.AckResponse{}, nil).Times(1)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down Expand Up @@ -321,7 +314,7 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) {

req1 := &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Offset: offset1,
Offsets: []*sourcepb.Offset{offset1, offset2},
},
}

Expand Down
4 changes: 2 additions & 2 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/numaflow-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async-nats = "0.37.0"

[dev-dependencies]
tempfile = "3.11.0"
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "9fb3c0ad0f5f43cc42b4919f849b7dcce9a91387" }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "ddd879588e11455921f1ca958ea2b3c076689293" }

[build-dependencies]

Loading

0 comments on commit f6dbe86

Please sign in to comment.