diff --git a/core/src/main/scala/com.softwaremill.kmq/redelivery/CommitMarkerOffsetsActor.scala b/core/src/main/scala/com/softwaremill/kmq/redelivery/CommitMarkerOffsetsActor.scala similarity index 100% rename from core/src/main/scala/com.softwaremill.kmq/redelivery/CommitMarkerOffsetsActor.scala rename to core/src/main/scala/com/softwaremill/kmq/redelivery/CommitMarkerOffsetsActor.scala diff --git a/core/src/main/scala/com.softwaremill.kmq/redelivery/ConsumeMarkersActor.scala b/core/src/main/scala/com/softwaremill/kmq/redelivery/ConsumeMarkersActor.scala similarity index 100% rename from core/src/main/scala/com.softwaremill.kmq/redelivery/ConsumeMarkersActor.scala rename to core/src/main/scala/com/softwaremill/kmq/redelivery/ConsumeMarkersActor.scala diff --git a/core/src/main/scala/com.softwaremill.kmq/redelivery/MarkersQueue.scala b/core/src/main/scala/com/softwaremill/kmq/redelivery/MarkersQueue.scala similarity index 100% rename from core/src/main/scala/com.softwaremill.kmq/redelivery/MarkersQueue.scala rename to core/src/main/scala/com/softwaremill/kmq/redelivery/MarkersQueue.scala diff --git a/core/src/main/scala/com.softwaremill.kmq/redelivery/RedeliverActor.scala b/core/src/main/scala/com/softwaremill/kmq/redelivery/RedeliverActor.scala similarity index 100% rename from core/src/main/scala/com.softwaremill.kmq/redelivery/RedeliverActor.scala rename to core/src/main/scala/com/softwaremill/kmq/redelivery/RedeliverActor.scala diff --git a/core/src/main/scala/com.softwaremill.kmq/redelivery/Redeliverer.scala b/core/src/main/scala/com/softwaremill/kmq/redelivery/Redeliverer.scala similarity index 100% rename from core/src/main/scala/com.softwaremill.kmq/redelivery/Redeliverer.scala rename to core/src/main/scala/com/softwaremill/kmq/redelivery/Redeliverer.scala diff --git a/core/src/main/scala/com.softwaremill.kmq/redelivery/RedeliveryActors.scala b/core/src/main/scala/com/softwaremill/kmq/redelivery/RedeliveryActors.scala similarity index 100% rename from core/src/main/scala/com.softwaremill.kmq/redelivery/RedeliveryActors.scala rename to core/src/main/scala/com/softwaremill/kmq/redelivery/RedeliveryActors.scala diff --git a/core/src/main/scala/com.softwaremill.kmq/redelivery/package.scala b/core/src/main/scala/com/softwaremill/kmq/redelivery/package.scala similarity index 100% rename from core/src/main/scala/com.softwaremill.kmq/redelivery/package.scala rename to core/src/main/scala/com/softwaremill/kmq/redelivery/package.scala diff --git a/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/CommitMarkerSink.scala b/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/CommitMarkerSink.scala index 4e1db3b0..188006d4 100644 --- a/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/CommitMarkerSink.scala +++ b/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/CommitMarkerSink.scala @@ -1,6 +1,6 @@ package com.softwaremill.kmq.redelivery.streams -import akka.Done +import akka.NotUsed import akka.actor.ActorSystem import akka.kafka.CommitterSettings import akka.kafka.ConsumerMessage.CommittableMessage @@ -9,11 +9,9 @@ import akka.stream.scaladsl.{Flow, Keep, Sink} import com.softwaremill.kmq.redelivery.Offset import com.softwaremill.kmq.{EndMarker, MarkerKey, MarkerValue, StartMarker} -import scala.concurrent.Future - object CommitMarkerSink { - def apply()(implicit system: ActorSystem): Sink[CommittableMessage[MarkerKey, MarkerValue], Future[Done]] = { + def apply()(implicit system: ActorSystem): Sink[CommittableMessage[MarkerKey, MarkerValue], NotUsed] = { val committerSettings = CommitterSettings(system) Flow[CommittableMessage[MarkerKey, MarkerValue]] @@ -45,8 +43,7 @@ object CommitMarkerSink { prev } .map(_.committableOffset) - .via(Committer.flow(committerSettings)) - .toMat(Sink.ignore)(Keep.right) + .toMat(Committer.sink(committerSettings))(Keep.left) } def bySmallestOffsetAscending(implicit ord: Ordering[Offset]): Ordering[CommittableMessage[MarkerKey, MarkerValue]] = diff --git a/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/RedeliverySink.scala b/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/RedeliverySink.scala index 9b7e6cbd..e1c94ac0 100644 --- a/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/RedeliverySink.scala +++ b/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/RedeliverySink.scala @@ -1,7 +1,6 @@ package com.softwaremill.kmq.redelivery.streams -import akka.Done -import akka.actor.ActorSystem +import akka.actor.{ActorSystem, Cancellable} import akka.kafka.ConsumerMessage.CommittableMessage import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import com.softwaremill.kmq._ @@ -11,20 +10,18 @@ import org.apache.kafka.common.serialization.ByteArraySerializer import java.time.{Clock, Instant} import scala.collection.mutable.ArrayBuffer -import scala.concurrent.Future import scala.concurrent.duration.DurationInt object RedeliverySink extends StrictLogging { - def apply(partition: Partition) - (implicit system: ActorSystem, kafkaClients: KafkaClients, kmqConfig: KmqConfig, clock: Clock - ): Sink[CommittableMessage[MarkerKey, MarkerValue], Future[Done]] = { + def apply(kafkaClients: KafkaClients, kmqConfig: KmqConfig)(partition: Partition) + (implicit system: ActorSystem, clock: Clock): Sink[CommittableMessage[MarkerKey, MarkerValue], Cancellable] = { val producer = kafkaClients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer]) val redeliverer = new RetryingRedeliverer(new DefaultRedeliverer(partition, producer, kmqConfig, kafkaClients)) Flow[CommittableMessage[MarkerKey, MarkerValue]] .map(MarkerRedeliveryCommand) - .merge(Source.tick(initialDelay = 1.second, interval = 1.second, tick = TickRedeliveryCommand)) + .mergeMat(Source.tick(initialDelay = 1.second, interval = 1.second, tick = TickRedeliveryCommand))(Keep.right) .statefulMapConcat { () => // keep track of open markers; select markers to redeliver val markersByTimestamp = new PriorityQueueMap[MarkerKey, MsgWithTimestamp](valueOrdering = bySmallestTimestampAscending) var latestMarkerSeenTimestamp: Option[Timestamp] = None @@ -74,13 +71,9 @@ object RedeliverySink extends StrictLogging { toRedeliver } } - .statefulMapConcat { () => // redeliver - msg => { + .toMat(Sink.foreach { msg => // redeliver redeliverer.redeliver(List(msg.record.key)) // TODO: maybe bulk redeliver - Some(Done) - } - } - .toMat(Sink.ignore)(Keep.right) + })(Keep.left) } private def bySmallestTimestampAscending(implicit ord: Ordering[Timestamp]): Ordering[MsgWithTimestamp] = diff --git a/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/RedeliveryTracker.scala b/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/RedeliveryTracker.scala index 370b24f9..48e1a2f8 100644 --- a/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/RedeliveryTracker.scala +++ b/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/RedeliveryTracker.scala @@ -13,7 +13,7 @@ import scala.concurrent.{Await, ExecutionContext} object RedeliveryTracker extends StrictLogging { - def start()(implicit kafkaClients: KafkaClients, config: KmqConfig): Closeable = { + def start(kafkaClients: KafkaClients, kmqConfig: KmqConfig): Closeable = { implicit val system: ActorSystem = ActorSystem("kmq-redelivery") implicit val ec: ExecutionContext = system.dispatcher implicit val clock: Clock = Clock.systemDefaultZone() @@ -21,15 +21,18 @@ object RedeliveryTracker extends StrictLogging { implicit val markerValueDeserializer: Deserializer[MarkerValue] = new MarkerValue.MarkerValueDeserializer() val markerConsumerSettings = ConsumerSettings(system, markerKeyDeserializer, markerValueDeserializer) - .withBootstrapServers(config.getBootstrapServers) - .withGroupId(config.getRedeliveryConsumerGroupId) - .withProperties(config.getConsumerProps) + .withBootstrapServers(kmqConfig.getBootstrapServers) + .withGroupId(kmqConfig.getRedeliveryConsumerGroupId) + .withProperties(kmqConfig.getConsumerProps) - val streamControl = new RedeliveryTrackerStream(markerConsumerSettings, config.getMarkerTopic, Int.MaxValue) + val streamControl = new RedeliveryTrackerStream(markerConsumerSettings, + kafkaClients, kmqConfig, Int.MaxValue) .run() logger.info("Started redelivery stream") - () => Await.result(streamControl.drainAndShutdown().andThen { case _ => system.terminate() }, 1.minute) + () => { + Await.result(streamControl.drainAndShutdown().flatMap(_ => system.terminate()), 1.minute) + } } } diff --git a/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/RedeliveryTrackerStream.scala b/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/RedeliveryTrackerStream.scala index 04e187a1..def57cfa 100644 --- a/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/RedeliveryTrackerStream.scala +++ b/core/src/main/scala/com/softwaremill/kmq/redelivery/streams/RedeliveryTrackerStream.scala @@ -1,37 +1,38 @@ package com.softwaremill.kmq.redelivery.streams import akka.Done -import akka.actor.ActorSystem +import akka.actor.{ActorSystem, Cancellable} import akka.kafka.ConsumerMessage.CommittableMessage import akka.kafka.scaladsl.Consumer -import akka.kafka.scaladsl.Consumer.DrainingControl +import akka.kafka.scaladsl.Consumer.{Control, DrainingControl} import akka.kafka.{ConsumerSettings, Subscriptions} -import akka.stream.ClosedShape -import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink} +import akka.stream._ +import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, RunnableGraph, Sink} +import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler} import com.softwaremill.kmq._ import com.typesafe.scalalogging.StrictLogging +import org.apache.kafka.common.{Metric, MetricName} import java.time.Clock +import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} /** * Combines functionality of [[RedeliverySink]] and [[CommitMarkerSink]]. */ class RedeliveryTrackerStream(markerConsumerSettings: ConsumerSettings[MarkerKey, MarkerValue], - markersTopic: String, maxPartitions: Int) - (implicit system: ActorSystem, ec: ExecutionContext, - kafkaClients: KafkaClients, kmqConfig: KmqConfig, clock: Clock) extends StrictLogging { + kafkaClients: KafkaClients, kmqConfig: KmqConfig, maxPartitions: Int) + (implicit system: ActorSystem, ec: ExecutionContext, clock: Clock) extends StrictLogging { def run(): DrainingControl[Done] = { - Consumer.committablePartitionedSource(markerConsumerSettings, Subscriptions.topics(markersTopic)) + Consumer.committablePartitionedSource(markerConsumerSettings, Subscriptions.topics(kmqConfig.getMarkerTopic)) .mapAsyncUnordered(maxPartitions) { case (topicPartition, source) => - - val redeliverySink = RedeliverySink(topicPartition.partition) + val redeliverySink = RedeliverySink(kafkaClients, kmqConfig)(topicPartition.partition) val commitMarkerSink = CommitMarkerSink() - RunnableGraph - .fromGraph(GraphDSL.createGraph(redeliverySink, commitMarkerSink)(combineFutures) { + val cancellable = RunnableGraph + .fromGraph(GraphDSL.createGraph(redeliverySink, commitMarkerSink)(Keep.left) { implicit builder => (sink1, sink2) => import GraphDSL.Implicits._ val broadcast = builder.add(Broadcast[CommittableMessage[MarkerKey, MarkerValue]](2)) @@ -41,10 +42,64 @@ class RedeliveryTrackerStream(markerConsumerSettings: ConsumerSettings[MarkerKey ClosedShape }) .run() + + Future.successful(cancellable) } + .viaMat(Flow.fromGraph(new AggregatingToMatFlow()))(ControlWithCancellable.apply) .toMat(Sink.ignore)(DrainingControl.apply) .run() } +} + +class AggregatingToMatFlow[T] extends GraphStageWithMaterializedValue[FlowShape[T, T], Iterable[T]] { + val in: Inlet[T] = Inlet("in") + val out: Outlet[T] = Outlet("out") + + override def shape: FlowShape[T, T] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Iterable[T]) = { + val logic = new GraphStageLogic(shape) { + private val _values: mutable.Set[T] = mutable.Set() + + def values: Iterable[T] = _values + + setHandlers(in, out, new InHandler with OutHandler { + override def onPush(): Unit = { + val v = grab(in) + _values += v + push(out, v) + } + + override def onPull(): Unit = { + pull(in) + } + }) + } + + logic -> logic.values + } +} + +class ControlWithCancellable(control: Consumer.Control, cancellables: Iterable[Cancellable]) extends Control { + + override def stop(): Future[Done] = { + cancellables.foreach(_.cancel()) + control.stop() + } + + override def shutdown(): Future[Done] = { + cancellables.foreach(_.cancel()) + control.shutdown() + } + + override def isShutdown: Future[Done] = + control.isShutdown + + override def metrics: Future[Map[MetricName, Metric]] = + control.metrics +} - private def combineFutures(l: Future[Done], r: Future[Done]): Future[Done] = l.flatMap(_ => r) -} \ No newline at end of file +object ControlWithCancellable { + def apply(control: Consumer.Control, cancellables: Iterable[Cancellable]): ControlWithCancellable = + new ControlWithCancellable(control, cancellables) +} diff --git a/core/src/test/scala/com/softwaremill/kmq/redelivery/IntegrationTest.scala b/core/src/test/scala/com/softwaremill/kmq/redelivery/IntegrationTest.scala index a266fed2..8dc3eeb1 100644 --- a/core/src/test/scala/com/softwaremill/kmq/redelivery/IntegrationTest.scala +++ b/core/src/test/scala/com/softwaremill/kmq/redelivery/IntegrationTest.scala @@ -34,7 +34,9 @@ class IntegrationTest "KMQ" should "resend message if not committed" in { val bootstrapServer = s"localhost:${testKafkaConfig.kafkaPort}" val uid = UUID.randomUUID().toString - val kmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery", 1000, 1000) + + val kmqConfig: KmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery", 1000, 1000) + val kafkaClients: KafkaClients = new KafkaClients(kmqConfig) val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer) .withBootstrapServers(bootstrapServer) @@ -84,7 +86,7 @@ class IntegrationTest .to(Producer.plainSink(markerProducerSettings)) // 5. write "end" markers .run() - val redeliveryHook = RedeliveryTracker.start(new KafkaClients(kmqConfig), kmqConfig) + val redeliveryHook = streams.RedeliveryTracker.start(kafkaClients, kmqConfig) val messages = (0 to 20).map(_.toString) messages.foreach(msg => sendToKafka(kmqConfig.getMsgTopic, msg)) @@ -92,16 +94,18 @@ class IntegrationTest eventually { receivedMessages.size should be > processedMessages.size processedMessages.sortBy(_.toInt).distinct shouldBe messages - }(PatienceConfig(timeout = Span(15, Seconds)), implicitly, implicitly) + }(PatienceConfig(timeout = Span(20, Seconds)), implicitly, implicitly) redeliveryHook.close() control.shutdown() } - "KMQ" should "resend message if max redelivery count not exceeded" in { + it should "resend message if max redelivery count not exceeded" in { val bootstrapServer = s"localhost:${testKafkaConfig.kafkaPort}" val uid = UUID.randomUUID().toString - val kmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery", 1000, 1000) + + val kmqConfig: KmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery", 1000, 1000) + val kafkaClients: KafkaClients = new KafkaClients(kmqConfig) val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer) .withBootstrapServers(bootstrapServer) @@ -151,7 +155,7 @@ class IntegrationTest val undeliveredControl = Consumer .plainSource( consumerSettings, - Subscriptions.topics(s"${kmqConfig.getMsgTopic}__undelivered") + Subscriptions.topics(kmqConfig.getDeadLetterTopic) ) // 1. get messages from dead-letter topic .map { msg => undeliveredMessages += msg.value @@ -160,7 +164,7 @@ class IntegrationTest .to(Sink.ignore) .run() - val redeliveryHook = RedeliveryTracker.start(new KafkaClients(kmqConfig), kmqConfig) + val redeliveryHook = streams.RedeliveryTracker.start(kafkaClients, kmqConfig) val messages = (0 to 6).map(_.toString) messages.foreach(msg => sendToKafka(kmqConfig.getMsgTopic, msg)) @@ -170,7 +174,7 @@ class IntegrationTest eventually { receivedMessages.sortBy(_.toInt) shouldBe expectedReceived undeliveredMessages.sortBy(_.toInt) shouldBe expectedUndelivered - }(PatienceConfig(timeout = Span(15, Seconds)), implicitly, implicitly) + }(PatienceConfig(timeout = Span(20, Seconds)), implicitly, implicitly) redeliveryHook.close() control.shutdown() diff --git a/core/src/test/scala/com/softwaremill/kmq/redelivery/streams/LearningTest.scala b/core/src/test/scala/com/softwaremill/kmq/redelivery/streams/LearningTest.scala index bbd628dc..ef1afa4d 100644 --- a/core/src/test/scala/com/softwaremill/kmq/redelivery/streams/LearningTest.scala +++ b/core/src/test/scala/com/softwaremill/kmq/redelivery/streams/LearningTest.scala @@ -21,7 +21,7 @@ class LearningTest extends TestKit(ActorSystem("test-system")) with AnyFlatSpecL implicit val materializer: Materializer = akka.stream.Materializer.matFromSystem implicit val ec: ExecutionContext = system.dispatcher - "FooStream" should "merge ticks" in { + it should "merge ticks" in { Source .tick(0.1.second, 0.1.second, "tick") .merge(Source.tick(0.17.second, 0.17.second, "tack")) @@ -29,7 +29,7 @@ class LearningTest extends TestKit(ActorSystem("test-system")) with AnyFlatSpecL .runWith(TestSink[String]).request(10).expectNextN(10) } - "FooStream" should "broadcast to sinks" in { + it should "broadcast to sinks" in { val source = Source.fromIterator(() => Seq(1, 2, 3).iterator) val multiplyBy2: Sink[Int, Future[Done]] = Flow[Int].map(_ * 2).wireTap(x => println(s"2: $x")).toMat(Sink.ignore)(Keep.right) @@ -49,7 +49,7 @@ class LearningTest extends TestKit(ActorSystem("test-system")) with AnyFlatSpecL .run() } - "FooStream" should "async broadcast to sinks" in { + it should "async broadcast to sinks" in { val source = Source.fromIterator(() => Seq(1, 2, 3).iterator) val multiplyBy2: Sink[Int, Future[Done]] = Flow[Int].map(_ * 2).wireTap(x => println(s"2: $x")).toMat(Sink.ignore)(Keep.right) diff --git a/core/src/test/scala/com/softwaremill/kmq/redelivery/streams/RedeliveryTrackerStreamIntegrationTest.scala b/core/src/test/scala/com/softwaremill/kmq/redelivery/streams/RedeliveryTrackerStreamIntegrationTest.scala index 1befa0af..aa389337 100644 --- a/core/src/test/scala/com/softwaremill/kmq/redelivery/streams/RedeliveryTrackerStreamIntegrationTest.scala +++ b/core/src/test/scala/com/softwaremill/kmq/redelivery/streams/RedeliveryTrackerStreamIntegrationTest.scala @@ -39,9 +39,9 @@ class RedeliveryTrackerStreamIntegrationTest extends TestKit(ActorSystem("test-s val maxRedeliveryCount = 1 val redeliverAfterMs = 300 - implicit val kmqConfig: KmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery", + val kmqConfig: KmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery", 1000, 1000, s"${uid}__undelivered", "kmq-redelivery-count", maxRedeliveryCount, Collections.emptyMap()) - implicit val kafkaClients: KafkaClients = new KafkaClients(kmqConfig) + val kafkaClients: KafkaClients = new KafkaClients(kmqConfig) val markerConsumerSettings = ConsumerSettings(system, markerKeyDeserializer, markerValueDeserializer) .withBootstrapServers(bootstrapServer) @@ -49,7 +49,7 @@ class RedeliveryTrackerStreamIntegrationTest extends TestKit(ActorSystem("test-s .withProperties(kmqConfig.getConsumerProps) val streamControl = new RedeliveryTrackerStream(markerConsumerSettings, - kmqConfig.getMarkerTopic, Int.MaxValue) + kafkaClients, kmqConfig, Int.MaxValue) .run() createTopic(kmqConfig.getMsgTopic) @@ -74,9 +74,9 @@ class RedeliveryTrackerStreamIntegrationTest extends TestKit(ActorSystem("test-s val uid = UUID.randomUUID().toString val redeliverAfterMs = 300 - implicit val kmqConfig: KmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery", + val kmqConfig: KmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery", 1000, 1000) - implicit val kafkaClients: KafkaClients = new KafkaClients(kmqConfig) + val kafkaClients: KafkaClients = new KafkaClients(kmqConfig) val markerConsumerSettings = ConsumerSettings(system, markerKeyDeserializer, markerValueDeserializer) .withBootstrapServers(bootstrapServer) @@ -84,7 +84,7 @@ class RedeliveryTrackerStreamIntegrationTest extends TestKit(ActorSystem("test-s .withProperties(kmqConfig.getConsumerProps) val streamControl = new RedeliveryTrackerStream(markerConsumerSettings, - kmqConfig.getMarkerTopic, Int.MaxValue) + kafkaClients, kmqConfig, Int.MaxValue) .run() createTopic(kmqConfig.getMarkerTopic) diff --git a/example-scala/src/main/scala/com/softwaremill/kmq/example/EmbeddedStream.scala b/example-scala/src/main/scala/com/softwaremill/kmq/example/EmbeddedStream.scala index 1003fc54..0215c093 100644 --- a/example-scala/src/main/scala/com/softwaremill/kmq/example/EmbeddedStream.scala +++ b/example-scala/src/main/scala/com/softwaremill/kmq/example/EmbeddedStream.scala @@ -19,10 +19,10 @@ object EmbeddedStream extends StrictLogging { private val FAIL_RATIO = 0.5 private val PARTITIONS = 1 - private implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig.defaultConfig - private implicit val kmqConfig: KmqConfig = new KmqConfig("localhost:" + kafkaConfig.kafkaPort, "queue", "markers", "kmq_client", "kmq_redelivery", + private val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig.defaultConfig + private val kmqConfig: KmqConfig = new KmqConfig("localhost:" + kafkaConfig.kafkaPort, "queue", "markers", "kmq_client", "kmq_redelivery", Duration.ofSeconds(3).toMillis, 1000) - private implicit val clients: KafkaClients = new KafkaClients(kmqConfig) + private val clients: KafkaClients = new KafkaClients(kmqConfig) private val random: Random = new Random(0) private val processedMessages = new ConcurrentHashMap[Integer, Integer] @@ -34,7 +34,7 @@ object EmbeddedStream extends StrictLogging { EmbeddedKafka.createCustomTopic(kmqConfig.getMsgTopic, partitions = PARTITIONS) logger.info("Kafka started") - val redelivery = RedeliveryTracker.start() + val redelivery = RedeliveryTracker.start(clients, kmqConfig) sleep(1000) // Wait for the stream to warm up TODO: analyze startInBackground(() => processMessages(clients, kmqConfig))