Skip to content

Commit

Permalink
Refactor, added test cases from RedisStreamIT and simple config builder
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinMedek committed Aug 9, 2023
1 parent 2695ebd commit 48462db
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.redis.wip;

import static io.debezium.server.redis.wip.TestConstants.POSTGRES_DATABASE;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_PASSWORD;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_PORT;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_USER;
import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DebeziumServerConfigBuilder {
private final Map<String, String> config = new HashMap<>();

public DebeziumServerConfigBuilder withValue(String key, String value) {
config.put(key, value);
return this;
}

public List<String> build() {
return config
.entrySet()
.stream()
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.toList());
}

public Map<String, String> baseRedisConfig(TestContainersResource redis) {
return Map.of(
"debezium.sink.type", "redis",
"debezium.sink.redis.address", redis.getContainerIp() + ":" + REDIS_PORT);
}

public Map<String, String> basePostgresConfig(TestContainersResource postgres) {
return Map.of("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector",
"debezium.source.offset.flush.interval.ms", "0",
"debezium.source.topic.prefix", "testc",
"debezium.source.schema.include.list", "inventory",
"debezium.source.database.hostname", String.valueOf(postgres.getContainerIp()),
"debezium.source.database.port", String.valueOf(POSTGRES_PORT),
"debezium.source.database.user", POSTGRES_USER,
"debezium.source.database.password", POSTGRES_PASSWORD,
"debezium.source.database.dbname", POSTGRES_DATABASE,
"debezium.source.offset.storage.file.filename", "offset.dat");
}

public DebeziumServerConfigBuilder withBaseConfig(TestContainersResource redis, TestContainersResource postgres) {
config.putAll(baseRedisConfig(redis));
config.putAll(basePostgresConfig(postgres));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@
*/
package io.debezium.server.redis.wip;

import static io.debezium.server.redis.wip.RenameMe_TestUtils.getContainerIp;
import static io.debezium.server.redis.wip.RenameMe_TestUtils.getRedisContainerAddress;
import static io.debezium.server.redis.wip.RenameMe_TestUtils.insertCustomerToPostgres;
import static io.debezium.server.redis.wip.RenameMe_TestUtils.waitForContainerLog;
import static io.debezium.server.redis.wip.RenameMe_TestUtils.waitForContainerStop;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_DATABASE;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_IMAGE;
import static io.debezium.server.redis.wip.TestConstants.POSTGRES_PASSWORD;
Expand All @@ -18,6 +13,13 @@
import static io.debezium.server.redis.wip.TestConstants.REDIS_IMAGE;
import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT;
import static io.debezium.server.redis.wip.TestProperties.DEBEZIUM_SERVER_IMAGE;
import static io.debezium.server.redis.wip.TestUtils.awaitStreamLength;
import static io.debezium.server.redis.wip.TestUtils.awaitStreamLengthGte;
import static io.debezium.server.redis.wip.TestUtils.getPostgresConnection;
import static io.debezium.server.redis.wip.TestUtils.getRedisContainerAddress;
import static io.debezium.server.redis.wip.TestUtils.insertCustomerToPostgres;
import static io.debezium.server.redis.wip.TestUtils.waitForContainerLog;
import static io.debezium.server.redis.wip.TestUtils.waitForContainerStop;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
Expand All @@ -26,15 +28,21 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.server.redis.TestUtils;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.doc.FixFor;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;

public class TestContainersPrototypeIT {

private static final Logger LOGGER = LoggerFactory.getLogger(TestContainersPrototypeIT.class);

private final TestContainersResource postgres = TestContainersResource.builder()
.withName("postgres")
.withImage(POSTGRES_IMAGE)
.withPort(POSTGRES_PORT)
.withEnv(List.of("POSTGRES_USER=" + POSTGRES_USER,
Expand All @@ -45,11 +53,13 @@ public class TestContainersPrototypeIT {
.build();

private final TestContainersResource redis = TestContainersResource.builder()
.withName("redis")
.withImage(REDIS_IMAGE)
.withPort(REDIS_PORT)
.build();

private final TestContainersResource server = TestContainersResource.builder()
.withName("debezium server")
.withImage(DEBEZIUM_SERVER_IMAGE)
.build();

Expand All @@ -61,60 +71,99 @@ public void setUp() {

@AfterEach
public void tearDown() {
server.stop();
postgres.stop();
redis.stop();
}

@Test
public void shouldStreamChanges() throws InterruptedException, IOException {
server.setEnv(
List.of("debezium.sink.type=redis",
"debezium.sink.redis.address=" + getContainerIp(redis.getContainer()) + ":" + REDIS_PORT,
"debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector",
"debezium.source.offset.flush.interval.ms=0",
"debezium.source.topic.prefix=testc",
"debezium.source.schema.include.list=inventory",
"debezium.source.database.hostname=" + getContainerIp(postgres.getContainer()),
"debezium.source.database.port=" + POSTGRES_PORT,
"debezium.source.database.user=" + POSTGRES_USER,
"debezium.source.database.password=" + POSTGRES_PASSWORD,
"debezium.source.database.dbname=" + POSTGRES_DATABASE,
"debezium.source.offset.storage.file.filename=" + "offset.dat"));

server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres).build());
server.start();
Jedis jedis = new Jedis(HostAndPort.from(getRedisContainerAddress(redis)));
final int MESSAGE_COUNT = 4;
final String STREAM_NAME = "testc.inventory.customers";

TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT);
awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT);
assertThat(server.getStandardOutput()).contains("inventory.customers");

insertCustomerToPostgres(postgres.getContainer(), "Sergei", "Savage", "[email protected]");

TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT + 1);
System.out.println(server.getStandardOutput());
server.stop();
awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT + 1);
}

