-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathexample.py
54 lines (40 loc) · 1.16 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import random
from typing import Any
import sys
import faust
from faust.serializers.codecs import codecs
from faust.types import ProcessingGuarantee
from mode import CrashingSupervisor
import os
# Comma-separated host:port entries for the Kafka brokers.
brokers = "127.0.0.1:9094"
# One "kafka://"-prefixed URL per entry in `brokers`.
broker_list = [f"kafka://{address}" for address in brokers.split(",")]
# The worker configuration is built as a standalone Settings object and is
# installed on the app further down, replacing the app's own configuration.
settings = faust.Settings(
    id="test-faust-config",
    broker=broker_list,
    broker_commit_interval=0.1,
    processing_guarantee=ProcessingGuarantee.EXACTLY_ONCE,
    agent_supervisor=CrashingSupervisor,
    topic_disable_leader=True,
)
# Web port is drawn at random from 6066..7000 (both ends inclusive).
# NOTE(review): presumably so several workers can run on one host - confirm.
settings.web_port = random.randint(6066, 7000)

# The app is created with the same id as the Settings object above.
app = faust.App(id="test-faust-config")
app.conf = settings
# Both topics use the "raw" codec for keys and values and declare bytes keys,
# so the two declarations share a single set of options.
_raw_codec = codecs["raw"]
_raw_topic_options = {
    "key_serializer": _raw_codec,
    "key_type": bytes,
    "value_serializer": _raw_codec,
}

# Topic the `processor` agent is subscribed to.
topic_input = app.topic("fake-messages", **_raw_topic_options)
# Topic the `processor` agent sends every message to.
topic_output = app.topic("processed-messages", **_raw_topic_options)
@app.agent(topic_input)
async def processor(stream: faust.Stream[Any]) -> None:
    """Forward every message from the input topic to the output topic.

    Each key/value pair yielded by ``stream.items()`` is sent, unchanged,
    to ``topic_output``.
    """
    async for msg_key, msg_value in stream.items():
        await topic_output.send(key=msg_key, value=msg_value)
if __name__ == "__main__":
    # Start the worker only when this file is executed as a script, so that
    # importing the module (e.g. to reuse `app`) neither overwrites sys.argv
    # nor blocks inside a running worker.
    # The command line is forced to "worker -l info" before handing control
    # to the app's command-line entry point.
    sys.argv = ["", "worker", "-l", "info"]
    app.main()