Skip to content

Commit

Permalink
Adding flush to ouput.
Browse files Browse the repository at this point in the history
  • Loading branch information
rhysrevans3 committed Jul 1, 2024
1 parent 03fd968 commit f7149ed
Showing 1 changed file with 3 additions and 14 deletions.
17 changes: 3 additions & 14 deletions stac_generator/plugins/outputs/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,14 @@ def __init__(self, **kwargs):

self.producer = Producer(self.config)

def delivery_callback(err, msg):
if err:
print("ERROR: Message failed delivery: {}".format(err))
else:
print(
"Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
topic=msg.topic(),
key=msg.key().decode("utf-8"),
value=msg.value().decode("utf-8"),
)
)

def export(self, data: dict, **kwargs) -> None:
"""
Post the message to the kafka server.
:param data: Data from extraction processes
:param kwargs: Not used
"""
key = message.get(self.key_term, None)
key = data.get(self.key_term, None)
message = json.dumps(data).encode("utf8")
self.producer.produce(self.topic, key, message, callback=self.delivery_callback)
self.producer.produce(self.topic, key=key, value=message)
self.producer.flush()

0 comments on commit f7149ed

Please sign in to comment.