Skip to content

Commit

Permalink
Merge branch 'pipeline' into release/develop
Browse files Browse the repository at this point in the history
  • Loading branch information
nefro85 committed Nov 22, 2023
2 parents b2f6f44 + 9f97f2e commit 0d160f9
Show file tree
Hide file tree
Showing 62 changed files with 2,167 additions and 155 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
branches: [ main, release/develop ]

jobs:
build:
Expand All @@ -23,4 +23,4 @@ jobs:
- name: Grant execute permission for gradlew
run: chmod +x gradlew
- name: Build with Gradle
run: ./gradlew --console=plain build
run: ./gradlew -x shadowJar --console=plain build
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Doer kfeed - simple ingest" type="Application" factoryName="Application">
<configuration default="false" name="Doer pipeline grpc service" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="io.github.s7i.doer.Doer" />
<module name="doer.main" />
<option name="PROGRAM_PARAMETERS" value="kfeed -y src/test/resources/simple-ingest-dry-run.yml" />
<option name="PROGRAM_PARAMETERS" value="pipeline" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="io.github.s7i.doer.*" />
<option name="PATTERN" value="io.github.s7i.doer.command.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
Expand Down
16 changes: 16 additions & 0 deletions .run/Doer pipeline ingest.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Doer pipeline ingest" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="io.github.s7i.doer.Doer" />
<module name="doer.main" />
<option name="PROGRAM_PARAMETERS" value="./docs/pipeline/grpc-backend/ingest.yml" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="io.github.s7i.doer.command.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
16 changes: 16 additions & 0 deletions .run/Doer pipeline sink.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Doer pipeline sink" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="io.github.s7i.doer.Doer" />
<module name="doer.main" />
<option name="PROGRAM_PARAMETERS" value="./docs/pipeline/grpc-backend/console-sink.yml" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="io.github.s7i.doer.command.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ application {
mainClass.set('io.github.s7i.doer.Doer')
}

shadowJar {
// https://github.com/grpc/grpc-java/issues/5493#issuecomment-478500418
mergeServiceFiles()
}

