diff --git a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java index 6a160a6..dd5bcb0 100644 --- a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java +++ b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java @@ -6,7 +6,6 @@ package io.debezium.server.eventhubs; import java.util.HashMap; -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,8 +15,6 @@ import com.azure.messaging.eventhubs.models.CreateBatchOptions; import io.debezium.DebeziumException; -import io.debezium.engine.ChangeEvent; -import io.debezium.engine.DebeziumEngine; public class BatchManager { private static final Logger LOGGER = LoggerFactory.getLogger(BatchManager.class); @@ -32,8 +29,6 @@ public class BatchManager { // Prepare CreateBatchOptions for N partitions private final HashMap batchOptions = new HashMap<>(); private final HashMap batches = new HashMap<>(); - private List> records; - private DebeziumEngine.RecordCommitter> committer; public BatchManager(EventHubProducerClient producer, String configurePartitionId, String configuredPartitionKey, Integer maxBatchSize) { @@ -43,11 +38,7 @@ public BatchManager(EventHubProducerClient producer, String configurePartitionId this.maxBatchSize = maxBatchSize; } - public void initializeBatch(List> records, - DebeziumEngine.RecordCommitter> committer) { - this.records = records; - this.committer = committer; - + public void initializeBatch() { if (!configuredPartitionId.isEmpty() || !configuredPartitionKey.isEmpty()) { CreateBatchOptions op = new CreateBatchOptions(); @@ -99,7 +90,7 @@ public void closeAndEmitBatches() { batches.forEach((partitionId, batch) -> { if (batch.getCount() > 0) { LOGGER.trace("Dispatching {} events.", batch.getCount()); - emitBatchToEventHub(records, committer, batch); + emitBatchToEventHub(batch); } }); } @@ -118,15 +109,14 @@ public void sendEventToPartitionId(EventData eventData, Integer recordIndex, Int LOGGER.debug("Maximum batch size reached, dispatching {} events.", batch.getCount()); // Max size reached, dispatch the batch to EventHub - emitBatchToEventHub(records, committer, batch); + emitBatchToEventHub(batch); // Renew the batch proxy so we can continue. batch = new EventDataBatchProxy(producer, batchOptions.get(partitionId)); batches.put(partitionId, batch); } } - private void emitBatchToEventHub(List> records, DebeziumEngine.RecordCommitter> committer, - EventDataBatchProxy batch) { + private void emitBatchToEventHub(EventDataBatchProxy batch) { final int batchEventSize = batch.getCount(); if (batchEventSize > 0) { try { diff --git a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java index e4aa228..99729ba 100644 --- a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java +++ b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java @@ -126,7 +126,7 @@ public void handleBatch(List> records, throws InterruptedException { LOGGER.trace("Event Hubs sink adapter processing change events"); - batchManager.initializeBatch(records, committer); + batchManager.initializeBatch(); for (int recordIndex = 0; recordIndex < records.size();) { int start = recordIndex;