-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-7032 Implement Retries for Amazon Kinesis Event Transmittion #43
DBZ-7032 Implement Retries for Amazon Kinesis Event Transmittion #43
Conversation
Add spaces on the conditions
170d82e
to
b610ffa
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for you PR @ilyasahsan123 ! This needs few changes to make it working as you like.
@@ -86,6 +97,12 @@ void connect() { | |||
LOGGER.info("Using default KinesisClient '{}'", client); | |||
} | |||
|
|||
@VisibleForTesting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are no tests for this method, so no need to mark is @VisibleForTesting
and making public
@@ -86,6 +97,12 @@ void connect() { | |||
LOGGER.info("Using default KinesisClient '{}'", client); | |||
} | |||
|
|||
@VisibleForTesting | |||
void initWithConfig(Config config) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method is actually not called anywhere, so it won't do anything
@@ -101,6 +118,23 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt | |||
throws InterruptedException { | |||
for (ChangeEvent<Object, Object> record : records) { | |||
LOGGER.trace("Received event '{}'", record); | |||
|
|||
int attempts = 0; | |||
if (!recordSent(record)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you need some while loop to retry sending the records when it fails, see e.g. https://github.com/debezium/debezium-server/blob/main/debezium-server-http/src/main/java/io/debezium/server/http/HttpChangeConsumer.java#L164
@@ -112,8 +146,10 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt | |||
.data(SdkBytes.fromByteArray(getBytes(rv))) | |||
.build(); | |||
client.putRecord(putRecord); | |||
committer.markProcessed(record); | |||
} catch (SdkClientException exception) { | |||
LOGGER.error("Failed to send record to {}:", record.destination(), exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to log the exception and re-throw. Either you should handle the exception and if you don't handle it, don't catch it a let the caller to handle it (and eventually log the exception)
1. Remove unused variables. 2. Remove unused function. 3. Enhance the function to handle batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, LGTM now @ilyasahsan123 !
@@ -55,6 +60,8 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu | |||
private String region; | |||
private Optional<String> endpointOverride; | |||
private Optional<String> credentialsProfile; | |||
private static final int DEFAULT_RETRIES = 5; | |||
private static final Long RETRY_INTERVAL = Integer.toUnsignedLong(1_000); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be interesting if the DEFAULT_RETRIES and RETRY_INTERVAL were config properties. Something like debezium.sink.kinesis.default.retries and debezium.sink.kinesis.retry.interval. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CMIIW] I am not sure about this; it's technically possible, but in my opinion, it's not necessary. If we take a look at other sinks:
- For HTTP, the retries are hardcoded to 5.
- For EventHub, the client uses the default retries, which is 9 times.
The exception is Pub/Sub, where the retry properties can be dynamically set via properties.
Do you have any comment @vjuranek?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's start with hard-coded values only and a separate Jira that would make it configurable for all sinks should be created to make it consistent.
@ilyasahsan123 the CI fails due to wrong formatting. Could you please take a look and fix it (I guess something like |
hi @vjuranek, thanks for the suggestion. I have formatted the code. Could you please take a look? thanks |
3783994
to
4727862
Compare
@ilyasahsan123 LGTM, apliied. Thanks! |
https://issues.redhat.com/browse/DBZ-7032