Skip to content

Commit

Permalink
Merge branch 'main' into r
Browse files Browse the repository at this point in the history
  • Loading branch information
KeranYang authored Oct 27, 2023
2 parents 7991c37 + 9050e75 commit f8f0f40
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 1 deletion.
119 changes: 119 additions & 0 deletions docs/user-guide/user-defined-functions/map/examples.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Map Examples

Please read [map](./map.md) to get the best out of these examples.

## Prerequisites

### Inter-Step Buffer Service (ISB Service)

#### What is ISB Service?
An Inter-Step Buffer Service is described by a [Custom Resource](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/), which is used to pass data between vertices of a numaflow pipeline.
Please refer to the doc [Intern-Step Buffer Service](../../../core-concepts/inter-step-buffer.md) for more information on ISB.


#### How to install the ISB Service

```shell
kubectl apply -f https://raw.githubusercontent.com/numaproj/numaflow/stable/examples/0-isbsvc-jetstream.yaml
```

The expected output of the above command is shown below:

```shell
$ kubectl get isbsvc

NAME TYPE PHASE MESSAGE AGE
default jetstream Running 3d19h

# Wait for pods to be ready
$ kubectl get pods

NAME READY STATUS RESTARTS AGE
isbsvc-default-js-0 3/3 Running 0 19s
isbsvc-default-js-1 3/3 Running 0 19s
isbsvc-default-js-2 3/3 Running 0 19s
```

---
**NOTE**

The Source used in the examples is an HTTP source producing messages with values 5 and 10 with event time
starting from 60000. Please refer to the doc [http source](../../sources/http.md) on how to use an HTTP
source.
An example will be as follows,

```sh
curl -kq -X POST -H "x-numaflow-event-time: 60000" -d "5" ${http-source-url}
curl -kq -X POST -H "x-numaflow-event-time: 60000" -d "10" ${http-source-url}
```
---

## Creating a simple Map pipeline

Now we will walk you through creating a map pipeline. In our example, this is called the `even-odd` pipeline, illustrated by the following diagram:

![Pipeline Diagram](../../../assets/even-odd-square.png)

There are five vertices in this example of a map pipeline. An [HTTP](../../sources/http.md) source vertex which serves an HTTP endpoint to receive numbers as source data, a [UDF](./map.md) vertex to tag the ingested numbers with the key `even` or `odd`, three [Log](../../sinks/log.md) sinks, one to print the `even` numbers, one to print the `odd` numbers, and the other one to print both the even and odd numbers.

Run the following command to create the `even-odd` pipeline.

```shell
kubectl apply -f https://raw.githubusercontent.com/numaproj/numaflow/main/examples/2-even-odd-pipeline.yaml
```

You may opt to view the list of pipelines you've created so far by running `kubectl get pipeline`. Otherwise, proceed to inspect the status of the pipeline, using `kubectl get pods`.

```shell
# Wait for pods to be ready
kubectl get pods

NAME READY STATUS RESTARTS AGE
even-odd-daemon-64d65c945d-vjs9f 1/1 Running 0 5m3s
even-odd-even-or-odd-0-pr4ze 2/2 Running 0 30s
even-odd-even-sink-0-unffo 1/1 Running 0 22s
even-odd-in-0-a7iyd 1/1 Running 0 5m3s
even-odd-number-sink-0-zmg2p 1/1 Running 0 7s
even-odd-odd-sink-0-2736r 1/1 Running 0 15s
isbsvc-default-js-0 3/3 Running 0 10m
isbsvc-default-js-1 3/3 Running 0 10m
isbsvc-default-js-2 3/3 Running 0 10m
```

Next, port-forward the HTTP endpoint, and make a `POST` request using `curl`. Remember to replace `xxxxx` with the appropriate pod names both here and in the next step.

```shell
kubectl port-forward even-odd-in-0-xxxx 8444:8443

# Post data to the HTTP endpoint
curl -kq -X POST -d "101" https://localhost:8444/vertices/in
curl -kq -X POST -d "102" https://localhost:8444/vertices/in
curl -kq -X POST -d "103" https://localhost:8444/vertices/in
curl -kq -X POST -d "104" https://localhost:8444/vertices/in
```

Now you can watch the log for the `even` and `odd` vertices by running the commands below.

```shell
# Watch the log for the even vertex
kubectl logs -f even-odd-even-sink-0-xxxxx
2022/09/07 22:29:40 (even-sink) 102
2022/09/07 22:29:40 (even-sink) 104

# Watch the log for the odd vertex
kubectl logs -f even-odd-odd-sink-0-xxxxx
2022/09/07 22:30:19 (odd-sink) 101
2022/09/07 22:30:19 (odd-sink) 103
```

View the UI for a pipeline at https://localhost:8443/.

![Numaflow UI](../../../assets/numaflow-ui-advanced-pipeline.png)

