Skip to content

Commit

Permalink
proof of concept for a fix
Browse files Browse the repository at this point in the history
  • Loading branch information
mkrzemien committed Aug 18, 2022
1 parent af989af commit 6d766fd
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.softwaremill.kmq.redelivery.streams

import akka.Done
import akka.actor.ActorSystem
import akka.actor.{ActorSystem, Cancellable}
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import com.softwaremill.kmq._
Expand All @@ -18,13 +18,13 @@ object RedeliverySink extends StrictLogging {

def apply(partition: Partition)
(implicit system: ActorSystem, kafkaClients: KafkaClients, kmqConfig: KmqConfig, clock: Clock
): Sink[CommittableMessage[MarkerKey, MarkerValue], Future[Done]] = {
): Sink[CommittableMessage[MarkerKey, MarkerValue], (Cancellable, Future[Done])] = {
val producer = kafkaClients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer])
val redeliverer = new RetryingRedeliverer(new DefaultRedeliverer(partition, producer, kmqConfig, kafkaClients))

Flow[CommittableMessage[MarkerKey, MarkerValue]]
.map(MarkerRedeliveryCommand)
.merge(Source.tick(initialDelay = 1.second, interval = 1.second, tick = TickRedeliveryCommand))
.mergeMat(Source.tick(initialDelay = 1.second, interval = 1.second, tick = TickRedeliveryCommand))(Keep.right)
.statefulMapConcat { () => // keep track of open markers; select markers to redeliver
val markersByTimestamp = new PriorityQueueMap[MarkerKey, MsgWithTimestamp](valueOrdering = bySmallestTimestampAscending)
var latestMarkerSeenTimestamp: Option[Timestamp] = None
Expand Down Expand Up @@ -77,7 +77,7 @@ object RedeliverySink extends StrictLogging {
.toMat(Sink.foreach { msg => // redeliver
redeliverer.redeliver(List(msg.record.key)) // TODO: maybe bulk redeliver

})(Keep.right)
})(Keep.both)
}

private def bySmallestTimestampAscending(implicit ord: Ordering[Timestamp]): Ordering[MsgWithTimestamp] =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package com.softwaremill.kmq.redelivery.streams

import akka.Done
import akka.actor.ActorSystem
import akka.actor.{ActorSystem, Cancellable}
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.scaladsl.Consumer.{Control, DrainingControl}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, RunnableGraph, Sink}
import com.softwaremill.kmq._
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.common.{Metric, MetricName}

import java.time.Clock
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{ExecutionContext, Future}

/**
Expand All @@ -22,28 +24,53 @@ class RedeliveryTrackerStream(markerConsumerSettings: ConsumerSettings[MarkerKey
(implicit system: ActorSystem, ec: ExecutionContext,
kafkaClients: KafkaClients, kmqConfig: KmqConfig, clock: Clock) extends StrictLogging {

def run(): DrainingControl[Done] = {
val cancellables: java.util.Set[Cancellable] = ConcurrentHashMap.newKeySet()

class MyDrainingControl[T](control: Control, future: Future[T]) extends Control {
private val drainingControl = DrainingControl(control, future)

override def stop(): Future[Done] = {
cancellables.forEach(_.cancel)
drainingControl.stop()
}

def drainAndShutdown()(implicit ec: ExecutionContext): Future[T] = {
cancellables.forEach(_.cancel)
drainingControl.drainAndShutdown()
}

override def shutdown(): Future[Done] = throw new UnsupportedOperationException()

override def isShutdown: Future[Done] = drainingControl.isShutdown

override def metrics: Future[Map[MetricName, Metric]] = drainingControl.metrics
}

def run(): MyDrainingControl[Done] = {
Consumer.committablePartitionedSource(markerConsumerSettings, Subscriptions.topics(markersTopic))
.mapAsyncUnordered(maxPartitions) {
case (topicPartition, source) =>
val redeliverySink = RedeliverySink(topicPartition.partition)
val commitMarkerSink = CommitMarkerSink()

RunnableGraph
.fromGraph(GraphDSL.createGraph(redeliverySink, commitMarkerSink)(combineFutures) {
implicit builder => (sink1, sink2) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[CommittableMessage[MarkerKey, MarkerValue]](2))
source ~> broadcast
broadcast.out(0) ~> Flow[CommittableMessage[MarkerKey, MarkerValue]].async ~> sink1
broadcast.out(1) ~> Flow[CommittableMessage[MarkerKey, MarkerValue]].async ~> sink2
ClosedShape
val cancellable = RunnableGraph
.fromGraph(GraphDSL.createGraph(redeliverySink, commitMarkerSink)(Keep.left) {
implicit builder =>
(sink1, sink2) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[CommittableMessage[MarkerKey, MarkerValue]](2))
source ~> broadcast
broadcast.out(0) ~> Flow[CommittableMessage[MarkerKey, MarkerValue]].async ~> sink1
broadcast.out(1) ~> Flow[CommittableMessage[MarkerKey, MarkerValue]].async ~> sink2
ClosedShape
})
.run()
.run()

cancellables.add(cancellable)

Future.successful(())
}
.toMat(Sink.ignore)(DrainingControl.apply)
.toMat(Sink.ignore)((c, f) => new MyDrainingControl(c, f))
.run()
}

private def combineFutures[T](l: Future[T], r: Future[T]): Future[T] = l.flatMap(_ => r)
}
}

0 comments on commit 6d766fd

Please sign in to comment.