-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathmy_streaming_synth_pipeline.py
28 lines (19 loc) · 1 KB
/
my_streaming_synth_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import argparse
import logging
import apache_beam as beam
from apache_beam import PCollection
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from mydofns.synthetic_sdfn_streaming import GeneratePartitionsDoFn, MyPartition, ProcessPartitionsSplittableDoFn
def run_pipeline_streaming(beam_options):
    """Build and run the synthetic splittable-DoFn pipeline in streaming mode.

    The pipeline seeds itself with a single element (``0``), expands it with
    ``GeneratePartitionsDoFn``, feeds the result through
    ``ProcessPartitionsSplittableDoFn`` and logs every output element via
    ``logging.info``.

    Args:
        beam_options: Flags handed straight to ``PipelineOptions``. The
            ``__main__`` entry point passes the command-line arguments that
            ``argparse.parse_known_args`` left unconsumed. Streaming mode is
            forced on regardless of what these flags say.
    """
    options = PipelineOptions(beam_options)
    # Override any --streaming value supplied (or omitted) on the command line;
    # the view shares state with ``options``, so this sticks.
    options.view_as(StandardOptions).streaming = True

    # Leaving the ``with`` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=options) as pipeline:
        # NOTE(review): the lone ``0`` looks like a dummy seed that
        # GeneratePartitionsDoFn expands into partitions — confirm in mydofns.
        seed = pipeline | beam.Create([0])
        partitions: PCollection[MyPartition] = (
            seed | beam.ParDo(GeneratePartitionsDoFn())
        )
        processed = partitions | beam.ParDo(ProcessPartitionsSplittableDoFn())
        # Terminal step: emit each processed element to the log.
        processed | beam.Map(logging.info)
if __name__ == '__main__':
    # Lower the root logger threshold so logging.info output is not filtered.
    logging.getLogger().setLevel(logging.INFO)
    # The parser declares no script-specific options, so apart from argparse's
    # built-in -h/--help every CLI argument is left unconsumed and forwarded
    # to Beam as pipeline flags.
    _, beam_args = argparse.ArgumentParser().parse_known_args()
    run_pipeline_streaming(beam_options=beam_args)