From f7149ed3f4aacf2532906f90f2107552361d5fd5 Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Mon, 1 Jul 2024 09:07:31 +0100 Subject: [PATCH] Adding flush to ouput. --- stac_generator/plugins/outputs/kafka.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/stac_generator/plugins/outputs/kafka.py b/stac_generator/plugins/outputs/kafka.py index 85dcf61..7c2ece9 100644 --- a/stac_generator/plugins/outputs/kafka.py +++ b/stac_generator/plugins/outputs/kafka.py @@ -61,18 +61,6 @@ def __init__(self, **kwargs): self.producer = Producer(self.config) - def delivery_callback(err, msg): - if err: - print("ERROR: Message failed delivery: {}".format(err)) - else: - print( - "Produced event to topic {topic}: key = {key:12} value = {value:12}".format( - topic=msg.topic(), - key=msg.key().decode("utf-8"), - value=msg.value().decode("utf-8"), - ) - ) - def export(self, data: dict, **kwargs) -> None: """ Post the message to the kafka server. @@ -80,6 +68,7 @@ def export(self, data: dict, **kwargs) -> None: :param data: Data from extraction processes :param kwargs: Not used """ - key = message.get(self.key_term, None) + key = data.get(self.key_term, None) message = json.dumps(data).encode("utf8") - self.producer.produce(self.topic, key, message, callback=self.delivery_callback) + self.producer.produce(self.topic, key=key, value=message) + self.producer.flush()