tasks.withType(JavaCompile) {
options.compilerArgs << '-Xlint:unchecked'
options.deprecation = true
Expand Down
16 changes: 16 additions & 0 deletions docs/pipeline/grpc-backend/console-sink.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
version: v1
kind: sink
params:
doer.pipeline.backend: grpc
doer.pipeline.backend.target: localhost:6565
doer.pipeline.sink: true
doer.pipeline.bind: from-ingest-to-sink
doer.pipeline.id: id-sink1
mykaf: |+
bootstrap.servers=localhost:9092
client.id=doer.sink
spec:
- output: doer://console
# enabled: false
- output: kafka://mykaf/test-sink123
# enable: false
11 changes: 11 additions & 0 deletions docs/pipeline/grpc-backend/ingest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: v1
kind: ingest
param:
doer.output: pipeline://grpc
doer.pipeline.backend: grpc
doer.pipeline.backend.target: localhost:6565
ingest:
- record: record 1 ${date:yyyy-MM-dd}T${date:HH:mm:ss}+00:00
- record: record 2 ${date:yyyy-MM-dd}T${date:HH:mm:ss}+00:00
- record: record 3 ${date:yyyy-MM-dd}T${date:HH:mm:ss}+00:00
key: key-record 3
17 changes: 17 additions & 0 deletions docs/pipeline/grpc-backend/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
### Running from the console

1. Sink:

```bash
doer console-sink.yml

```

2. Service:
```bash
doer pipeline
```
3. Records ingest
```bash
doer ingest.yml
```
61 changes: 61 additions & 0 deletions docs/pipeline/pipeline-record.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/bin/bash

PROTO_SET=../../proto-doer/build/descriptors/main.desc
set -e

doer misc pipeline record --help

print_color() {
local TEXT=$1
local CLR_NAME=$2
local RED="\e[31m"
local GREEN="\e[32m"
local ENDCOLOR="\e[0m"
local COLOR=${!CLR_NAME}
echo -e "${COLOR}${TEXT}${ENDCOLOR}"
}

green() {
print_color $1 "GREEN"
}

make_record() {
doer misc pipeline record \
--meta test.data=true \
--key my-key \
--data "some test data"
}

make_record_pl() {
doer misc pipeline record \
--meta test.data=true \
--key my-key \
--data "some test data" \
--pipeline-load
}

green "[HEX:record]"
make_record | xxd

# https://github.com/protocolbuffers/protoscope
green "[protoscope]"
make_record | protoscope

green "[protoscope:print-field-names]"

make_record | protoscope \
-descriptor-set ${PROTO_SET} \
-message-type io.github.s7i.doer.proto.Record \
-print-field-names

green "[HEX:pipeline-load]"
make_record_pl | xxd

green "[protoscope:pipeline-load]"
make_record_pl | protoscope

green "[protoscope:pipeline-load:print-field-names]"
make_record_pl | protoscope \
-descriptor-set ${PROTO_SET} \
-message-type io.github.s7i.doer.pipeline.proto.PipelineLoad \
-print-field-names
39 changes: 39 additions & 0 deletions docs/pipeline/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Pipelines

Sample of pipeline specification:

```yaml
# Pipeline manifest draft
version: v1
kind: pipeline
params:
doer.pipeline.backend: kafka
doer.pipeline.backend.kafka.properties: |+
bootstrap.servers=kafka:9093
spec:
- pipeline: a | b
- name: a
description: "a" as instance of source
manifest-file: source.yml
- name: b
description: "b" as instance of sink
manifest-file: sink.yml
---

version: v2
kind: pipeline
params:
doer.pipeline.backend: kafka
doer.pipeline.backend.kafka.properties: |+
bootstrap.servers=kafka:9093
spec:
pipeline:
- flow: a | b
elements:
- name: a
description: "a" as instance of source
manifest-file: source.yml
- name: b
description: "b" as instance of sink
manifest-file: sink.yml
```
13 changes: 13 additions & 0 deletions proto-doer/src/main/proto/doer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package io.github.s7i.doer.proto;
option java_multiple_files = true;

import "google/protobuf/wrappers.proto";
import "google/protobuf/empty.proto";

message DoerVersion {
string name = 1;
Expand All @@ -17,3 +18,15 @@ message Record {
google.protobuf.BytesValue data = 4;
}

service DoerService {

rpc version (google.protobuf.Empty) returns (Sone);
rpc exec(Sone) returns (Sone);
rpc pulse(google.protobuf.Empty) returns (stream Sone);

}

message Sone {
string s = 1;
}

15 changes: 15 additions & 0 deletions proto-doer/src/main/proto/pipeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,22 @@ import "google/protobuf/any.proto";
option java_multiple_files = true;

service PipelineService {
rpc exchangeMeta(MetaOp) returns (MetaOp);
rpc publish(PipelinePublishRequest) returns (PipelinePublishResponse);
rpc subscribe(MetaOp) returns (stream PipelineLoad);
}

message MetaOp {
message Request {
string name = 1;
map<string, string> parameters = 2;
}
message Response {
string status = 1;
map<string, string> parameters = 2;
}
Request request = 1;
Response response = 2;
}

message PipelinePublishRequest {
Expand Down
13 changes: 0 additions & 13 deletions spec/manifest/pipeline.yml

This file was deleted.

54 changes: 42 additions & 12 deletions src/main/java/io/github/s7i/doer/Context.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
package io.github.s7i.doer;

import static io.github.s7i.doer.Doer.console;
import static java.util.Objects.requireNonNull;

import io.github.s7i.doer.domain.output.*;
import io.github.s7i.doer.domain.output.Output;
import io.github.s7i.doer.domain.output.OutputBuilder;
import io.github.s7i.doer.domain.output.OutputFactory;
import io.github.s7i.doer.domain.output.OutputProvider;
import io.github.s7i.doer.pipeline.Pipeline;
import io.github.s7i.doer.util.ParamFlagExtractor;
import io.github.s7i.doer.util.PropertyResolver;
import io.github.s7i.doer.util.QuitWatcher;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import lombok.Builder;
import lombok.Builder.Default;
import lombok.Getter;

import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static io.github.s7i.doer.Doer.console;
import static java.util.Objects.nonNull;
import static java.util.Objects.requireNonNull;

public interface Context extends ParamFlagExtractor {

@Builder
Expand All @@ -28,28 +36,37 @@ class InitialParameters {
class Initializer {

static {
Runtime.getRuntime().addShutdownHook(new Thread(Initializer::shutdown, "shutdown"));
Runtime.getRuntime().addShutdownHook(new Thread(Globals.INSTANCE::stopAll, "shutdown"));
}

public Initializer(InitialParameters parameters) {
Pipeline.initFrom(parameters::getParams);
var scope = Globals.INSTANCE.getScope();
scope.setRoot(parameters::getWorkDir);
scope.setParams(parameters::getParams);

new QuitWatcher().watchForQuit(() -> System.exit(Doer.EC_QUIT));
}

private static void shutdown() {
console().info("Init shutdown procedure...");
Globals.INSTANCE.stopHooks.forEach(Runnable::run);
console().info("Shutdown completed.");
public Context context() {
return Globals.INSTANCE;
}
}

default void addStopHook(Runnable runnable) {
Globals.INSTANCE.stopHooks.add(runnable);
}

default void stopAll() {
console().info("Init shutdown procedure...");
Globals.INSTANCE.stopHooks.forEach(Runnable::run);
console().info("Shutdown completed.");
}

default void stopAllSilent() {
Globals.INSTANCE.stopHooks.forEach(Runnable::run);
}

default OutputFactory getOutputFactory() {
return Globals.INSTANCE.getScope().getOutputFactory();
}
Expand All @@ -71,4 +88,17 @@ default Map<String, String> getParams() {
default PropertyResolver getPropertyResolver() {
return new PropertyResolver(getParams());
}

default Optional<Pipeline> lookupPipeline() {
var pipeline = Globals.INSTANCE.pipeline;
if (nonNull(pipeline) && pipeline.isEnabled()) {
return Optional.of(pipeline);
}
return Optional.empty();
}

default void shareParams(Map<String, String> otherParams) {
var copy = new HashMap<>(otherParams);
Globals.INSTANCE.getScope().setParams(() -> copy);
}
}
Loading

0 comments on commit 0d160f9

Please sign in to comment.