The source code of the `even-odd` [User Defined Function](../user-defined-functions.md) can be found [here](https://github.com/numaproj/numaflow-go/tree/main/pkg/mapper/examples/even_odd). You also can replace the [Log](../../sinks/log.md) Sink with some other sinks like [Kafka](../../sinks/kafka.md) to forward the data to Kafka topics.

The pipeline can be deleted by

```shell
kubectl delete -f https://raw.githubusercontent.com/numaproj/numaflow/main/examples/2-even-odd-pipeline.yaml
```
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ nav:
- Overview: "user-guide/user-defined-functions/map/builtin-functions/README.md"
- Cat: "user-guide/user-defined-functions/map/builtin-functions/cat.md"
- Filter: "user-guide/user-defined-functions/map/builtin-functions/filter.md"
- Examples: "user-guide/user-defined-functions/map/examples.md"
- Reduce:
- Overview: "user-guide/user-defined-functions/reduce/reduce.md"
- Windowing:
Expand Down
5 changes: 4 additions & 1 deletion ui/src/components/pages/Pipeline/partials/Graph/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,10 @@ export default function Graph(props: GraphProps) {
}}
>
<CircularProgress
sx={{ width: "1.25rem !important", height: "1.25rem !important" }}
sx={{
width: "1.25rem !important",
height: "1.25rem !important",
}}
/>{" "}
<Box
sx={{
Expand Down
153 changes: 153 additions & 0 deletions ui/src/utils/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ import {
findSuffix,
quantityToScalar,
getPodContainerUsePercentages,
timeAgo,
GetISBType,
DurationString,
a11yProps,
getBaseHref,
getAPIResponseError,
handleCopy,
} from "./index";
import { Pod, PodContainerSpec } from "../types/declarations/pods";

Expand Down Expand Up @@ -48,12 +55,158 @@ describe("index", () => {

it("quantityToScalar", () => {
expect(quantityToScalar("10")).toEqual(10);
expect(quantityToScalar("10n")).toEqual(1e-8);
expect(quantityToScalar("10u")).toEqual(0.00001);
expect(quantityToScalar("10m")).toEqual(0.01);
// @ts-ignore
expect(quantityToScalar("10k")).toEqual(10000n);
// @ts-ignore
expect(quantityToScalar("10M")).toEqual(10000000n);
// @ts-ignore
expect(quantityToScalar("10G")).toEqual(10000000000n);
// @ts-ignore
expect(quantityToScalar("10T")).toEqual(10000000000000n);
// @ts-ignore
expect(quantityToScalar("10P")).toEqual(10000000000000000n);
// @ts-ignore
expect(quantityToScalar("10E")).toEqual(10000000000000000000n);
// @ts-ignore
expect(quantityToScalar("10Ki")).toEqual(10240n);
// @ts-ignore
expect(quantityToScalar("10Mi")).toEqual(10485760n);
// @ts-ignore
expect(quantityToScalar("10Gi")).toEqual(10737418240n);
// @ts-ignore
expect(quantityToScalar("10Ti")).toEqual(10995116277760n);
// @ts-ignore
expect(quantityToScalar("10Pi")).toEqual(11258999068426240n);
// @ts-ignore
expect(quantityToScalar("10Ei")).toEqual(11529215046068469760n);
});

it("getPodContainerUsePercentages", () => {
expect(getPodContainerUsePercentages(pod, podDetail, "numa")).toEqual({
cpuPercent: undefined,
memoryPercent: undefined,
});
const containerMap = new Map<string, PodContainerSpec>();
containerMap.set("numa", {
name: "numa",
cpu: "6991490n",
cpuParsed: 0.00699149,
memory: "33724Ki",
memoryParsed: 34533376,
});
const containerSpecMap = new Map<string, PodContainerSpec>();
containerSpecMap.set("numa", {
name: "numa",
cpu: "100m",
cpuParsed: 0.1,
memory: "128Mi",
memoryParsed: 134217728,
});
const props = {
pod: {
name: "simple-pipeline-2-in-0-dpwxy",
containers: ["numa"],
containerSpecMap: containerSpecMap,
},
podDetails: {
name: "simple-pipeline-2-in-0-dpwxy",
containerMap: containerMap,
},
containerName: "numa",
};

expect(
getPodContainerUsePercentages(props.pod, props.podDetails, "numa")
).toEqual({
cpuPercent: 6.991490000000001,
memoryPercent: 25.7293701171875,
});
});
it("timeAgo", () => {
// flaky test
// expect(timeAgo(new Date().toISOString())).toEqual("Just now");
expect(timeAgo(new Date(Date.now() - 10000).toISOString())).toEqual(
"10 seconds ago"
);
expect(timeAgo(new Date(Date.now() + 1000).toISOString())).toEqual(
"1 seconds from now"
);
});

it("getISB", () => {
const isbSpec = {
jetstream: {
version: "latest",
replicas: 3,
persistence: {
volumeSize: "3Gi",
},
},
};
const isbSpecRedis = {
redis: {
version: "latest",
replicas: 3,
persistence: {
volumeSize: "3Gi",
},
},
};
expect(GetISBType(isbSpec)).toEqual("jetstream");
expect(GetISBType(isbSpecRedis)).toEqual("redis");
});

it("DurationString", () => {
expect(DurationString(1000)).toEqual("1sec 0ms");
expect(DurationString(500)).toEqual("500ms");
expect(DurationString(60000)).toEqual("1min 0sec");
expect(DurationString(60000 * 60)).toEqual("1hr 0min");
expect(DurationString(60000 * 60 * 24)).toEqual("1d 0hr");
expect(DurationString(60000 * 60 * 24 * 32)).toEqual("1mo 1d");
expect(DurationString(60000 * 60 * 24 * 366)).toEqual("1yr 0mo");
});

it("a11yProps", () => {
expect(a11yProps(2)).toEqual({
"aria-controls": "info-tabpanel-2",
id: "info-tab-2",
});
});

// test getBaseHref
it("getBaseHref", () => {
expect(getBaseHref()).toEqual("/");
});

// test getAPIResponseError
// it("getAPIResponseError", () => {
// const error: any = {
// response: {
// ok: true,
// status: 200,
// json: async () => ({
// errMsg: "error message",
// }),
// },
// };
// expect(getAPIResponseError(error)).toEqual("error message");
// });

const clipboard = {
writeText: jest.fn(),
};
Object.defineProperty(global.navigator, 'clipboard', {
value: clipboard,
});
// test handleCopy
it("handleCopy", () => {
const copy: any = {
src: "copy",
};

expect(handleCopy(copy)).toEqual(undefined);
});
});

0 comments on commit f8f0f40

Please sign in to comment.