Single Message Transforms, or SMTs, can be used to customize fields or values of events during data flow.
The examples below demonstrate modifying fields for events flowing from the Kafka topic to a Cloudant database using the sink connector.
- If the event value contains an existing field, other than `_id`, that is suitable for use as the Cloudant document ID, you can rename it to `_id` with the `renames` option of the `ReplaceField` transform. For instance, to use the field `kafka_event_id`:

  ```properties
  transforms=RenameField
  transforms.RenameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
  transforms.RenameField.renames=kafka_event_id:_id
  ```
- If you have `_id` fields and would prefer to have Cloudant generate a UUID for the document ID, use the `ReplaceField` transform to exclude the existing `_id` field:

  ```properties
  transforms=ReplaceField
  transforms.ReplaceField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
  transforms.ReplaceField.exclude=_id
  ```
- If you want to use the Kafka event key or another custom value as the document ID, use the `cloudant_doc_id` custom header. The value of this header is written to the `_id` field; if an `_id` field already exists, it is overwritten with the header value. You can use the `HeaderFrom` SMT to move or copy a field of the event key into the custom header. The example config below moves the `docid` field of the event key to the `cloudant_doc_id` custom header and sets the header converter to string:

  ```properties
  transforms=MoveFieldsToHeaders
  transforms.MoveFieldsToHeaders.type=org.apache.kafka.connect.transforms.HeaderFrom$Key
  transforms.MoveFieldsToHeaders.fields=docid
  transforms.MoveFieldsToHeaders.headers=cloudant_doc_id
  transforms.MoveFieldsToHeaders.operation=move
  header.converter=org.apache.kafka.connect.storage.StringConverter
  ```

  Note: The `header.converter` must be set to `StringConverter` because the document ID field only supports strings.
- If you have events where the `_id` field is absent or `null`, Cloudant will generate a document ID. If you don't want this to happen, set an `_id` (see the earlier examples). If you need to filter out those documents, or drop `_id` fields whose value is `null`, you'll need to create a custom SMT.
Note: For any of the SMTs above, if the field does not exist, the event is left unmodified and processing continues with the next event.
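The custom SMT mentioned above for absent or `null` `_id` values needs only a little logic. The following is a stdlib-only sketch, assuming the event value is schemaless and represented as a `java.util.Map`; `NullIdHandling` and its methods are hypothetical names, not part of the connector. In a real SMT this logic would sit in the `apply` method of a class implementing `org.apache.kafka.connect.transforms.Transformation`, where returning `null` from `apply` drops the record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of the connector) sketching the logic a custom
// SMT could use for schemaless event values with an absent or null "_id".
final class NullIdHandling {

    private NullIdHandling() {}

    // Option 1: filter the event out. In a real Transformation, apply() would
    // return null when this is true, which drops the record. A null value
    // (tombstone) is also reported as droppable here.
    static boolean shouldDropEvent(Map<String, Object> value) {
        return value == null || value.get("_id") == null;
    }

    // Option 2: keep the event but remove an "_id" field whose value is null.
    // Returns a modified copy and leaves the input map untouched.
    static Map<String, Object> withoutNullId(Map<String, Object> value) {
        Map<String, Object> copy = new HashMap<>(value);
        if (copy.containsKey("_id") && copy.get("_id") == null) {
            copy.remove("_id");
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("_id", null);
        event.put("name", "example");

        System.out.println(shouldDropEvent(event));        // true
        System.out.println(withoutNullId(event).keySet()); // [name]
    }
}
```

Returning a copy rather than mutating the input mirrors the way the built-in SMTs produce a new record instead of editing the original.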
- If your events already have a `_rev` field but you do not want this field to be written to Cloudant, remove the field using a `ReplaceField` SMT:

  ```properties
  transforms=ReplaceField
  transforms.ReplaceField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
  transforms.ReplaceField.exclude=_rev
  ```
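The sink-side `ReplaceField` configurations (renaming `kafka_event_id` to `_id`, excluding `_id`, excluding `_rev`) all reshape a schemaless value in the same way. The following stdlib-only sketch models that behaviour on a `java.util.Map`, assuming well-formed `old:new` rename pairs; `ReplaceFieldModel` is a hypothetical name, and this is not the Kafka implementation, which also handles `Struct` values with schemas.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stdlib model of ReplaceField$Value on a schemaless (Map) value:
// excluded fields are dropped, renamed fields change key, and all other
// fields are copied unchanged. Missing fields simply have no effect.
final class ReplaceFieldModel {

    private ReplaceFieldModel() {}

    // Parses a "renames" value such as "kafka_event_id:_id" (comma-separated
    // old:new pairs). Assumes every pair contains exactly one colon.
    static Map<String, String> parseRenames(String renames) {
        Map<String, String> mapping = new HashMap<>();
        if (renames == null || renames.isEmpty()) {
            return mapping;
        }
        for (String pair : renames.split(",")) {
            String[] parts = pair.trim().split(":");
            mapping.put(parts[0], parts[1]);
        }
        return mapping;
    }

    static Map<String, Object> apply(Map<String, Object> value,
                                     Set<String> exclude,
                                     Map<String, String> renames) {
        Map<String, Object> updated = new HashMap<>();
        for (Map.Entry<String, Object> e : value.entrySet()) {
            if (exclude.contains(e.getKey())) {
                continue; // e.g. exclude=_id or exclude=_rev
            }
            updated.put(renames.getOrDefault(e.getKey(), e.getKey()), e.getValue());
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("kafka_event_id", "order-42");
        event.put("_rev", "1-abc");

        // renames=kafka_event_id:_id
        Map<String, Object> renamed =
            apply(event, Set.of(), parseRenames("kafka_event_id:_id"));
        System.out.println(renamed.get("_id")); // order-42

        // exclude=_rev
        Map<String, Object> withoutRev = apply(event, Set.of("_rev"), Map.of());
        System.out.println(withoutRev.containsKey("_rev")); // false
    }
}
```

The sketch uses `Set.of` and `Map.of`, so it needs Java 9 or later.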
The examples below demonstrate modifying events produced by the Cloudant source connector.
For certain use cases, such as sending events to a relational database sink, it may be desirable to flatten nested fields:
- Flatten maps in the JSON document using the Kafka built-in `org.apache.kafka.connect.transforms.Flatten$Value`:

  ```properties
  transforms=FlattenMaps
  transforms.FlattenMaps.type=org.apache.kafka.connect.transforms.Flatten$Value
  ```
- Flatten arrays in the JSON document using `com.ibm.cloud.cloudant.kafka.transforms.ArrayFlatten`. Note that this transform is only suitable for use with `Map` event values and will filter out events that do not conform. As such, if it is used in conjunction with the `MapToStruct` transform, this `ArrayFlatten` operation must precede `MapToStruct` in the SMT pipeline. The `delimiter` configuration property may be used to customize the delimiter, which defaults to `.`.

  ```properties
  transforms=FlattenArrays
  transforms.FlattenArrays.type=com.ibm.cloud.cloudant.kafka.transforms.ArrayFlatten
  ```
- Convert schemaless `java.util.Map` values to `org.apache.kafka.connect.data.Struct` with an inferred schema. This transform is designed to improve compatibility with other connectors and converters that require a `Struct` type event. For complex schemas, a schema registry should be used.

  ```properties
  transforms=MapToStruct
  transforms.MapToStruct.type=com.ibm.cloud.cloudant.kafka.transforms.MapToStruct
  ```
- Omit design documents from the produced events by using the Kafka built-in `org.apache.kafka.connect.transforms.Filter` in conjunction with the predicate `com.ibm.cloud.cloudant.kafka.transforms.predicates.IsDesignDocument`. Note that this predicate relies on the key format of the Cloudant source connector events, so it must be applied before any other transformations that alter the key format.

  ```properties
  transforms=OmitDesignDocs
  transforms.OmitDesignDocs.type=org.apache.kafka.connect.transforms.Filter
  transforms.OmitDesignDocs.predicate=isDesignDoc
  predicates=isDesignDoc
  predicates.isDesignDoc.type=com.ibm.cloud.cloudant.kafka.transforms.predicates.IsDesignDocument
  ```
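To make the effect of `Flatten$Value` on a schemaless document concrete, the following stdlib-only sketch models it on a `java.util.Map`: nested maps are merged into the top level with keys joined by the delimiter (`.` by default), while lists, scalars and `null` values are copied unchanged; flattening arrays is left to `ArrayFlatten`. `FlattenModel` is a hypothetical name, and this is not the Kafka implementation, which also handles `Struct` values with schemas.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stdlib model of Flatten$Value on a schemaless (Map) value.
// Nested maps are merged into the top level using joined keys; everything
// else, including lists, is copied through as-is.
final class FlattenModel {

    private FlattenModel() {}

    static Map<String, Object> flatten(Map<String, Object> value, String delimiter) {
        Map<String, Object> out = new LinkedHashMap<>(); // keeps field order
        flattenInto(value, "", delimiter, out);
        return out;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(Map<String, Object> in, String prefix,
                                    String delimiter, Map<String, Object> out) {
        for (Map.Entry<String, Object> e : in.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + delimiter + e.getKey();
            Object v = e.getValue();
            if (v instanceof Map) {
                // Recurse into the nested map, extending the key prefix.
                flattenInto((Map<String, Object>) v, key, delimiter, out);
            } else {
                out.put(key, v); // scalars, nulls and lists are kept as-is
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("city", "Bristol");
        address.put("postcode", "BS1");

        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", "person-1");
        doc.put("address", address);
        doc.put("tags", List.of("a", "b"));

        System.out.println(flatten(doc, "."));
        // {_id=person-1, address.city=Bristol, address.postcode=BS1, tags=[a, b]}
    }
}
```

The unflattened `tags` list in the output illustrates why a relational sink may also need the array-flattening step.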
- If you have events where there is no value (tombstone events), you may wish to filter these out.
  - In the Cloudant sink connector, these may be undesirable as they will generate an empty document.
  - In the Cloudant source connector, tombstone events are generated for deleted documents (in addition to the deleted document itself).
  - In either case, you can use the `RecordIsTombstone` predicate with a filter to remove these tombstone events, as shown in this example:

    ```properties
    transforms=DropNullEvents
    transforms.DropNullEvents.type=org.apache.kafka.connect.transforms.Filter
    transforms.DropNullEvents.predicate=isNullEvent
    predicates=isNullEvent
    predicates.isNullEvent.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
    ```
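Both `Filter` examples (omitting design documents and dropping tombstones) follow the same pattern: the transform drops a record whenever its predicate matches. The following stdlib-only sketch models that pattern and needs Java 16 or later for the `record`; `FilterModel`, `Event` and the predicate constants are hypothetical names. The tombstone check mirrors `RecordIsTombstone` (a record whose value is `null`). The design-document check assumes the document ID is already available as a string and tests for the `_design/` prefix used by Cloudant design document IDs; the real `IsDesignDocument` predicate reads the ID from the source connector's key format, which is not modelled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stdlib model of the Filter SMT combined with a predicate:
// records for which the predicate is true are dropped, all others pass.
final class FilterModel {

    private FilterModel() {}

    // Minimal stand-in for a Kafka record: a document ID and a value.
    record Event(String docId, Object value) {}

    // Mirrors RecordIsTombstone: a tombstone is a record with a null value.
    static final Predicate<Event> IS_TOMBSTONE = e -> e.value() == null;

    // Approximates IsDesignDocument by checking the "_design/" ID prefix.
    static final Predicate<Event> IS_DESIGN_DOC =
        e -> e.docId() != null && e.docId().startsWith("_design/");

    static List<Event> filter(List<Event> events, Predicate<Event> drop) {
        List<Event> kept = new ArrayList<>();
        for (Event e : events) {
            if (!drop.test(e)) {
                kept.add(e);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("order-1", "{\"total\": 10}"),
            new Event("order-2", null),                      // tombstone
            new Event("_design/reports", "{\"views\": {}}")); // design doc

        // Apply both filters in sequence, as two chained transforms would.
        List<Event> kept = filter(filter(events, IS_DESIGN_DOC), IS_TOMBSTONE);
        System.out.println(kept.size()); // 1
    }
}
```

In a connector config the two filters would be chained by listing both aliases, for example `transforms=OmitDesignDocs,DropNullEvents` together with `predicates=isDesignDoc,isNullEvent`.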