From fe65301bd8624c7598782b59d9067f79c9ec5de6 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Thu, 18 Jul 2024 12:04:09 +0200 Subject: [PATCH] DBZ-8018 Use all record fields as primary key columns when no primary key fields are specified --- .../connector/jdbc/SinkRecordDescriptor.java | 28 ++++++++----------- .../AbstractJdbcSinkPrimaryKeyModeTest.java | 21 +++++++++----- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/src/main/java/io/debezium/connector/jdbc/SinkRecordDescriptor.java b/src/main/java/io/debezium/connector/jdbc/SinkRecordDescriptor.java index c17f7b41..69631fc5 100644 --- a/src/main/java/io/debezium/connector/jdbc/SinkRecordDescriptor.java +++ b/src/main/java/io/debezium/connector/jdbc/SinkRecordDescriptor.java @@ -11,6 +11,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Stream; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; @@ -396,30 +397,25 @@ private void applyRecordHeaderAsPrimaryKey(SinkRecord record) { } private void applyRecordValueAsPrimaryKey(SinkRecord record, boolean flattened) { - if (primaryKeyFields.isEmpty()) { - throw new ConnectException("At least one " + JdbcSinkConnectorConfig.PRIMARY_KEY_FIELDS + - " field name should be specified when resolving keys from the record's value."); - } final Schema valueSchema = record.valueSchema(); if (valueSchema == null) { throw new ConnectException("Configured primary key mode 'record_value' cannot have null schema"); } - else if (flattened) { - for (Field field : record.valueSchema().fields()) { - if (primaryKeyFields.contains(field.name())) { - addKeyField(record.topic(), field); - } - } + + Stream recordFields; + if (flattened) { + recordFields = record.valueSchema().fields().stream(); } else { - final Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); - for (Field field : after.schema().fields()) { - if (primaryKeyFields.contains(field.name())) { - addKeyField(record.topic(), field); - } - } + recordFields = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER).schema().fields().stream(); } + + if (!primaryKeyFields.isEmpty()) { + recordFields = recordFields.filter(field -> primaryKeyFields.contains(field.name())); + } + + recordFields.forEach(field -> addKeyField(record.topic(), field)); } private void applyPrimitiveRecordKeyAsPrimaryKey(Schema keySchema) { diff --git a/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkPrimaryKeyModeTest.java b/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkPrimaryKeyModeTest.java index 2260137e..edc0232d 100644 --- a/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkPrimaryKeyModeTest.java +++ b/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkPrimaryKeyModeTest.java @@ -292,13 +292,19 @@ public void testRecordWithPrimaryKeyColumnWithPrimaryKeyModeRecordValueWithNoFie final String tableName = randomTableName(); final String topicName = topicName("server1", "schema", tableName); - try { - consume(factory.createRecord(topicName)); - stopSinkConnector(); - } - catch (Exception e) { - assertThat(e.getCause().getCause().getMessage()).contains("At least one primary.key.fields field name should be specified"); - } + final SinkRecord createRecord = factory.createRecordNoKey(topicName); + consume(createRecord); + + final String destinationTableName = destinationTableName(createRecord); + + final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName); + tableAssert.exists().hasNumberOfColumns(3); + + getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1); + getSink().assertColumnType(tableAssert, "name", ValueType.TEXT, "John Doe"); + getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT, "John Doe$"); + + assertHasPrimaryKeyColumns(destinationTableName, "id", "name", "nick_name$"); } @ParameterizedTest @@ -396,6 +402,7 @@ protected void assertHasPrimaryKeyColumns(String tableName, boolean caseInsensit } else if (caseInsensitive) { pkColumnNames = pkColumnNames.stream().map(String::toLowerCase).collect(Collectors.toList()); + assertThat(pkColumnNames.size()).isEqualTo(columnNames.length); for (int columnIndex = 0; columnIndex < columnNames.length; ++columnIndex) { assertThat(pkColumnNames).contains(columnNames[columnIndex].toLowerCase(), Index.atIndex(columnIndex)); }