Skip to content

Commit

Permalink
DBZ-8563: Add support for setting BatcherBuilder in Pulsar Producer
Browse files Browse the repository at this point in the history
  • Loading branch information
iamseki committed Jan 10, 2025
1 parent 18d319f commit 5ed3983
Showing 1 changed file with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;

import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -73,6 +74,9 @@ public interface ProducerBuilder {
@ConfigProperty(name = PROP_PREFIX + "timeout", defaultValue = "0")
Integer timeout;

@ConfigProperty(name = PROP_PRODUCER_PREFIX + "batcherBuilder", defaultValue = "DEFAULT")
String batcherBuilderConfig;

@PostConstruct
void connect() {
final Config config = ConfigProvider.getConfig();
Expand Down Expand Up @@ -112,12 +116,14 @@ private Producer<?> createProducer(String topicName, Object value) {
return pulsarClient.newProducer(Schema.STRING)
.loadConf(producerConfig)
.topic(topicFullName)
.batcherBuilder(getBatcherBuilder(batcherBuilderConfig))
.create();
}
else {
return pulsarClient.newProducer()
.loadConf(producerConfig)
.topic(topicFullName)
.batcherBuilder(getBatcherBuilder(batcherBuilderConfig))
.create();
}
}
Expand All @@ -126,6 +132,16 @@ private Producer<?> createProducer(String topicName, Object value) {
}
}

private BatcherBuilder getBatcherBuilder(String configValue) {
switch (configValue) {
case "KEY_BASED":
return BatcherBuilder.KEY_BASED;
case "DEFAULT":
default:
return BatcherBuilder.DEFAULT;
}
}

@SuppressWarnings("unchecked")
@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitter<ChangeEvent<Object, Object>> committer)
Expand Down

0 comments on commit 5ed3983

Please sign in to comment.