Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-6703 Support native RabbitMQ Streams #49

Merged
merged 3 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions debezium-server-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>stream-client</artifactId>
<version>0.14.0</version>
</dependency>
<!-- Testing -->
<dependency>
<groupId>io.quarkus</groupId>
Expand Down Expand Up @@ -109,10 +113,26 @@
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test</id>
<id>integration-test-redismq</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemProperties>
<debezium.sink.type>rabbitmq</debezium.sink.type>
</systemProperties>
</configuration>
</execution>
<execution>
<id>integration-test-redismq-stream</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemProperties>
<debezium.sink.type>rabbitmqstream</debezium.sink.type>
</systemProperties>
</configuration>
</execution>
<execution>
<id>verify</id>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.rabbitmq;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionFactoryConfigurator;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.StreamException;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.engine.Header;
import io.debezium.server.BaseChangeConsumer;

/**
* Implementation of the consumer that delivers the messages into RabbitMQ Stream destination.
*
* @author Olivier Boudet
*
*/
@Named("rabbitmqstream")
@Dependent
public class RabbitMqStreamNativeChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
ahmedrachid marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqStreamNativeChangeConsumer.class);

private static final String PROP_PREFIX = "debezium.sink.rabbitmqstream.";

private static final String PROP_STREAM = PROP_PREFIX + "stream";
private static final String PROP_CONNECTION_PREFIX = PROP_PREFIX + "connection.";

@ConfigProperty(name = PROP_STREAM)
Optional<String> stream;

@ConfigProperty(name = PROP_PREFIX + "ackTimeout", defaultValue = "30000")
int ackTimeout;

@ConfigProperty(name = PROP_PREFIX + "null.value", defaultValue = "default")
String nullValue;

Environment environment;

Producer producer;

@PostConstruct
void connect() {

final Config config = ConfigProvider.getConfig();
ConnectionFactory factory = new ConnectionFactory();

Map<String, String> configProperties = getConfigSubset(config, PROP_CONNECTION_PREFIX).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
entry -> (entry.getValue() == null) ? null : entry.getValue().toString()));

ConnectionFactoryConfigurator.load(factory, configProperties, "");
LOGGER.info("Using connection to {}:{}", factory.getHost(), factory.getPort());

try {
environment = Environment.builder()
.host(factory.getHost())
.port(factory.getPort()).build();

if (stream.isEmpty()) {
throw new DebeziumException("Mandatory configration option '" + PROP_STREAM + "' is not provided");
}

LOGGER.info("Creating stream '{}'", stream.get());

environment.streamCreator().stream(stream.get()).create();

producer = environment.producerBuilder()
.confirmTimeout(Duration.ofSeconds(ackTimeout))
.stream(stream.get())
.build();

}
catch (StreamException | IllegalArgumentException e) {
throw new DebeziumException(e);
}
}

@PreDestroy
void close() {

try {
if (producer != null) {
producer.close();
}
if (environment != null) {
environment.close();
}
}
catch (Exception e) {
throw new DebeziumException(e);
}

}

@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Received event '{}'", record);
try {
final Object value = (record.value() != null) ? record.value() : nullValue;
producer.send(
producer.messageBuilder().addData(getBytes(value)).build(),
confirmationStatus -> {
});

}
catch (StreamException e) {
throw new DebeziumException(e);
}

committer.markProcessed(record);
}

LOGGER.trace("Sent messages");
committer.markBatchFinished();
}

@Override
public boolean supportsTombstoneEvents() {
return false;
}

