From a47d8f7bcc59cbf06b24d051184c7b1b8ea3058d Mon Sep 17 00:00:00 2001 From: ahazourli Date: Sun, 19 Nov 2023 22:40:41 +0100 Subject: [PATCH 1/3] DBZ-6703 Add RabbitMQ Native Stream using Streams API --- .../io/debezium/server/TestConfigSource.java | 11 +- debezium-server-rabbitmq/pom.xml | 6 +- .../RabbitMqStreamNativeChangeConsumer.java | 159 ++++++++++++++++++ .../server/rabbitmq/RabbitMqContainer.java | 11 +- .../server/rabbitmq/RabbitMqStreamIT.java | 139 +++++++++++++++ ...tMqStreamTestResourceLifecycleManager.java | 65 +++++++ .../rabbitmq/RabbitMqTestConfigSource.java | 14 +- .../RabbitMqTestResourceLifecycleManager.java | 15 +- 8 files changed, 404 insertions(+), 16 deletions(-) create mode 100644 debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java create mode 100644 debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamIT.java create mode 100644 debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamTestResourceLifecycleManager.java diff --git a/debezium-server-core/src/test/java/io/debezium/server/TestConfigSource.java b/debezium-server-core/src/test/java/io/debezium/server/TestConfigSource.java index eff84a6c..49f16424 100644 --- a/debezium-server-core/src/test/java/io/debezium/server/TestConfigSource.java +++ b/debezium-server-core/src/test/java/io/debezium/server/TestConfigSource.java @@ -5,17 +5,16 @@ */ package io.debezium.server; +import io.debezium.data.Json; +import io.debezium.util.Testing; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.eclipse.microprofile.config.spi.ConfigSource; + import java.nio.file.Path; import java.util.HashMap; import java.util.Map; import java.util.Set; -import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; -import org.eclipse.microprofile.config.spi.ConfigSource; - -import io.debezium.data.Json; -import io.debezium.util.Testing; - /** * A config source used during tests. Amended/overridden by values exposed from test lifecycle listeners. */ diff --git a/debezium-server-rabbitmq/pom.xml b/debezium-server-rabbitmq/pom.xml index 22ff38d3..0891dfcd 100644 --- a/debezium-server-rabbitmq/pom.xml +++ b/debezium-server-rabbitmq/pom.xml @@ -23,7 +23,11 @@ amqp-client 5.16.0 - + + com.rabbitmq + stream-client + 0.14.0 + io.quarkus diff --git a/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java b/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java new file mode 100644 index 00000000..2451d8eb --- /dev/null +++ b/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java @@ -0,0 +1,159 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.rabbitmq; + +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.ConnectionFactoryConfigurator; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.Producer; +import com.rabbitmq.stream.StreamException; +import io.debezium.DebeziumException; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.DebeziumEngine.RecordCommitter; +import io.debezium.engine.Header; +import io.debezium.server.BaseChangeConsumer; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.Dependent; +import jakarta.inject.Named; +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Implementation of the consumer that delivers the messages into RabbitMQ Stream destination. + * + * @author Olivier Boudet + * + */ +@Named("rabbitmqstream") +@Dependent +public class RabbitMqStreamNativeChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer> { + + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqStreamNativeChangeConsumer.class); + + private static final String PROP_PREFIX = "debezium.sink.rabbitmqstream."; + private static final String PROP_CONNECTION_PREFIX = PROP_PREFIX + "connection."; + private static final String STREAM_NAME = PROP_PREFIX + "stream"; + private String stream; + + /** + * When true, the routing key is calculated from topic name using stream name mapper. + * When false the routingKey value or empty string is used. + */ + + @ConfigProperty(name = PROP_PREFIX + "deliveryMode", defaultValue = "2") + int deliveryMode; + + @ConfigProperty(name = PROP_PREFIX + "ackTimeout", defaultValue = "30000") + int ackTimeout; + + @ConfigProperty(name = PROP_PREFIX + "null.value", defaultValue = "default") + String nullValue; + + Environment environment; + + Producer producer; + + @PostConstruct + void connect() { + + final Config config = ConfigProvider.getConfig(); + ConnectionFactory factory = new ConnectionFactory(); + + Map configProperties = getConfigSubset(config, PROP_CONNECTION_PREFIX).entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, + entry -> (entry.getValue() == null) ? null : entry.getValue().toString())); + + ConnectionFactoryConfigurator.load(factory, configProperties, ""); + LOGGER.info("Using connection to {}:{}", factory.getHost(), factory.getPort()); + + try { + environment = Environment.builder() + .host(factory.getHost()) + .port(factory.getPort()).build(); + + stream = config.getValue(STREAM_NAME, String.class); + + LOGGER.info("Creating stream '{}'", stream); + + environment.streamCreator().stream(stream).create(); + + producer = environment.producerBuilder() + .confirmTimeout(Duration.ofSeconds(ackTimeout)) + .stream(stream) + .build(); + + } + catch (StreamException | IllegalArgumentException e) { + throw new DebeziumException(e); + } + } + + @PreDestroy + void close() { + + try { + if (producer != null) { + producer.close(); + } + if (environment != null) { + environment.close(); + } + } + catch (Exception e) { + throw new DebeziumException(e); + } + + } + + @Override + public void handleBatch(List> records, RecordCommitter> committer) + throws InterruptedException { + for (ChangeEvent record : records) { + LOGGER.trace("Received event '{}'", record); + try { + final Object value = (record.value() != null) ? record.value() : nullValue; + producer.send( + producer.messageBuilder().addData(getBytes(value)).build(), + confirmationStatus -> { + }); + + } + catch (StreamException e) { + throw new DebeziumException(e); + } + + committer.markProcessed(record); + } + + LOGGER.trace("Sent messages"); + committer.markBatchFinished(); + } + + @Override + public boolean supportsTombstoneEvents() { + return false; + } + + private Map convertRabbitMqHeaders(ChangeEvent record) { + List> headers = record.headers(); + Map rabbitMqHeaders = new HashMap<>(); + for (Header header : headers) { + rabbitMqHeaders.put(header.getKey(), header.getValue()); + } + return rabbitMqHeaders; + } +} diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqContainer.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqContainer.java index 8a0f712f..b5612574 100644 --- a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqContainer.java +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqContainer.java @@ -6,18 +6,25 @@ package io.debezium.server.rabbitmq; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; +import java.time.Duration; + /** * RabbitMQ container */ public class RabbitMqContainer extends GenericContainer { - private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("rabbitmq:3.10.19-management"); + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("rabbitmq:3.12.9-management"); public static final int BROKER_PORT = 5672; + public static final int STREAM_PORT = 5552; public RabbitMqContainer() { super(DEFAULT_IMAGE_NAME); - withExposedPorts(BROKER_PORT, 15672); + withExposedPorts(BROKER_PORT, STREAM_PORT, 15672); + + this.waitStrategy = + Wait.forLogMessage(".*Server startup complete.*", 1).withStartupTimeout(Duration.ofSeconds(60)); } } diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamIT.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamIT.java new file mode 100644 index 00000000..de3ab4f0 --- /dev/null +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamIT.java @@ -0,0 +1,139 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.rabbitmq; + +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.stream.Consumer; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.OffsetSpecification; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.server.TestConfigSource; +import io.debezium.server.events.ConnectorCompletedEvent; +import io.debezium.server.events.ConnectorStartedEvent; +import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; +import io.debezium.util.Testing; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import jakarta.enterprise.event.Observes; +import org.awaitility.Awaitility; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; + +@QuarkusTest +@QuarkusTestResource(PostgresTestResourceLifecycleManager.class) +@QuarkusTestResource(value = RabbitMqStreamTestResourceLifecycleManager.class, parallel = true) +@EnabledIfSystemProperty(named = "debezium.sink.type", matches = "rabbitmqstream") +public class RabbitMqStreamIT { + + private static final int MESSAGE_COUNT = 4; + + private static Environment environment; + private static Consumer consumer = null; + + @ConfigProperty(name = "debezium.source.database.hostname") + String dbHostname; + + @ConfigProperty(name = "debezium.source.database.port") + String dbPort; + + @ConfigProperty(name = "debezium.source.database.user") + String dbUser; + + @ConfigProperty(name = "debezium.source.database.password") + String dbPassword; + + @ConfigProperty(name = "debezium.source.database.dbname") + String dbName; + + private static final List messages = Collections.synchronizedList(new ArrayList<>()); + + { + Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH); + Testing.Files.createTestingFile(RabbitMqTestConfigSource.OFFSET_STORE_PATH); + } + + void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException, TimeoutException { + // start consumer + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(RabbitMqStreamTestResourceLifecycleManager.container.getHost()); + factory.setPort(RabbitMqStreamTestResourceLifecycleManager.getPort()); + + environment = Environment.builder() + .host(factory.getHost()) + .port(factory.getPort()).build(); + + environment.streamCreator().stream(RabbitMqTestConfigSource.TOPIC_NAME).create(); + + Consumer consumer = environment.consumerBuilder() + .stream(RabbitMqTestConfigSource.TOPIC_NAME) + .offset(OffsetSpecification.first()) + .messageHandler((offset, message) -> { + messages.add(new String(message.getBodyAsBinary())); + }) + .build(); + } + + @AfterAll + static void stop() throws IOException, TimeoutException { + if (consumer != null) { + consumer.close(); + } + if (environment != null) { + environment.close(); + } + } + + void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception { + if (!event.isSuccess()) { + throw new RuntimeException(event.getError().get()); + } + } + + @Test + public void testRabbitMqStream() throws Exception { + + // consume record + Awaitility.await() + .atMost(Duration.ofSeconds(RabbitMqTestConfigSource.waitForSeconds())) + .until(() -> messages.size() >= MESSAGE_COUNT); + + assertThat(messages.size()).isGreaterThanOrEqualTo(MESSAGE_COUNT); + messages.clear(); + + final JdbcConfiguration config = JdbcConfiguration.create() + .with("hostname", dbHostname) + .with("port", dbPort) + .with("user", dbUser) + .with("password", dbPassword) + .with("dbname", dbName) + .build(); + try (PostgresConnection connection = new PostgresConnection(config, "Debezium RabbitMQ Stream Test")) { + connection.execute( + "INSERT INTO inventory.customers VALUES (10000, 'John', 'Doe', 'jdoe@example.org')", + "DELETE FROM inventory.customers WHERE id=10000"); + } + + // consume INSERT, DELETE, null (tombstone) + Awaitility.await() + .atMost(Duration.ofSeconds(RabbitMqTestConfigSource.waitForSeconds())) + .until(() -> messages.size() >= 3); + + assertThat(messages.size()).isGreaterThanOrEqualTo(3); + assertThat(messages.get(2)).isEqualTo("default"); + } +} diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamTestResourceLifecycleManager.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamTestResourceLifecycleManager.java new file mode 100644 index 00000000..08981c94 --- /dev/null +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamTestResourceLifecycleManager.java @@ -0,0 +1,65 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.rabbitmq; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +/** + * Manages the lifecycle of a RabbitMQ cluster test resource. + */ +public class RabbitMqStreamTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager { + + public static final int PORT = 5552; + public static RabbitMqContainer container = new RabbitMqContainer(); + private static final AtomicBoolean running = new AtomicBoolean(false); + + private static synchronized void init() throws IOException, InterruptedException { + if (!running.get()) { + + container.start(); + container.execInContainer("rabbitmq-plugins", "enable", "--all"); + running.set(true); + } + } + + @Override + public Map start() { + try { + init(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + Map params = new ConcurrentHashMap<>(); + params.put("debezium.sink.rabbitmqstream.connection.host", container.getHost()); + params.put("debezium.sink.rabbitmqstream.connection.port", String.valueOf(getPort())); + return params; + } + + @Override + public void stop() { + try { + if (container != null) { + container.stop(); + } + } + catch (Exception e) { + // ignored + } + } + + public static int getPort() { + return container.getMappedPort(PORT); + } +} diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java index 47d4fe88..8bab276e 100644 --- a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java @@ -5,12 +5,11 @@ */ package io.debezium.server.rabbitmq; -import java.util.HashMap; -import java.util.Map; - +import io.debezium.server.TestConfigSource; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; -import io.debezium.server.TestConfigSource; +import java.util.HashMap; +import java.util.Map; public class RabbitMqTestConfigSource extends TestConfigSource { @@ -19,7 +18,12 @@ public class RabbitMqTestConfigSource extends TestConfigSource { public RabbitMqTestConfigSource() { final Map rabbitmqConfig = new HashMap<>(); - rabbitmqConfig.put("debezium.sink.type", "rabbitmq"); + String sinkType = System.getProperty("debezium.sink.type"); + if ("rabbitmqstream".equals(sinkType)) { + rabbitmqConfig.put("debezium.sink.type", "rabbitmqstream"); + } else { + rabbitmqConfig.put("debezium.sink.type", "rabbitmq"); + } rabbitmqConfig.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); rabbitmqConfig.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); rabbitmqConfig.put("debezium.source.offset.flush.interval.ms", "0"); diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestResourceLifecycleManager.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestResourceLifecycleManager.java index bafe0e74..c5d4ccd3 100644 --- a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestResourceLifecycleManager.java +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestResourceLifecycleManager.java @@ -5,6 +5,7 @@ */ package io.debezium.server.rabbitmq; +import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -20,16 +21,26 @@ public class RabbitMqTestResourceLifecycleManager implements QuarkusTestResource public static RabbitMqContainer container = new RabbitMqContainer(); private static final AtomicBoolean running = new AtomicBoolean(false); - private static synchronized void init() { + private static synchronized void init() throws IOException, InterruptedException { if (!running.get()) { + container.start(); + container.execInContainer("rabbitmq-plugins", "enable", "--all"); running.set(true); } } @Override public Map start() { - init(); + try { + init(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } Map params = new ConcurrentHashMap<>(); params.put("debezium.sink.rabbitmq.connection.host", container.getHost()); params.put("debezium.sink.rabbitmq.connection.port", String.valueOf(getPort())); From d6c1828a5bad1d159b1173443cef7a00949a3eb5 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Mon, 27 Nov 2023 13:06:59 +0100 Subject: [PATCH 2/3] DBZ-6703 Fix formatting --- .../io/debezium/server/TestConfigSource.java | 11 ++++--- .../RabbitMqStreamNativeChangeConsumer.java | 33 ++++++++++--------- .../server/rabbitmq/RabbitMqContainer.java | 7 ++-- .../server/rabbitmq/RabbitMqStreamIT.java | 33 ++++++++++--------- .../rabbitmq/RabbitMqTestConfigSource.java | 10 +++--- 5 files changed, 51 insertions(+), 43 deletions(-) diff --git a/debezium-server-core/src/test/java/io/debezium/server/TestConfigSource.java b/debezium-server-core/src/test/java/io/debezium/server/TestConfigSource.java index 49f16424..eff84a6c 100644 --- a/debezium-server-core/src/test/java/io/debezium/server/TestConfigSource.java +++ b/debezium-server-core/src/test/java/io/debezium/server/TestConfigSource.java @@ -5,16 +5,17 @@ */ package io.debezium.server; -import io.debezium.data.Json; -import io.debezium.util.Testing; -import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; -import org.eclipse.microprofile.config.spi.ConfigSource; - import java.nio.file.Path; import java.util.HashMap; import java.util.Map; import java.util.Set; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.eclipse.microprofile.config.spi.ConfigSource; + +import io.debezium.data.Json; +import io.debezium.util.Testing; + /** * A config source used during tests. Amended/overridden by values exposed from test lifecycle listeners. */ diff --git a/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java b/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java index 2451d8eb..888b6355 100644 --- a/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java +++ b/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java @@ -5,32 +5,35 @@ */ package io.debezium.server.rabbitmq; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.Dependent; +import jakarta.inject.Named; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionFactoryConfigurator; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Producer; import com.rabbitmq.stream.StreamException; + import io.debezium.DebeziumException; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine.RecordCommitter; import io.debezium.engine.Header; import io.debezium.server.BaseChangeConsumer; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import jakarta.enterprise.context.Dependent; -import jakarta.inject.Named; -import org.eclipse.microprofile.config.Config; -import org.eclipse.microprofile.config.ConfigProvider; -import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; /** * Implementation of the consumer that delivers the messages into RabbitMQ Stream destination. diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqContainer.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqContainer.java index b5612574..4649cd41 100644 --- a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqContainer.java +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqContainer.java @@ -5,12 +5,12 @@ */ package io.debezium.server.rabbitmq; +import java.time.Duration; + import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; -import java.time.Duration; - /** * RabbitMQ container */ @@ -24,7 +24,6 @@ public RabbitMqContainer() { super(DEFAULT_IMAGE_NAME); withExposedPorts(BROKER_PORT, STREAM_PORT, 15672); - this.waitStrategy = - Wait.forLogMessage(".*Server startup complete.*", 1).withStartupTimeout(Duration.ofSeconds(60)); + this.waitStrategy = Wait.forLogMessage(".*Server startup complete.*", 1).withStartupTimeout(Duration.ofSeconds(60)); } } diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamIT.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamIT.java index de3ab4f0..842bffda 100644 --- a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamIT.java +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamIT.java @@ -5,10 +5,28 @@ */ package io.debezium.server.rabbitmq; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import jakarta.enterprise.event.Observes; + +import org.awaitility.Awaitility; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; + import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.OffsetSpecification; + import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.server.TestConfigSource; @@ -18,21 +36,6 @@ import io.debezium.util.Testing; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; -import jakarta.enterprise.event.Observes; -import org.awaitility.Awaitility; -import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; - -import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeoutException; - -import static org.assertj.core.api.Assertions.assertThat; @QuarkusTest @QuarkusTestResource(PostgresTestResourceLifecycleManager.class) diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java index 8bab276e..b866659c 100644 --- a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java @@ -5,12 +5,13 @@ */ package io.debezium.server.rabbitmq; -import io.debezium.server.TestConfigSource; -import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; - import java.util.HashMap; import java.util.Map; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; + +import io.debezium.server.TestConfigSource; + public class RabbitMqTestConfigSource extends TestConfigSource { public static final String TOPIC_NAME = "testc.inventory.customers"; @@ -21,7 +22,8 @@ public RabbitMqTestConfigSource() { String sinkType = System.getProperty("debezium.sink.type"); if ("rabbitmqstream".equals(sinkType)) { rabbitmqConfig.put("debezium.sink.type", "rabbitmqstream"); - } else { + } + else { rabbitmqConfig.put("debezium.sink.type", "rabbitmq"); } rabbitmqConfig.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); From 6fd4ed83ce353dbb31aa9203f0671453a8a1ffdd Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Mon, 27 Nov 2023 13:46:35 +0100 Subject: [PATCH 3/3] DBZ-6703 Run tests correctly --- debezium-server-rabbitmq/pom.xml | 18 +++++++++++++- .../RabbitMqStreamNativeChangeConsumer.java | 24 +++++++++---------- .../debezium/server/rabbitmq/RabbitMqIT.java | 2 ++ .../server/rabbitmq/RabbitMqStreamIT.java | 2 +- .../rabbitmq/RabbitMqTestConfigSource.java | 2 ++ .../RabbitMqTestResourceLifecycleManager.java | 5 +--- 6 files changed, 34 insertions(+), 19 deletions(-) diff --git a/debezium-server-rabbitmq/pom.xml b/debezium-server-rabbitmq/pom.xml index 0891dfcd..24a2c9bd 100644 --- a/debezium-server-rabbitmq/pom.xml +++ b/debezium-server-rabbitmq/pom.xml @@ -113,10 +113,26 @@ maven-failsafe-plugin - integration-test + integration-test-redismq integration-test + + + rabbitmq + + + + + integration-test-redismq-stream + + integration-test + + + + rabbitmqstream + + verify diff --git a/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java b/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java index 888b6355..7fef7f8a 100644 --- a/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java +++ b/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java @@ -9,6 +9,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import jakarta.annotation.PostConstruct; @@ -48,17 +49,12 @@ public class RabbitMqStreamNativeChangeConsumer extends BaseChangeConsumer imple private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqStreamNativeChangeConsumer.class); private static final String PROP_PREFIX = "debezium.sink.rabbitmqstream."; - private static final String PROP_CONNECTION_PREFIX = PROP_PREFIX + "connection."; - private static final String STREAM_NAME = PROP_PREFIX + "stream"; - private String stream; - /** - * When true, the routing key is calculated from topic name using stream name mapper. - * When false the routingKey value or empty string is used. - */ + private static final String PROP_STREAM = PROP_PREFIX + "stream"; + private static final String PROP_CONNECTION_PREFIX = PROP_PREFIX + "connection."; - @ConfigProperty(name = PROP_PREFIX + "deliveryMode", defaultValue = "2") - int deliveryMode; + @ConfigProperty(name = PROP_STREAM) + Optional stream; @ConfigProperty(name = PROP_PREFIX + "ackTimeout", defaultValue = "30000") int ackTimeout; @@ -88,15 +84,17 @@ void connect() { .host(factory.getHost()) .port(factory.getPort()).build(); - stream = config.getValue(STREAM_NAME, String.class); + if (stream.isEmpty()) { + throw new DebeziumException("Mandatory configration option '" + PROP_STREAM + "' is not provided"); + } - LOGGER.info("Creating stream '{}'", stream); + LOGGER.info("Creating stream '{}'", stream.get()); - environment.streamCreator().stream(stream).create(); + environment.streamCreator().stream(stream.get()).create(); producer = environment.producerBuilder() .confirmTimeout(Duration.ofSeconds(ackTimeout)) - .stream(stream) + .stream(stream.get()) .build(); } diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqIT.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqIT.java index e0b2ea20..d68b2983 100644 --- a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqIT.java +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqIT.java @@ -21,6 +21,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; @@ -43,6 +44,7 @@ @QuarkusTest @QuarkusTestResource(PostgresTestResourceLifecycleManager.class) @QuarkusTestResource(RabbitMqTestResourceLifecycleManager.class) +@EnabledIfSystemProperty(named = "debezium.sink.type", matches = "rabbitmq") public class RabbitMqIT { private static final int MESSAGE_COUNT = 4; diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamIT.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamIT.java index 842bffda..33fe3718 100644 --- a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamIT.java +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamIT.java @@ -39,7 +39,7 @@ @QuarkusTest @QuarkusTestResource(PostgresTestResourceLifecycleManager.class) -@QuarkusTestResource(value = RabbitMqStreamTestResourceLifecycleManager.class, parallel = true) +@QuarkusTestResource(value = RabbitMqStreamTestResourceLifecycleManager.class) @EnabledIfSystemProperty(named = "debezium.sink.type", matches = "rabbitmqstream") public class RabbitMqStreamIT { diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java index b866659c..5045de3a 100644 --- a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestConfigSource.java @@ -22,10 +22,12 @@ public RabbitMqTestConfigSource() { String sinkType = System.getProperty("debezium.sink.type"); if ("rabbitmqstream".equals(sinkType)) { rabbitmqConfig.put("debezium.sink.type", "rabbitmqstream"); + rabbitmqConfig.put("debezium.sink.rabbitmqstream.stream", TOPIC_NAME); } else { rabbitmqConfig.put("debezium.sink.type", "rabbitmq"); } + rabbitmqConfig.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); rabbitmqConfig.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); rabbitmqConfig.put("debezium.source.offset.flush.interval.ms", "0"); diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestResourceLifecycleManager.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestResourceLifecycleManager.java index c5d4ccd3..e1b7352d 100644 --- a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestResourceLifecycleManager.java +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqTestResourceLifecycleManager.java @@ -35,10 +35,7 @@ public Map start() { try { init(); } - catch (IOException e) { - throw new RuntimeException(e); - } - catch (InterruptedException e) { + catch (IOException | InterruptedException e) { throw new RuntimeException(e); } Map params = new ConcurrentHashMap<>();