From 476f38d8b80a660aafd5bd6d7a2cffe0ce0ea722 Mon Sep 17 00:00:00 2001 From: this-Aditya Date: Fri, 6 Dec 2024 21:46:10 +0530 Subject: [PATCH 1/4] Modified typeInfo creation for sender --- .../producer/rest/RadarParameterizedType.kt | 25 +++++++++++++++++++ .../producer/rest/RestKafkaSender.kt | 20 +++++++-------- 2 files changed, 35 insertions(+), 10 deletions(-) create mode 100644 radar-commons/src/main/java/org/radarbase/producer/rest/RadarParameterizedType.kt diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/RadarParameterizedType.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/RadarParameterizedType.kt new file mode 100644 index 00000000..7d995ee4 --- /dev/null +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/RadarParameterizedType.kt @@ -0,0 +1,25 @@ +package org.radarbase.producer.rest + +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type + +class RadarParameterizedType( + private val raw: Class<*>, + private val args: Array, + private val owner: Type? = null +) : ParameterizedType { + override fun getRawType(): Type = raw + override fun getActualTypeArguments(): Array = args + override fun getOwnerType(): Type? = owner + + override fun toString(): String { + return buildString { + append(raw.typeName) + if (args.isNotEmpty()) { + append('<') + append(args.joinToString(", ") { it.typeName }) + append('>') + } + } + } +} diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt index 9c3cc082..ad641cdb 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt @@ -61,8 +61,6 @@ import org.radarbase.topic.AvroTopic import org.radarbase.util.RadarProducerDsl import org.slf4j.LoggerFactory import java.io.IOException -import kotlin.reflect.javaType -import kotlin.reflect.typeOf import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds @@ -143,6 +141,16 @@ class RestKafkaSender(config: Config) : KafkaSender { inner class RestKafkaTopicSender( override val topic: AvroTopic, ) : KafkaTopicSender { + + val recordDataTypeInfo: TypeInfo = TypeInfo( + type = RecordData::class, + kotlinType = null, + reifiedType = RadarParameterizedType( + raw = RecordData::class.java, + args = arrayOf(topic.keyClass, topic.valueClass) + ) + ) + override suspend fun send(records: RecordData) = withContext(scope.coroutineContext) { try { val response: HttpResponse = restClient.post { @@ -275,7 +283,6 @@ class RestKafkaSender(config: Config) : KafkaSender { companion object { private val logger = LoggerFactory.getLogger(RestKafkaSender::class.java) - private val recordDataTypeInfo: TypeInfo val DEFAULT_TIMEOUT: Duration = 20.seconds val KAFKA_REST_BINARY_ENCODING = ContentType("application", "vnd.radarbase.avro.v1+binary") @@ -283,13 +290,6 @@ class RestKafkaSender(config: Config) : KafkaSender { val KAFKA_REST_ACCEPT = ContentType("application", "vnd.kafka.v2+json") const val GZIP_CONTENT_ENCODING = "gzip" - init { - val kType = typeOf>() - - @OptIn(ExperimentalStdlibApi::class) - val reifiedType = kType.javaType - recordDataTypeInfo = TypeInfo(RecordData::class, reifiedType, kType) - } fun restKafkaSender(builder: Config.() -> Unit): RestKafkaSender = RestKafkaSender(Config().apply(builder)) From 4148a827722691bb5bde004615842c72c59e1e65 Mon Sep 17 00:00:00 2001 From: this-Aditya Date: Fri, 6 Dec 2024 21:47:30 +0530 Subject: [PATCH 2/4] Replacing ::metadata with ::getByVersion in content converters --- .../org/radarbase/producer/rest/AvroContentConverter.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt index 9b3a184f..35b8d64f 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt @@ -25,17 +25,17 @@ class AvroContentConverter( return coroutineScope { val keySchema = async { - schemaRetriever.metadata( + schemaRetriever.getByVersion( topic = value.topic.name, ofValue = false, - schema = value.topic.keySchema, + version = -1, ) } val valueSchema = async { - schemaRetriever.metadata( + schemaRetriever.getByVersion( topic = value.topic.name, ofValue = true, - schema = value.topic.valueSchema, + version = -1, ) } val maker = if (binary) { From 3d1e37e8891e2c8923541e5f4423efe1a3cf40ae Mon Sep 17 00:00:00 2001 From: this-Aditya Date: Fri, 6 Dec 2024 22:19:16 +0530 Subject: [PATCH 3/4] fix CI --- .../org/radarbase/producer/rest/RadarParameterizedType.kt | 2 +- .../main/java/org/radarbase/producer/rest/RestKafkaSender.kt | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/RadarParameterizedType.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/RadarParameterizedType.kt index 7d995ee4..392725d3 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/RadarParameterizedType.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/RadarParameterizedType.kt @@ -6,7 +6,7 @@ import java.lang.reflect.Type class RadarParameterizedType( private val raw: Class<*>, private val args: Array, - private val owner: Type? = null + private val owner: Type? = null, ) : ParameterizedType { override fun getRawType(): Type = raw override fun getActualTypeArguments(): Array = args diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt index ad641cdb..11efb53c 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt @@ -147,8 +147,8 @@ class RestKafkaSender(config: Config) : KafkaSender { kotlinType = null, reifiedType = RadarParameterizedType( raw = RecordData::class.java, - args = arrayOf(topic.keyClass, topic.valueClass) - ) + args = arrayOf(topic.keyClass, topic.valueClass), + ), ) override suspend fun send(records: RecordData) = withContext(scope.coroutineContext) { @@ -290,7 +290,6 @@ class RestKafkaSender(config: Config) : KafkaSender { val KAFKA_REST_ACCEPT = ContentType("application", "vnd.kafka.v2+json") const val GZIP_CONTENT_ENCODING = "gzip" - fun restKafkaSender(builder: Config.() -> Unit): RestKafkaSender = RestKafkaSender(Config().apply(builder)) } From 7e1f45270f0defbf2a51cf6868f45636500cf9ae Mon Sep 17 00:00:00 2001 From: this-Aditya Date: Fri, 6 Dec 2024 22:49:13 +0530 Subject: [PATCH 4/4] Passing test for RestKafkaSender --- .../producer/rest/RestKafkaSender.kt | 2 +- .../producer/rest/RestKafkaSenderTest.kt | 28 +++++++++---------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt index 11efb53c..efccbf84 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt @@ -148,7 +148,7 @@ class RestKafkaSender(config: Config) : KafkaSender { reifiedType = RadarParameterizedType( raw = RecordData::class.java, args = arrayOf(topic.keyClass, topic.valueClass), - ), + ), ) override suspend fun send(records: RecordData) = withContext(scope.coroutineContext) { diff --git a/radar-commons/src/test/java/org/radarbase/producer/rest/RestKafkaSenderTest.kt b/radar-commons/src/test/java/org/radarbase/producer/rest/RestKafkaSenderTest.kt index 2194ba9a..d2c65432 100644 --- a/radar-commons/src/test/java/org/radarbase/producer/rest/RestKafkaSenderTest.kt +++ b/radar-commons/src/test/java/org/radarbase/producer/rest/RestKafkaSenderTest.kt @@ -101,15 +101,15 @@ class RestKafkaSenderTest { val keySchemaMetadata = ParsedSchemaMetadata(10, 2, keySchema) val valueSchemaMetadata = ParsedSchemaMetadata(10, 2, valueSchema) retriever.stub { - onBlocking { metadata("test", false, keySchema) }.doReturn(keySchemaMetadata) - onBlocking { metadata("test", true, valueSchema) }.doReturn(valueSchemaMetadata) + onBlocking { getByVersion("test", false, -1) }.doReturn(keySchemaMetadata) + onBlocking { getByVersion("test", true, -1) }.doReturn(valueSchemaMetadata) } webServer.enqueueJson("{\"offset\": 100}") topicSender.send(key, value) verify(retriever, times(1)) - .metadata("test", false, keySchema) + .getByVersion("test", false, -1) verify(retriever, times(1)) - .metadata("test", true, valueSchema) + .getByVersion("test", true, -1) val request = webServer.takeRequest() assertEquals("/topics/test", request.path) val body = READER.readTree(request.body.inputStream()) @@ -145,15 +145,15 @@ class RestKafkaSenderTest { val keySchemaMetadata = ParsedSchemaMetadata(10, 2, keySchema) val valueSchemaMetadata = ParsedSchemaMetadata(10, 2, valueSchema) retriever.stub { - onBlocking { metadata("test", false, keySchema) }.doReturn(keySchemaMetadata) - onBlocking { metadata("test", true, valueSchema) }.doReturn(valueSchemaMetadata) + onBlocking { getByVersion("test", false, -1) }.doReturn(keySchemaMetadata) + onBlocking { getByVersion("test", true, -1) }.doReturn(valueSchemaMetadata) } webServer.enqueueJson("{\"offset\": 100}") topicSender.send(key, value) verify(retriever, times(1)) - .metadata("test", false, keySchema) + .getByVersion("test", false, -1) verify(retriever, times(1)) - .metadata("test", true, valueSchema) + .getByVersion("test", true, -1) val request = webServer.takeRequest() assertEquals("/topics/test", request.path) var decoder = DecoderFactory.get().directBinaryDecoder(request.body.inputStream(), null) @@ -192,15 +192,15 @@ class RestKafkaSenderTest { val valueSchemaMetadata = ParsedSchemaMetadata(10, 2, valueSchema) retriever.stub { - onBlocking { metadata("test", false, keySchema) }.doReturn(keySchemaMetadata) - onBlocking { metadata("test", true, valueSchema) }.doReturn(valueSchemaMetadata) + onBlocking { getByVersion("test", false, -1) }.doReturn(keySchemaMetadata) + onBlocking { getByVersion("test", true, -1) }.doReturn(valueSchemaMetadata) } webServer.enqueueJson("{\"offset\": 100}") topicSender.send(AvroRecordData(topic, key, listOf(value, value))) verify(retriever, times(1)) - .metadata("test", false, keySchema) + .getByVersion("test", false, -1) verify(retriever, times(1)) - .metadata("test", true, valueSchema) + .getByVersion("test", true, -1) val request = webServer.takeRequest() assertEquals("/topics/test", request.path) val bodyString = request.body.readString(StandardCharsets.UTF_8) @@ -293,8 +293,8 @@ class RestKafkaSenderTest { val keySchemaMetadata = ParsedSchemaMetadata(10, 2, keySchema) val valueSchemaMetadata = ParsedSchemaMetadata(10, 2, valueSchema) retriever.stub { - onBlocking { metadata("test", false, keySchema) }.doReturn(keySchemaMetadata) - onBlocking { metadata("test", true, valueSchema) }.doReturn(valueSchemaMetadata) + onBlocking { getByVersion("test", false, -1) }.doReturn(keySchemaMetadata) + onBlocking { getByVersion("test", true, -1) }.doReturn(valueSchemaMetadata) } topicSender.send(key, value)