@Test
public void shouldFailWithIncorrectRedisAddress() {
server.setEnv(
List.of("debezium.sink.type=redis",
"debezium.sink.redis.address=" + getContainerIp(redis.getContainer()) + ":" + 1000, // Incorrect port
"debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector",
"debezium.source.offset.flush.interval.ms=0",
"debezium.source.topic.prefix=testc",
"debezium.source.schema.include.list=inventory",
"debezium.source.database.hostname=" + getContainerIp(postgres.getContainer()),
"debezium.source.database.port=" + POSTGRES_PORT,
"debezium.source.database.user=" + POSTGRES_USER,
"debezium.source.database.password=" + POSTGRES_PASSWORD,
"debezium.source.database.dbname=" + POSTGRES_DATABASE,
"debezium.source.offset.storage.file.filename=" + "offset.dat"));
server.setEnv(new DebeziumServerConfigBuilder()
.withBaseConfig(redis, postgres)
.withValue("debezium.sink.redis.address", redis.getContainerIp() + ":" + 1000)
.build());

server.start();
waitForContainerLog(server.getContainer(), "Failed to connect to any host resolved for DNS name");
waitForContainerStop(server.getContainer());
}

@Test
@FixFor("DBZ-4510")
public void testRedisConnectionRetry() throws Exception {
server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres).build());
server.start();

final int MESSAGE_COUNT = 5;
final String STREAM_NAME = "testc.inventory.redis_test";
Jedis jedis = new Jedis(HostAndPort.from(getRedisContainerAddress(redis)));
redis.pause();

final PostgresConnection connection = getPostgresConnection(postgres);
LOGGER.info("Creating new redis_test table and inserting 5 records to it");
connection.execute(
"CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)",
"INSERT INTO inventory.redis_test VALUES (1)",
"INSERT INTO inventory.redis_test VALUES (2)",
"INSERT INTO inventory.redis_test VALUES (3)",
"INSERT INTO inventory.redis_test VALUES (4)",
"INSERT INTO inventory.redis_test VALUES (5)");
connection.close();

LOGGER.info("Sleeping for 3 seconds to simulate no connection errors");
Thread.sleep(3000);
redis.resume();

awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT);
}

@Test
@FixFor("DBZ-4510")
public void testRedisOOMRetry() throws Exception {
server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres).build());
server.start();

