+ * The isLast attribute indicates whether the response is globally the last one to be sent to + * the output gRPC stream, if set to true, it means the response is the very last response among + * all key sets. When output stream actor receives an isLast response, it sends the response and immediately + * closes the output stream. + */ +@Getter +@Setter +@AllArgsConstructor +class ActorResponse { + ReduceOuterClass.ReduceResponse response; + boolean isLast; + + // TODO - do we need to include window information in the id? + // for aligned reducer, there is always single window. + // but at the same time, would like to be consistent with GO SDK implementation. + // we will revisit this one later. + public String getActorUniqueIdentifier() { + return String.join( + Constants.DELIMITER, + this.getResponse().getResult().getKeysList().toArray(new String[0])); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java new file mode 100644 index 00000000..16746dab --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java @@ -0,0 +1,13 @@ +package io.numaproj.numaflow.reducestreamer; + +class Constants { + public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64; + + public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/reducestream.sock"; + + public static final String EOF = "EOF"; + + public static final String SUCCESS = "SUCCESS"; + + public static final String DELIMITER = ":"; +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java new file mode 100644 index 00000000..5f5a1137 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java @@ -0,0 +1,26 @@ +package io.numaproj.numaflow.reducestreamer; + +import io.numaproj.numaflow.info.ServerInfoAccessor; +import lombok.Builder; +import lombok.Getter; + +/** + * GRPCConfig is used to provide configurations for gRPC server. + */ +@Getter +@Builder(builderMethodName = "newBuilder") +public class GRPCConfig { + private String socketPath; + private int maxMessageSize; + private String infoFilePath; + + /** + * Static method to create default GRPCConfig. + */ + static GRPCConfig defaultGrpcConfig() { + return GRPCConfig.newBuilder() + .infoFilePath(ServerInfoAccessor.DEFAULT_SERVER_INFO_FILE_PATH) + .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/reducestreamer/HandlerDatum.java new file mode 100644 index 00000000..a36df0ce --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/HandlerDatum.java @@ -0,0 +1,28 @@ +package io.numaproj.numaflow.reducestreamer; + +import io.numaproj.numaflow.reducestreamer.model.Datum; +import lombok.AllArgsConstructor; + +import java.time.Instant; + +@AllArgsConstructor +class HandlerDatum implements Datum { + private byte[] value; + private Instant watermark; + private Instant eventTime; + + @Override + public Instant getWatermark() { + return this.watermark; + } + + @Override + public byte[] getValue() { + return this.value; + } + + @Override + public Instant getEventTime() { + return this.eventTime; + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/IntervalWindowImpl.java b/src/main/java/io/numaproj/numaflow/reducestreamer/IntervalWindowImpl.java new file mode 100644 index 00000000..a4f75369 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/IntervalWindowImpl.java @@ -0,0 +1,22 @@ +package io.numaproj.numaflow.reducestreamer; + +import io.numaproj.numaflow.reducestreamer.model.IntervalWindow; +import lombok.AllArgsConstructor; + +import java.time.Instant; + +@AllArgsConstructor +class IntervalWindowImpl implements IntervalWindow { + private final Instant startTime; + private final Instant endTime; + + @Override + public Instant getStartTime() { + return this.startTime; + } + + @Override + public Instant getEndTime() { + return this.endTime; + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/MetadataImpl.java b/src/main/java/io/numaproj/numaflow/reducestreamer/MetadataImpl.java new file mode 100644 index 00000000..2e8f7da9 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/MetadataImpl.java @@ -0,0 +1,15 @@ +package io.numaproj.numaflow.reducestreamer; + +import io.numaproj.numaflow.reducestreamer.model.IntervalWindow; +import io.numaproj.numaflow.reducestreamer.model.Metadata; +import lombok.AllArgsConstructor; + +@AllArgsConstructor +class MetadataImpl implements Metadata { + private final IntervalWindow intervalWindow; + + @Override + public IntervalWindow getIntervalWindow() { + return intervalWindow; + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java new file mode 100644 index 00000000..afa4e724 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java @@ -0,0 +1,48 @@ +package io.numaproj.numaflow.reducestreamer; + +import akka.actor.AbstractActor; +import akka.actor.Props; +import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * Output actor is a wrapper around the gRPC output stream. + * It ensures synchronized calls to the responseObserver onNext() and invokes onComplete at the end of the stream. + * ALL reduce responses are sent to the response stream actor before getting forwarded to the output gRPC stream. + *
+ * More details about gRPC StreamObserver concurrency: https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html
+ */
+@Slf4j
+@AllArgsConstructor
+class OutputActor extends AbstractActor {
+ StreamObserver