diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumServerConfigBuilder.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumServerConfigBuilder.java new file mode 100644 index 00000000..2fd26226 --- /dev/null +++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumServerConfigBuilder.java @@ -0,0 +1,59 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.redis.wip; + +import static io.debezium.server.redis.wip.TestConstants.POSTGRES_DATABASE; +import static io.debezium.server.redis.wip.TestConstants.POSTGRES_PASSWORD; +import static io.debezium.server.redis.wip.TestConstants.POSTGRES_PORT; +import static io.debezium.server.redis.wip.TestConstants.POSTGRES_USER; +import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class DebeziumServerConfigBuilder { + private final Map config = new HashMap<>(); + + public DebeziumServerConfigBuilder withValue(String key, String value) { + config.put(key, value); + return this; + } + + public List build() { + return config + .entrySet() + .stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .collect(Collectors.toList()); + } + + public Map baseRedisConfig(TestContainersResource redis) { + return Map.of( + "debezium.sink.type", "redis", + "debezium.sink.redis.address", redis.getContainerIp() + ":" + REDIS_PORT); + } + + public Map basePostgresConfig(TestContainersResource postgres) { + return Map.of("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector", + "debezium.source.offset.flush.interval.ms", "0", + "debezium.source.topic.prefix", "testc", + "debezium.source.schema.include.list", "inventory", + "debezium.source.database.hostname", String.valueOf(postgres.getContainerIp()), + "debezium.source.database.port", String.valueOf(POSTGRES_PORT), + "debezium.source.database.user", POSTGRES_USER, + "debezium.source.database.password", POSTGRES_PASSWORD, + "debezium.source.database.dbname", POSTGRES_DATABASE, + "debezium.source.offset.storage.file.filename", "offset.dat"); + } + + public DebeziumServerConfigBuilder withBaseConfig(TestContainersResource redis, TestContainersResource postgres) { + config.putAll(baseRedisConfig(redis)); + config.putAll(basePostgresConfig(postgres)); + return this; + } +} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersPrototypeIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersPrototypeIT.java index 16a99f66..bc07059b 100644 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersPrototypeIT.java +++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersPrototypeIT.java @@ -5,11 +5,6 @@ */ package io.debezium.server.redis.wip; -import static io.debezium.server.redis.wip.RenameMe_TestUtils.getContainerIp; -import static io.debezium.server.redis.wip.RenameMe_TestUtils.getRedisContainerAddress; -import static io.debezium.server.redis.wip.RenameMe_TestUtils.insertCustomerToPostgres; -import static io.debezium.server.redis.wip.RenameMe_TestUtils.waitForContainerLog; -import static io.debezium.server.redis.wip.RenameMe_TestUtils.waitForContainerStop; import static io.debezium.server.redis.wip.TestConstants.POSTGRES_DATABASE; import static io.debezium.server.redis.wip.TestConstants.POSTGRES_IMAGE; import static io.debezium.server.redis.wip.TestConstants.POSTGRES_PASSWORD; @@ -18,6 +13,13 @@ import static io.debezium.server.redis.wip.TestConstants.REDIS_IMAGE; import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT; import static io.debezium.server.redis.wip.TestProperties.DEBEZIUM_SERVER_IMAGE; +import static io.debezium.server.redis.wip.TestUtils.awaitStreamLength; +import static io.debezium.server.redis.wip.TestUtils.awaitStreamLengthGte; +import static io.debezium.server.redis.wip.TestUtils.getPostgresConnection; +import static io.debezium.server.redis.wip.TestUtils.getRedisContainerAddress; +import static io.debezium.server.redis.wip.TestUtils.insertCustomerToPostgres; +import static io.debezium.server.redis.wip.TestUtils.waitForContainerLog; +import static io.debezium.server.redis.wip.TestUtils.waitForContainerStop; import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; @@ -26,15 +28,21 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import io.debezium.server.redis.TestUtils; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.doc.FixFor; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; public class TestContainersPrototypeIT { + private static final Logger LOGGER = LoggerFactory.getLogger(TestContainersPrototypeIT.class); + private final TestContainersResource postgres = TestContainersResource.builder() + .withName("postgres") .withImage(POSTGRES_IMAGE) .withPort(POSTGRES_PORT) .withEnv(List.of("POSTGRES_USER=" + POSTGRES_USER, @@ -45,11 +53,13 @@ public class TestContainersPrototypeIT { .build(); private final TestContainersResource redis = TestContainersResource.builder() + .withName("redis") .withImage(REDIS_IMAGE) .withPort(REDIS_PORT) .build(); private final TestContainersResource server = TestContainersResource.builder() + .withName("debezium server") .withImage(DEBEZIUM_SERVER_IMAGE) .build(); @@ -61,60 +71,99 @@ public void setUp() { @AfterEach public void tearDown() { + server.stop(); postgres.stop(); redis.stop(); } @Test public void shouldStreamChanges() throws InterruptedException, IOException { - server.setEnv( - List.of("debezium.sink.type=redis", - "debezium.sink.redis.address=" + getContainerIp(redis.getContainer()) + ":" + REDIS_PORT, - "debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector", - "debezium.source.offset.flush.interval.ms=0", - "debezium.source.topic.prefix=testc", - "debezium.source.schema.include.list=inventory", - "debezium.source.database.hostname=" + getContainerIp(postgres.getContainer()), - "debezium.source.database.port=" + POSTGRES_PORT, - "debezium.source.database.user=" + POSTGRES_USER, - "debezium.source.database.password=" + POSTGRES_PASSWORD, - "debezium.source.database.dbname=" + POSTGRES_DATABASE, - "debezium.source.offset.storage.file.filename=" + "offset.dat")); - + server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres).build()); server.start(); Jedis jedis = new Jedis(HostAndPort.from(getRedisContainerAddress(redis))); final int MESSAGE_COUNT = 4; final String STREAM_NAME = "testc.inventory.customers"; - TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT); + awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT); assertThat(server.getStandardOutput()).contains("inventory.customers"); insertCustomerToPostgres(postgres.getContainer(), "Sergei", "Savage", "sesa@email.com"); - TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT + 1); - System.out.println(server.getStandardOutput()); - server.stop(); + awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT + 1); } @Test public void shouldFailWithIncorrectRedisAddress() { - server.setEnv( - List.of("debezium.sink.type=redis", - "debezium.sink.redis.address=" + getContainerIp(redis.getContainer()) + ":" + 1000, // Incorrect port - "debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector", - "debezium.source.offset.flush.interval.ms=0", - "debezium.source.topic.prefix=testc", - "debezium.source.schema.include.list=inventory", - "debezium.source.database.hostname=" + getContainerIp(postgres.getContainer()), - "debezium.source.database.port=" + POSTGRES_PORT, - "debezium.source.database.user=" + POSTGRES_USER, - "debezium.source.database.password=" + POSTGRES_PASSWORD, - "debezium.source.database.dbname=" + POSTGRES_DATABASE, - "debezium.source.offset.storage.file.filename=" + "offset.dat")); + server.setEnv(new DebeziumServerConfigBuilder() + .withBaseConfig(redis, postgres) + .withValue("debezium.sink.redis.address", redis.getContainerIp() + ":" + 1000) + .build()); server.start(); waitForContainerLog(server.getContainer(), "Failed to connect to any host resolved for DNS name"); waitForContainerStop(server.getContainer()); } + @Test + @FixFor("DBZ-4510") + public void testRedisConnectionRetry() throws Exception { + server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres).build()); + server.start(); + + final int MESSAGE_COUNT = 5; + final String STREAM_NAME = "testc.inventory.redis_test"; + Jedis jedis = new Jedis(HostAndPort.from(getRedisContainerAddress(redis))); + redis.pause(); + + final PostgresConnection connection = getPostgresConnection(postgres); + LOGGER.info("Creating new redis_test table and inserting 5 records to it"); + connection.execute( + "CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)", + "INSERT INTO inventory.redis_test VALUES (1)", + "INSERT INTO inventory.redis_test VALUES (2)", + "INSERT INTO inventory.redis_test VALUES (3)", + "INSERT INTO inventory.redis_test VALUES (4)", + "INSERT INTO inventory.redis_test VALUES (5)"); + connection.close(); + + LOGGER.info("Sleeping for 3 seconds to simulate no connection errors"); + Thread.sleep(3000); + redis.resume(); + + awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT); + } + + @Test + @FixFor("DBZ-4510") + public void testRedisOOMRetry() throws Exception { + server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres).build()); + server.start(); + + Jedis jedis = new Jedis(HostAndPort.from(getRedisContainerAddress(redis))); + final String STREAM_NAME = "testc.inventory.redis_test2"; + final int TOTAL_RECORDS = 50; + + LOGGER.info("Setting Redis' maxmemory to 1M"); + jedis.configSet("maxmemory", "1M"); + + PostgresConnection connection = getPostgresConnection(postgres); + connection.execute("CREATE TABLE inventory.redis_test2 " + + "(id VARCHAR(100) PRIMARY KEY, " + + "first_name VARCHAR(100), " + + "last_name VARCHAR(100))"); + connection.execute(String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) " + + "SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", TOTAL_RECORDS)); + connection.commit(); + + // wait for debezium to stream changes to redis and hit oom + awaitStreamLengthGte(jedis, STREAM_NAME, 0); + Thread.sleep(1000); + LOGGER.info("Entries in " + STREAM_NAME + ":" + jedis.xlen(STREAM_NAME)); + assertThat(jedis.xlen(STREAM_NAME)).isLessThan(TOTAL_RECORDS); + + Thread.sleep(1000); + LOGGER.info("Unsetting Redis' maxmemory"); + jedis.configSet("maxmemory", "30M"); + awaitStreamLength(jedis, STREAM_NAME, TOTAL_RECORDS); + } } diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersResource.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersResource.java index 746e659f..4e0df46d 100644 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersResource.java +++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersResource.java @@ -9,6 +9,8 @@ import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import org.testcontainers.shaded.org.apache.commons.lang3.NotImplementedException; @@ -16,30 +18,34 @@ import lombok.Getter; import lombok.NonNull; +/** + * Manages the containers ran during tests + * + */ public class TestContainersResource { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestContainersResource.class); + + private final String name; private final String image; @Getter private final int port; + private final String waitForLogRegex; private List env; - private String waitForLogRegex; @Getter private GenericContainer container; - public TestContainersResource(String image, int port, List env) { - this.image = image; - this.port = port; - this.env = env; - } - - public TestContainersResource(@NonNull String image, int port, List env, String waitForLogMessage) { + public TestContainersResource(@NonNull String image, int port, List env, String waitForLogMessage, @NonNull String name) { this.image = image; this.port = port; this.env = env; this.waitForLogRegex = waitForLogMessage; + this.name = name; } public void start() { + LOGGER.info("Starting container " + name); container = new GenericContainer<>(image); if (env != null) { container.setEnv(env); @@ -51,25 +57,35 @@ public void start() { container.waitingFor(new LogMessageWaitStrategy().withRegEx(waitForLogRegex)); } container.start(); - } - // TODO is this best solution? public boolean isRunning() { return container != null && container.isRunning(); } public void stop() { + LOGGER.info("Stopping container " + name); container.stop(); } + public void pause() { + LOGGER.info("Pausing container " + image); + container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec(); + } + + public void resume() { + LOGGER.info("Resuming container " + name); + container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec(); + } + public String getStandardOutput() { return container.getLogs(STDOUT); } public void setEnv(List env) { if (isRunning()) { - throw new NotImplementedException("cannot edit running container.. for now"); + // TODO: consider restarting with different env + throw new NotImplementedException("cannot edit running container"); } this.env = env; @@ -79,11 +95,25 @@ public static Builder builder() { return new Builder(); } + public String getContainerIp() { + return container + .getContainerInfo() + .getNetworkSettings() + .getNetworks() + .entrySet() + .stream() + .findFirst() + .get() + .getValue() + .getIpAddress(); + } + public static class Builder { private String image; private int port = 0; private List env; private String waitForLogRegex; + private String name; public Builder withImage(String image) { this.image = image; @@ -105,8 +135,13 @@ public Builder withWaitForRegex(String waitForLogRegex) { return this; } + public Builder withName(String name) { + this.name = name; + return this; + } + public TestContainersResource build() { - return new TestContainersResource(image, port, env, waitForLogRegex); + return new TestContainersResource(image, port, env, waitForLogRegex, name); } } } diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/RenameMe_TestUtils.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java similarity index 59% rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/RenameMe_TestUtils.java rename to debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java index fcb0763b..f0416739 100644 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/RenameMe_TestUtils.java +++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java @@ -6,6 +6,8 @@ package io.debezium.server.redis.wip; import static io.debezium.server.redis.wip.TestConstants.POSTGRES_DATABASE; +import static io.debezium.server.redis.wip.TestConstants.POSTGRES_PASSWORD; +import static io.debezium.server.redis.wip.TestConstants.POSTGRES_PORT; import static io.debezium.server.redis.wip.TestConstants.POSTGRES_USER; import static org.awaitility.Awaitility.await; @@ -15,7 +17,13 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.OutputFrame; -public class RenameMe_TestUtils { +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.server.TestConfigSource; + +import redis.clients.jedis.Jedis; + +public class TestUtils { public static void waitForContainerLog(GenericContainer container, String expectedLog) { await() .atMost(20, TimeUnit.SECONDS) @@ -52,4 +60,28 @@ public static void insertCustomerToPostgres(GenericContainer container, Strin "-d", POSTGRES_DATABASE, "-c", "INSERT INTO inventory.customers VALUES (default,'" + firstName + "','" + lastName + "','" + email + "')"); } + + public static PostgresConnection getPostgresConnection(TestContainersResource containersResource) { + return new PostgresConnection(JdbcConfiguration.create() + .with("user", POSTGRES_USER) + .with("password", POSTGRES_PASSWORD) + .with("dbname", POSTGRES_DATABASE) + .with("hostname", containersResource.getContainerIp()) + .with("port", POSTGRES_PORT) + .build(), "Debezium Redis Test"); + } + + public static void awaitStreamLengthGte(Jedis jedis, String streamName, int expectedLength) { + await() + .atMost(TestConfigSource.waitForSeconds(), TimeUnit.SECONDS) + .until(() -> jedis.xlen(streamName) >= expectedLength); + + } + + public static void awaitStreamLength(Jedis jedis, String streamName, int expectedLength) { + await() + .atMost(TestConfigSource.waitForSeconds(), TimeUnit.SECONDS) + .until(() -> jedis.xlen(streamName) == expectedLength); + } + }