Skip to content

Commit

Permalink
DBZ-7244 Fix potential regression
Browse files Browse the repository at this point in the history
Seems like debezium/debezium-server#51 introduced a potential regression, where if the sink is configured with a custom producer, it is not wrapped in a BatchManager and batchManager will be null when handleBatch is invoked.
  • Loading branch information
aleksandervalle committed Dec 11, 2023
1 parent 77af1c3 commit a24d180
Showing 1 changed file with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,25 @@ public class EventHubsChangeConsumer extends BaseChangeConsumer

@PostConstruct
void connect() {
final Config config = ConfigProvider.getConfig();

// optional config
maxBatchSize = config.getOptionalValue(PROP_MAX_BATCH_SIZE, Integer.class).orElse(0);
configuredPartitionId = config.getOptionalValue(PROP_PARTITION_ID, String.class).orElse("");
configuredPartitionKey = config.getOptionalValue(PROP_PARTITION_KEY, String.class).orElse("");

if (customProducer.isResolvable()) {
producer = customProducer.get();
batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, maxBatchSize);
LOGGER.info("Obtained custom configured Event Hubs client for namespace '{}'",
customProducer.get().getFullyQualifiedNamespace());
return;
}

final Config config = ConfigProvider.getConfig();
// required config
connectionString = config.getValue(PROP_CONNECTION_STRING_NAME, String.class);
eventHubName = config.getValue(PROP_EVENTHUB_NAME, String.class);

// optional config
maxBatchSize = config.getOptionalValue(PROP_MAX_BATCH_SIZE, Integer.class).orElse(0);
configuredPartitionId = config.getOptionalValue(PROP_PARTITION_ID, String.class).orElse("");
configuredPartitionKey = config.getOptionalValue(PROP_PARTITION_KEY, String.class).orElse("");

String finalConnectionString = String.format(CONNECTION_STRING_FORMAT, connectionString, eventHubName);

try {
Expand Down

0 comments on commit a24d180

Please sign in to comment.