private Map<String, Object> convertRabbitMqHeaders(ChangeEvent<Object, Object> record) {
List<Header<Object>> headers = record.headers();
Map<String, Object> rabbitMqHeaders = new HashMap<>();
for (Header<Object> header : headers) {
rabbitMqHeaders.put(header.getKey(), header.getValue());
}
return rabbitMqHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,25 @@
*/
package io.debezium.server.rabbitmq;

import java.time.Duration;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

/**
* RabbitMQ container
*/
public class RabbitMqContainer extends GenericContainer<RabbitMqContainer> {

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("rabbitmq:3.10.19-management");
private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("rabbitmq:3.12.9-management");
public static final int BROKER_PORT = 5672;
public static final int STREAM_PORT = 5552;

public RabbitMqContainer() {
super(DEFAULT_IMAGE_NAME);
withExposedPorts(BROKER_PORT, 15672);
withExposedPorts(BROKER_PORT, STREAM_PORT, 15672);
ahmedrachid marked this conversation as resolved.
Show resolved Hide resolved

this.waitStrategy = Wait.forLogMessage(".*Server startup complete.*", 1).withStartupTimeout(Duration.ofSeconds(60));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
Expand All @@ -43,6 +44,7 @@
@QuarkusTest
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
@QuarkusTestResource(RabbitMqTestResourceLifecycleManager.class)
@EnabledIfSystemProperty(named = "debezium.sink.type", matches = "rabbitmq")
public class RabbitMqIT {

private static final int MESSAGE_COUNT = 4;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.rabbitmq;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;

import jakarta.enterprise.event.Observes;

import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
@QuarkusTestResource(value = RabbitMqStreamTestResourceLifecycleManager.class)
@EnabledIfSystemProperty(named = "debezium.sink.type", matches = "rabbitmqstream")
public class RabbitMqStreamIT {

private static final int MESSAGE_COUNT = 4;

private static Environment environment;
private static Consumer consumer = null;

@ConfigProperty(name = "debezium.source.database.hostname")
String dbHostname;

@ConfigProperty(name = "debezium.source.database.port")
String dbPort;

@ConfigProperty(name = "debezium.source.database.user")
String dbUser;

@ConfigProperty(name = "debezium.source.database.password")
String dbPassword;

@ConfigProperty(name = "debezium.source.database.dbname")
String dbName;

private static final List<String> messages = Collections.synchronizedList(new ArrayList<>());

{
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
Testing.Files.createTestingFile(RabbitMqTestConfigSource.OFFSET_STORE_PATH);
}

void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException, TimeoutException {
// start consumer
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMqStreamTestResourceLifecycleManager.container.getHost());
factory.setPort(RabbitMqStreamTestResourceLifecycleManager.getPort());

environment = Environment.builder()
.host(factory.getHost())
.port(factory.getPort()).build();

environment.streamCreator().stream(RabbitMqTestConfigSource.TOPIC_NAME).create();

Consumer consumer = environment.consumerBuilder()
.stream(RabbitMqTestConfigSource.TOPIC_NAME)
.offset(OffsetSpecification.first())
.messageHandler((offset, message) -> {
messages.add(new String(message.getBodyAsBinary()));
})
.build();
}

@AfterAll
static void stop() throws IOException, TimeoutException {
if (consumer != null) {
consumer.close();
}
if (environment != null) {
environment.close();
}
}

void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
if (!event.isSuccess()) {
throw new RuntimeException(event.getError().get());
}
}

@Test
public void testRabbitMqStream() throws Exception {

// consume record
Awaitility.await()
.atMost(Duration.ofSeconds(RabbitMqTestConfigSource.waitForSeconds()))
.until(() -> messages.size() >= MESSAGE_COUNT);

assertThat(messages.size()).isGreaterThanOrEqualTo(MESSAGE_COUNT);
messages.clear();

final JdbcConfiguration config = JdbcConfiguration.create()
.with("hostname", dbHostname)
.with("port", dbPort)
.with("user", dbUser)
.with("password", dbPassword)
.with("dbname", dbName)
.build();
try (PostgresConnection connection = new PostgresConnection(config, "Debezium RabbitMQ Stream Test")) {
connection.execute(
"INSERT INTO inventory.customers VALUES (10000, 'John', 'Doe', '[email protected]')",
"DELETE FROM inventory.customers WHERE id=10000");
}

// consume INSERT, DELETE, null (tombstone)
Awaitility.await()
.atMost(Duration.ofSeconds(RabbitMqTestConfigSource.waitForSeconds()))
.until(() -> messages.size() >= 3);

assertThat(messages.size()).isGreaterThanOrEqualTo(3);
assertThat(messages.get(2)).isEqualTo("default");
}
}
Loading