Jedis jedis = new Jedis(HostAndPort.from(getRedisContainerAddress(redis)));
final String STREAM_NAME = "testc.inventory.redis_test2";
final int TOTAL_RECORDS = 50;

LOGGER.info("Setting Redis' maxmemory to 1M");
jedis.configSet("maxmemory", "1M");

PostgresConnection connection = getPostgresConnection(postgres);
connection.execute("CREATE TABLE inventory.redis_test2 " +
"(id VARCHAR(100) PRIMARY KEY, " +
"first_name VARCHAR(100), " +
"last_name VARCHAR(100))");
connection.execute(String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) " +
"SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", TOTAL_RECORDS));
connection.commit();

// wait for debezium to stream changes to redis and hit oom
awaitStreamLengthGte(jedis, STREAM_NAME, 0);
Thread.sleep(1000);
LOGGER.info("Entries in " + STREAM_NAME + ":" + jedis.xlen(STREAM_NAME));
assertThat(jedis.xlen(STREAM_NAME)).isLessThan(TOTAL_RECORDS);

Thread.sleep(1000);
LOGGER.info("Unsetting Redis' maxmemory");
jedis.configSet("maxmemory", "30M");
awaitStreamLength(jedis, STREAM_NAME, TOTAL_RECORDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,43 @@

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.shaded.org.apache.commons.lang3.NotImplementedException;

import lombok.Getter;
import lombok.NonNull;

/**
* Manages the containers ran during tests
*
*/
public class TestContainersResource {

private static final Logger LOGGER = LoggerFactory.getLogger(TestContainersResource.class);

private final String name;
private final String image;
@Getter
private final int port;
private final String waitForLogRegex;
private List<String> env;
private String waitForLogRegex;

@Getter
private GenericContainer<?> container;

public TestContainersResource(String image, int port, List<String> env) {
this.image = image;
this.port = port;
this.env = env;
}

public TestContainersResource(@NonNull String image, int port, List<String> env, String waitForLogMessage) {
public TestContainersResource(@NonNull String image, int port, List<String> env, String waitForLogMessage, @NonNull String name) {
this.image = image;
this.port = port;
this.env = env;
this.waitForLogRegex = waitForLogMessage;
this.name = name;
}

public void start() {
LOGGER.info("Starting container " + name);
container = new GenericContainer<>(image);
if (env != null) {
container.setEnv(env);
Expand All @@ -51,25 +57,35 @@ public void start() {
container.waitingFor(new LogMessageWaitStrategy().withRegEx(waitForLogRegex));
}
container.start();

}

// TODO is this best solution?
public boolean isRunning() {
return container != null && container.isRunning();
}

public void stop() {
LOGGER.info("Stopping container " + name);
container.stop();
}

public void pause() {
LOGGER.info("Pausing container " + image);
container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec();
}

public void resume() {
LOGGER.info("Resuming container " + name);
container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec();
}

public String getStandardOutput() {
return container.getLogs(STDOUT);
}

public void setEnv(List<String> env) {
if (isRunning()) {
throw new NotImplementedException("cannot edit running container.. for now");
// TODO: consider restarting with different env
throw new NotImplementedException("cannot edit running container");
}

this.env = env;
Expand All @@ -79,11 +95,25 @@ public static Builder builder() {
return new Builder();
}

public String getContainerIp() {
return container
.getContainerInfo()
.getNetworkSettings()
.getNetworks()
.entrySet()
.stream()
.findFirst()
.get()
.getValue()
.getIpAddress();
}

public static class Builder {
private String image;
private int port = 0;
private List<String> env;
private String waitForLogRegex;
private String name;

public Builder withImage(String image) {
this.image = image;
Expand All @@ -105,8 +135,13 @@ public Builder withWaitForRegex(String waitForLogRegex) {
return this;
}

public Builder withName(String name) {
this.name = name;
return this;
}

public TestContainersResource build() {
return new TestContainersResource(image, port, env, waitForLogRegex);
return new TestContainersResource(image, port, env, waitForLogRegex, name);
}
}
}
Loading

0 comments on commit 48462db

Please sign in to comment.