Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement reduce stream sdk #91

Merged
merged 11 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,23 @@
</to>
</configuration>
</execution>
<execution>
<id>reduce-stream-sum</id>
<phase>package</phase>
<goals>
<goal>dockerBuild</goal>
</goals>
<configuration>
<container>
<mainClass>
io.numaproj.numaflow.examples.reducestreamer.sum.SumFactory
</mainClass>
</container>
<to>
<image>numaflow-java-examples/reduce-stream-sum</image>
</to>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.numaproj.numaflow.examples.reducestreamer.sum;

import io.numaproj.numaflow.reducestreamer.Server;
import io.numaproj.numaflow.reducestreamer.user.ReduceStreamerFactory;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SumFactory extends ReduceStreamerFactory<SumFunction> {

KeranYang marked this conversation as resolved.
Show resolved Hide resolved
public static void main(String[] args) throws Exception {
log.info("sum udf was invoked");
new Server(new SumFactory()).start();
}

@Override
public SumFunction createReduceStreamer() {
return new SumFunction();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.numaproj.numaflow.examples.reducestreamer.sum;

import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.user.OutputStreamObserver;
import io.numaproj.numaflow.reducestreamer.user.ReduceStreamer;
import lombok.extern.slf4j.Slf4j;

/**
* SumFunction is a User Defined Reduce Stream Function example which sums up the values for the given keys
* and outputs the sum when the sum is greater than 100.
* When the input stream closes, the function outputs the sum no matter what value it holds.
*/
@Slf4j
public class SumFunction extends ReduceStreamer {

private int sum = 0;

@Override
public void processMessage(
String[] keys,
io.numaproj.numaflow.reducestreamer.model.Datum datum,
OutputStreamObserver outputStreamObserver,
io.numaproj.numaflow.reducestreamer.model.Metadata md) {
try {
sum += Integer.parseInt(new String(datum.getValue()));
} catch (NumberFormatException e) {
log.info("error while parsing integer - {}", e.getMessage());
}
if (sum >= 100) {
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes()));
sum = 0;
}
}

@Override
public void handleEndOfStream(
String[] keys,
OutputStreamObserver outputStreamObserver,
Metadata md) {
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.numaproj.numaflow.reducestreamer;
whynowy marked this conversation as resolved.
Show resolved Hide resolved

import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* ActorEOFResponse is to store the EOF signal from a ReduceStreamerActor.
* ReduceStreamerActor sends it back to the supervisor actor to indicate that
* the streamer actor itself has finished processing the data and is ready to be
* released.
*/
@Getter
@AllArgsConstructor
class ActorEOFResponse {
ReduceOuterClass.ReduceResponse response;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getResponse().getResult().getKeysList().toArray(new String[0]));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* ActorRequest is a wrapper of the gRpc input request.
* It is constructed by the service when service receives an input request and then sent to
* the supervisor actor.
*/
@Getter
@AllArgsConstructor
class ActorRequest {
ReduceOuterClass.ReduceRequest request;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getRequest().getPayload().getKeysList().toArray(new String[0]));
}

public String[] getKeySet() {
return this.getRequest().getPayload().getKeysList().toArray(new String[0]);
}
}
13 changes: 13 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.numaproj.numaflow.reducestreamer;

class Constants {
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;

public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/reducestream.sock";

public static final String EOF = "EOF";

public static final String SUCCESS = "SUCCESS";

public static final String DELIMITER = ":";
}
26 changes: 26 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.info.ServerInfoAccessor;
import lombok.Builder;
import lombok.Getter;

/**
* GRPCConfig is used to provide configurations for gRPC server.
*/
@Getter
@Builder(builderMethodName = "newBuilder")
public class GRPCConfig {
private String socketPath;
private int maxMessageSize;
private String infoFilePath;

/**
* Static method to create default GRPCConfig.
*/
static GRPCConfig defaultGrpcConfig() {
return GRPCConfig.newBuilder()
.infoFilePath(ServerInfoAccessor.DEFAULT_SERVER_INFO_FILE_PATH)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.numaproj.numaflow.reducestreamer;

import akka.actor.AbstractActor;
import akka.actor.AllDeadLetters;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
* Shutdown actor, listens to exceptions and handles shutdown.
*/
@Slf4j
@AllArgsConstructor
class ReduceShutdownActor extends AbstractActor {
private final CompletableFuture<Void> failureFuture;

public static Props props(
CompletableFuture<Void> failureFuture) {
return Props.create(ReduceShutdownActor.class, failureFuture);
}

@Override
public void preRestart(Throwable reason, Optional<Object> message) {
failureFuture.completeExceptionally(reason);
}

@Override
public Receive createReceive() {
return ReceiveBuilder
.create()
.match(Throwable.class, this::shutdown)
.match(String.class, this::completedSuccessfully)
.match(AllDeadLetters.class, this::handleDeadLetters)
.build();
}

/*
complete the future with exception so that the exception will be thrown
indicate that same to response observer.
*/
private void shutdown(Throwable throwable) {
log.debug("got a shut down exception");
failureFuture.completeExceptionally(throwable);
}

// if there are no exceptions, complete the future without exception.
private void completedSuccessfully(String eof) {
log.debug("completed successfully of shutdown executed");
failureFuture.complete(null);
// if all the actors completed successfully, we can stop the shutdown actor.
getContext().getSystem().stop(getSelf());
}

// if we see dead letters, we need to stop the execution and exit
// to make sure no messages are lost
private void handleDeadLetters(AllDeadLetters deadLetter) {
log.debug("got a dead letter, stopping the execution");
failureFuture.completeExceptionally(new Throwable("dead letters"));
getContext().getSystem().stop(getSelf());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.numaproj.numaflow.reducestreamer;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducestreamer.model.HandlerDatum;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.user.OutputStreamObserver;
import io.numaproj.numaflow.reducestreamer.user.OutputStreamObserverImpl;
import io.numaproj.numaflow.reducestreamer.user.ReduceStreamer;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
* Reduce stream actor invokes user defined functions to handle reduce request.
* When receiving an input request, it invokes the processMessage to handle the datum.
* When receiving an EOF signal from the supervisor, it invokes the handleEndOfStream to execute
* the user-defined end of stream processing logics.
*/
@Slf4j
@AllArgsConstructor
public class ReduceStreamerActor extends AbstractActor {
private String[] keys;
private Metadata md;
private ReduceStreamer groupBy;
private OutputStreamObserver outputStream;

public static Props props(
String[] keys, Metadata md, ReduceStreamer groupBy,
StreamObserver<ReduceOuterClass.ReduceResponse> responseStreamObserver) {
return Props.create(
ReduceStreamerActor.class,
keys,
md,
groupBy,
new OutputStreamObserverImpl(md, responseStreamObserver));
}

@Override
public Receive createReceive() {
return ReceiveBuilder
.create()
.match(HandlerDatum.class, this::invokeHandler)
.match(String.class, this::sendEOF)
.build();
}

private void invokeHandler(HandlerDatum handlerDatum) {
this.groupBy.processMessage(keys, handlerDatum, outputStream, md);
}

private void sendEOF(String EOF) {
this.groupBy.handleEndOfStream(keys, outputStream, md);
getSender().tell(buildEOFResponse(), getSelf());
}

private ActorEOFResponse buildEOFResponse() {
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getStartTime().getNano()))
.setEnd(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
responseBuilder.setEOF(true);
// set a dummy result with the keys.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.addAllKeys(List.of(this.keys))
.build());
return new ActorEOFResponse(responseBuilder.build());
}
}
Loading
Loading