Skip to content

Commit

Permalink
build: fix kafka connector integration test (#2898)
Browse files Browse the repository at this point in the history
  • Loading branch information
chillleader authored Jul 23, 2024
1 parent 591c67c commit 305a07c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ public void startConsumer() {
.exceptionally(
(e) -> {
shouldLoop = false;
return null;
throw new RuntimeException(
"Consumer loop failed, retries exhausted: " + e.getMessage(), e);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

import com.fasterxml.jackson.databind.node.ObjectNode;
import dev.failsafe.RetryPolicy;
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.api.outbound.OutboundConnectorFunction;
import io.camunda.connector.kafka.inbound.KafkaConnectorConsumer;
Expand Down Expand Up @@ -43,6 +44,7 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.junit.ClassRule;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -97,7 +99,8 @@ private static void createTopics(String... topics) {
.collect(Collectors.toList());
try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, getKafkaBrokers()))) {
admin.createTopics(newTopics);
admin.createPartitions(Map.of(TOPIC, NewPartitions.increaseTo(2)));
Arrays.stream(topics)
.forEach(topic -> admin.createPartitions(Map.of(topic, NewPartitions.increaseTo(2))));
}
}

Expand Down Expand Up @@ -197,12 +200,13 @@ void setInvalidOffsetForInboundConnectorWhenAutoOffsetResetIsNone() {
.properties(kafkaConnectorProperties)
.definition(InboundConnectorDefinitionBuilder.create().build())
.build();
KafkaExecutable executable = new KafkaExecutable();
KafkaExecutable executable =
new KafkaExecutable(KafkaConsumer::new, RetryPolicy.builder().withMaxAttempts(1).build());

// When
OffsetOutOfRangeException thrown =
Exception thrown =
assertThrows(
OffsetOutOfRangeException.class,
Exception.class,
() -> {
try {
executable.activate(context);
Expand All @@ -213,6 +217,8 @@ void setInvalidOffsetForInboundConnectorWhenAutoOffsetResetIsNone() {
},
"OffsetOutOfRangeException was expected");

assertThat(thrown.getCause()).isInstanceOf(OffsetOutOfRangeException.class);

// Then we except exception with message
assertThat(thrown.getMessage()).contains("Fetch position FetchPosition");
assertThat(thrown.getMessage()).contains("is out of range for partition " + TOPIC + "-");
Expand Down Expand Up @@ -448,7 +454,7 @@ void consumeMessageWithHeaders() {
kafkaTopic,
null,
null,
List.of(0L, 0L),
null,
KafkaConnectorProperties.AutoOffsetReset.EARLIEST,
null);

Expand All @@ -469,8 +475,8 @@ void consumeMessageWithHeaders() {
context.getCorrelations().stream()
.filter(
m -> {
var messge = (KafkaInboundMessage) m;
return !(messge.getValue() instanceof String);
var msg = (KafkaInboundMessage) m;
return !(msg.getValue() instanceof String);
})
.findFirst()
.orElse(null);
Expand Down

0 comments on commit 305a07c

Please sign in to comment.