-
Notifications
You must be signed in to change notification settings - Fork 80
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added more test cases, refactored ssl test
- Loading branch information
1 parent
a0f0e31
commit 9b8d213
Showing
7 changed files
with
76 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
keystore and truststore copied from resources. | ||
TODO reconsider if there is more elegant solution that copying truststore here | ||
|
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,8 @@ | |
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import org.junit.jupiter.api.Test; | ||
import org.slf4j.Logger; | ||
|
@@ -21,8 +23,11 @@ | |
import io.debezium.connector.postgresql.connection.PostgresConnection; | ||
import io.debezium.doc.FixFor; | ||
|
||
public class TestContainersPrototypeIT extends TestContainersRedisTestBase { | ||
private static final Logger LOGGER = LoggerFactory.getLogger(TestContainersPrototypeIT.class); | ||
import redis.clients.jedis.StreamEntryID; | ||
import redis.clients.jedis.resps.StreamEntry; | ||
|
||
public class TestContainersStreamIT extends TestContainersRedisTestBase { | ||
private static final Logger LOGGER = LoggerFactory.getLogger(TestContainersStreamIT.class); | ||
|
||
@Test | ||
public void shouldStreamChanges() throws InterruptedException, IOException { | ||
|
@@ -54,37 +59,52 @@ public void shouldFailWithIncorrectRedisAddress() { | |
|
||
@Test | ||
@FixFor("DBZ-4510") | ||
public void testRedisConnectionRetry() throws Exception { | ||
public void shouldRetryAfterRedisCrash() throws Exception { | ||
final int SOCKET_TIMEOUT = 4000; | ||
server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres) | ||
.withValue("debezium.sink.redis.socket.timeout.ms", String.valueOf(SOCKET_TIMEOUT)) | ||
.build()); | ||
server.start(); | ||
|
||
final int MESSAGE_COUNT = 5; | ||
final String STREAM_NAME = "testc.inventory.redis_test"; | ||
final int MESSAGE_COUNT = 4; | ||
final String STREAM_NAME = "testc.inventory.customers"; | ||
awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT); | ||
redis.pause(); | ||
|
||
final PostgresConnection connection = getPostgresConnection(postgres); | ||
LOGGER.info("Creating new redis_test table and inserting 5 records to it"); | ||
connection.execute( | ||
"CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)", | ||
"INSERT INTO inventory.redis_test VALUES (1)", | ||
"INSERT INTO inventory.redis_test VALUES (2)", | ||
"INSERT INTO inventory.redis_test VALUES (3)", | ||
"INSERT INTO inventory.redis_test VALUES (4)", | ||
"INSERT INTO inventory.redis_test VALUES (5)"); | ||
connection.close(); | ||
insertCustomerToPostgres(postgres, "Sergei", "Savage", "[email protected]"); | ||
|
||
LOGGER.info("Sleeping for " + SOCKET_TIMEOUT / 2 + " milis to simulate no connection errors"); | ||
Thread.sleep(SOCKET_TIMEOUT / 2); | ||
redis.unpause(); | ||
awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT + 1); | ||
} | ||
|
||
@Test | ||
public void shouldTimeoutAfterRedisCrash() throws Exception { | ||
final int SOCKET_TIMEOUT = 2000; | ||
server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres) | ||
.withValue("debezium.sink.redis.socket.timeout.ms", String.valueOf(SOCKET_TIMEOUT)) | ||
.build()); | ||
server.start(); | ||
|
||
final int MESSAGE_COUNT = 4; | ||
final String STREAM_NAME = "testc.inventory.customers"; | ||
awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT); | ||
|
||
redis.pause(); | ||
insertCustomerToPostgres(postgres, "Sergei", "Savage", "[email protected]"); | ||
|
||
LOGGER.info("Sleeping for " + SOCKET_TIMEOUT / 2 + " milis to simulate no connection errors"); | ||
Thread.sleep(SOCKET_TIMEOUT / 2); | ||
assertThat(server.isRunning()).isTrue(); | ||
|
||
waitForContainerLog(server, "Read timed out", 2); | ||
waitForContainerStop(server); | ||
} | ||
|
||
@Test | ||
@FixFor("DBZ-4510") | ||
public void testRedisOOMRetry() throws Exception { | ||
public void shouldRetryAfterRedisOOM() throws Exception { | ||
server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres).build()); | ||
server.start(); | ||
|
||
|
@@ -110,4 +130,25 @@ public void testRedisOOMRetry() throws Exception { | |
jedis.configSet("maxmemory", "30M"); | ||
awaitStreamLength(jedis, STREAM_NAME, TOTAL_RECORDS); | ||
} | ||
|
||
@Test | ||
public void shouldStreamExtendedMessageFormat() { | ||
server.setEnv(new DebeziumServerConfigBuilder().withBaseConfig(redis, postgres) | ||
.withValue("debezium.sink.redis.message.format", "extended") | ||
.build()); | ||
server.start(); | ||
final int MESSAGE_COUNT = 4; | ||
final String STREAM_NAME = "testc.inventory.customers"; | ||
|
||
awaitStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT); | ||
|
||
final List<StreamEntry> entries = jedis.xrange(STREAM_NAME, null, (StreamEntryID) null); | ||
for (StreamEntry entry : entries) { | ||
Map<String, String> map = entry.getFields(); | ||
// TODO verify, that there should really be 2 fields | ||
// assertEquals(3, map.size(), "Expected map of size 3"); | ||
assertThat(map.get("key")).startsWith("{\"schema\":"); | ||
assertThat(map.get("value")).startsWith("{\"schema\":"); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters