Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-8018 Use all record fields as primary key columns when no primary…
Browse files Browse the repository at this point in the history
… key fields are specified
  • Loading branch information
mfvitale authored and Naros committed Jul 18, 2024
1 parent b14ba9b commit fe65301
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 23 deletions.
28 changes: 12 additions & 16 deletions src/main/java/io/debezium/connector/jdbc/SinkRecordDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
Expand Down Expand Up @@ -396,30 +397,25 @@ private void applyRecordHeaderAsPrimaryKey(SinkRecord record) {
}

private void applyRecordValueAsPrimaryKey(SinkRecord record, boolean flattened) {
if (primaryKeyFields.isEmpty()) {
throw new ConnectException("At least one " + JdbcSinkConnectorConfig.PRIMARY_KEY_FIELDS +
" field name should be specified when resolving keys from the record's value.");
}

final Schema valueSchema = record.valueSchema();
if (valueSchema == null) {
throw new ConnectException("Configured primary key mode 'record_value' cannot have null schema");
}
else if (flattened) {
for (Field field : record.valueSchema().fields()) {
if (primaryKeyFields.contains(field.name())) {
addKeyField(record.topic(), field);
}
}

Stream<Field> recordFields;
if (flattened) {
recordFields = record.valueSchema().fields().stream();
}
else {
final Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
for (Field field : after.schema().fields()) {
if (primaryKeyFields.contains(field.name())) {
addKeyField(record.topic(), field);
}
}
recordFields = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER).schema().fields().stream();
}

if (!primaryKeyFields.isEmpty()) {
recordFields = recordFields.filter(field -> primaryKeyFields.contains(field.name()));
}

recordFields.forEach(field -> addKeyField(record.topic(), field));
}

private void applyPrimitiveRecordKeyAsPrimaryKey(Schema keySchema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,19 @@ public void testRecordWithPrimaryKeyColumnWithPrimaryKeyModeRecordValueWithNoFie

final String tableName = randomTableName();
final String topicName = topicName("server1", "schema", tableName);
try {
consume(factory.createRecord(topicName));
stopSinkConnector();
}
catch (Exception e) {
assertThat(e.getCause().getCause().getMessage()).contains("At least one primary.key.fields field name should be specified");
}
final SinkRecord createRecord = factory.createRecordNoKey(topicName);
consume(createRecord);

final String destinationTableName = destinationTableName(createRecord);

final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName);
tableAssert.exists().hasNumberOfColumns(3);

getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1);
getSink().assertColumnType(tableAssert, "name", ValueType.TEXT, "John Doe");
getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT, "John Doe$");

assertHasPrimaryKeyColumns(destinationTableName, "id", "name", "nick_name$");
}

@ParameterizedTest
Expand Down Expand Up @@ -396,6 +402,7 @@ protected void assertHasPrimaryKeyColumns(String tableName, boolean caseInsensit
}
else if (caseInsensitive) {
pkColumnNames = pkColumnNames.stream().map(String::toLowerCase).collect(Collectors.toList());
assertThat(pkColumnNames.size()).isEqualTo(columnNames.length);
for (int columnIndex = 0; columnIndex < columnNames.length; ++columnIndex) {
assertThat(pkColumnNames).contains(columnNames[columnIndex].toLowerCase(), Index.atIndex(columnIndex));
}
Expand Down

0 comments on commit fe65301

Please sign in to comment.