Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

36 fix streams drain control #37

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.softwaremill.kmq.redelivery.streams

import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerMessage.CommittableMessage
Expand All @@ -9,11 +9,9 @@ import akka.stream.scaladsl.{Flow, Keep, Sink}
import com.softwaremill.kmq.redelivery.Offset
import com.softwaremill.kmq.{EndMarker, MarkerKey, MarkerValue, StartMarker}

import scala.concurrent.Future

object CommitMarkerSink {

def apply()(implicit system: ActorSystem): Sink[CommittableMessage[MarkerKey, MarkerValue], Future[Done]] = {
def apply()(implicit system: ActorSystem): Sink[CommittableMessage[MarkerKey, MarkerValue], NotUsed] = {
val committerSettings = CommitterSettings(system)

Flow[CommittableMessage[MarkerKey, MarkerValue]]
Expand Down Expand Up @@ -45,8 +43,7 @@ object CommitMarkerSink {
prev
}
.map(_.committableOffset)
.via(Committer.flow(committerSettings))
.toMat(Sink.ignore)(Keep.right)
.toMat(Committer.sink(committerSettings))(Keep.left)
}

def bySmallestOffsetAscending(implicit ord: Ordering[Offset]): Ordering[CommittableMessage[MarkerKey, MarkerValue]] =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.softwaremill.kmq.redelivery.streams

import akka.Done
import akka.actor.ActorSystem
import akka.actor.{ActorSystem, Cancellable}
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import com.softwaremill.kmq._
Expand All @@ -11,20 +10,18 @@ import org.apache.kafka.common.serialization.ByteArraySerializer

import java.time.{Clock, Instant}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

object RedeliverySink extends StrictLogging {

def apply(partition: Partition)
(implicit system: ActorSystem, kafkaClients: KafkaClients, kmqConfig: KmqConfig, clock: Clock
): Sink[CommittableMessage[MarkerKey, MarkerValue], Future[Done]] = {
def apply(kafkaClients: KafkaClients, kmqConfig: KmqConfig)(partition: Partition)
(implicit system: ActorSystem, clock: Clock): Sink[CommittableMessage[MarkerKey, MarkerValue], Cancellable] = {
val producer = kafkaClients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer])
val redeliverer = new RetryingRedeliverer(new DefaultRedeliverer(partition, producer, kmqConfig, kafkaClients))

Flow[CommittableMessage[MarkerKey, MarkerValue]]
.map(MarkerRedeliveryCommand)
.merge(Source.tick(initialDelay = 1.second, interval = 1.second, tick = TickRedeliveryCommand))
.mergeMat(Source.tick(initialDelay = 1.second, interval = 1.second, tick = TickRedeliveryCommand))(Keep.right)
.statefulMapConcat { () => // keep track of open markers; select markers to redeliver
val markersByTimestamp = new PriorityQueueMap[MarkerKey, MsgWithTimestamp](valueOrdering = bySmallestTimestampAscending)
var latestMarkerSeenTimestamp: Option[Timestamp] = None
Expand Down Expand Up @@ -74,13 +71,9 @@ object RedeliverySink extends StrictLogging {
toRedeliver
}
}
.statefulMapConcat { () => // redeliver
msg => {
.toMat(Sink.foreach { msg => // redeliver
redeliverer.redeliver(List(msg.record.key)) // TODO: maybe bulk redeliver
Some(Done)
}
}
.toMat(Sink.ignore)(Keep.right)
})(Keep.left)
}

private def bySmallestTimestampAscending(implicit ord: Ordering[Timestamp]): Ordering[MsgWithTimestamp] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,26 @@ import scala.concurrent.{Await, ExecutionContext}

object RedeliveryTracker extends StrictLogging {

def start()(implicit kafkaClients: KafkaClients, config: KmqConfig): Closeable = {
def start(kafkaClients: KafkaClients, kmqConfig: KmqConfig): Closeable = {
implicit val system: ActorSystem = ActorSystem("kmq-redelivery")
implicit val ec: ExecutionContext = system.dispatcher
implicit val clock: Clock = Clock.systemDefaultZone()
implicit val markerKeyDeserializer: Deserializer[MarkerKey] = new MarkerKey.MarkerKeyDeserializer()
implicit val markerValueDeserializer: Deserializer[MarkerValue] = new MarkerValue.MarkerValueDeserializer()

val markerConsumerSettings = ConsumerSettings(system, markerKeyDeserializer, markerValueDeserializer)
.withBootstrapServers(config.getBootstrapServers)
.withGroupId(config.getRedeliveryConsumerGroupId)
.withProperties(config.getConsumerProps)
.withBootstrapServers(kmqConfig.getBootstrapServers)
.withGroupId(kmqConfig.getRedeliveryConsumerGroupId)
.withProperties(kmqConfig.getConsumerProps)

val streamControl = new RedeliveryTrackerStream(markerConsumerSettings, config.getMarkerTopic, Int.MaxValue)
val streamControl = new RedeliveryTrackerStream(markerConsumerSettings,
kafkaClients, kmqConfig, Int.MaxValue)
.run()

logger.info("Started redelivery stream")

() => Await.result(streamControl.drainAndShutdown().andThen { case _ => system.terminate() }, 1.minute)
() => {
Await.result(streamControl.drainAndShutdown().flatMap(_ => system.terminate()), 1.minute)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,37 +1,38 @@
package com.softwaremill.kmq.redelivery.streams

import akka.Done
import akka.actor.ActorSystem
import akka.actor.{ActorSystem, Cancellable}
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.scaladsl.Consumer.{Control, DrainingControl}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink}
import akka.stream._
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, RunnableGraph, Sink}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}
import com.softwaremill.kmq._
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.common.{Metric, MetricName}

import java.time.Clock
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

/**
* Combines functionality of [[RedeliverySink]] and [[CommitMarkerSink]].
*/
class RedeliveryTrackerStream(markerConsumerSettings: ConsumerSettings[MarkerKey, MarkerValue],
markersTopic: String, maxPartitions: Int)
(implicit system: ActorSystem, ec: ExecutionContext,
kafkaClients: KafkaClients, kmqConfig: KmqConfig, clock: Clock) extends StrictLogging {
kafkaClients: KafkaClients, kmqConfig: KmqConfig, maxPartitions: Int)
(implicit system: ActorSystem, ec: ExecutionContext, clock: Clock) extends StrictLogging {

def run(): DrainingControl[Done] = {
Consumer.committablePartitionedSource(markerConsumerSettings, Subscriptions.topics(markersTopic))
Consumer.committablePartitionedSource(markerConsumerSettings, Subscriptions.topics(kmqConfig.getMarkerTopic))
.mapAsyncUnordered(maxPartitions) {
case (topicPartition, source) =>

val redeliverySink = RedeliverySink(topicPartition.partition)
val redeliverySink = RedeliverySink(kafkaClients, kmqConfig)(topicPartition.partition)
val commitMarkerSink = CommitMarkerSink()

RunnableGraph
.fromGraph(GraphDSL.createGraph(redeliverySink, commitMarkerSink)(combineFutures) {
val cancellable = RunnableGraph
.fromGraph(GraphDSL.createGraph(redeliverySink, commitMarkerSink)(Keep.left) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need to cancel both sinks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's special in the RedeliverySink is an additional TickSource that is instantiated there. Without the TickSource both sinks appear to be closed correctly and the DrainingControl is able to shut down the stream.

Here I'm exposing a hook to the TickSource as Cancellable materialized view from the whole RedeliverySink.

implicit builder => (sink1, sink2) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[CommittableMessage[MarkerKey, MarkerValue]](2))
Expand All @@ -41,10 +42,64 @@ class RedeliveryTrackerStream(markerConsumerSettings: ConsumerSettings[MarkerKey
ClosedShape
})
.run()

Future.successful(cancellable)
}
.viaMat(Flow.fromGraph(new AggregatingToMatFlow()))(ControlWithCancellable.apply)
.toMat(Sink.ignore)(DrainingControl.apply)
.run()
}
}

class AggregatingToMatFlow[T] extends GraphStageWithMaterializedValue[FlowShape[T, T], Iterable[T]] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting solution :) @aludwiko or @rucek maybe you can take a look, is this the simplest way to cancel sub-streams created using .groupBy or here with Consumer.committablePartitionedSource?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope it's interesting in a positive way :)

It's certainly not the simplest in general. But here we have additional TickSource merged inside one of the nested sinks which doesn't stop when it's source stops.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in a positive way :)

I think the problem of stopping sub-streams is not that uncommon, as it emerges also when using the .groupBy which I mentioned. So either it's already solved in some way, or we have a solution which would be a great material for a short blog post :)

val in: Inlet[T] = Inlet("in")
val out: Outlet[T] = Outlet("out")

override def shape: FlowShape[T, T] = FlowShape(in, out)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Iterable[T]) = {
val logic = new GraphStageLogic(shape) {
private val _values: mutable.Set[T] = mutable.Set()

def values: Iterable[T] = _values

setHandlers(in, out, new InHandler with OutHandler {
override def onPush(): Unit = {
val v = grab(in)
_values += v
push(out, v)
}

override def onPull(): Unit = {
pull(in)
}
})
}

logic -> logic.values
}
}

class ControlWithCancellable(control: Consumer.Control, cancellables: Iterable[Cancellable]) extends Control {

override def stop(): Future[Done] = {
cancellables.foreach(_.cancel())
control.stop()
}

override def shutdown(): Future[Done] = {
cancellables.foreach(_.cancel())
control.shutdown()
}

override def isShutdown: Future[Done] =
control.isShutdown

override def metrics: Future[Map[MetricName, Metric]] =
control.metrics
}

private def combineFutures(l: Future[Done], r: Future[Done]): Future[Done] = l.flatMap(_ => r)
}
object ControlWithCancellable {
def apply(control: Consumer.Control, cancellables: Iterable[Cancellable]): ControlWithCancellable =
new ControlWithCancellable(control, cancellables)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ class IntegrationTest
"KMQ" should "resend message if not committed" in {
val bootstrapServer = s"localhost:${testKafkaConfig.kafkaPort}"
val uid = UUID.randomUUID().toString
val kmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery", 1000, 1000)

val kmqConfig: KmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery", 1000, 1000)
val kafkaClients: KafkaClients = new KafkaClients(kmqConfig)

val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServer)
Expand Down Expand Up @@ -84,24 +86,26 @@ class IntegrationTest
.to(Producer.plainSink(markerProducerSettings)) // 5. write "end" markers
.run()

val redeliveryHook = RedeliveryTracker.start(new KafkaClients(kmqConfig), kmqConfig)
val redeliveryHook = streams.RedeliveryTracker.start(kafkaClients, kmqConfig)

val messages = (0 to 20).map(_.toString)
messages.foreach(msg => sendToKafka(kmqConfig.getMsgTopic, msg))

eventually {
receivedMessages.size should be > processedMessages.size
processedMessages.sortBy(_.toInt).distinct shouldBe messages
}(PatienceConfig(timeout = Span(15, Seconds)), implicitly, implicitly)
}(PatienceConfig(timeout = Span(20, Seconds)), implicitly, implicitly)

redeliveryHook.close()
control.shutdown()
}

"KMQ" should "resend message if max redelivery count not exceeded" in {
it should "resend message if max redelivery count not exceeded" in {
val bootstrapServer = s"localhost:${testKafkaConfig.kafkaPort}"
val uid = UUID.randomUUID().toString
val kmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery", 1000, 1000)

val kmqConfig: KmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery", 1000, 1000)
val kafkaClients: KafkaClients = new KafkaClients(kmqConfig)

val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServer)
Expand Down Expand Up @@ -151,7 +155,7 @@ class IntegrationTest
val undeliveredControl = Consumer
.plainSource(
consumerSettings,
Subscriptions.topics(s"${kmqConfig.getMsgTopic}__undelivered")
Subscriptions.topics(kmqConfig.getDeadLetterTopic)
) // 1. get messages from dead-letter topic
.map { msg =>
undeliveredMessages += msg.value
Expand All @@ -160,7 +164,7 @@ class IntegrationTest
.to(Sink.ignore)
.run()

val redeliveryHook = RedeliveryTracker.start(new KafkaClients(kmqConfig), kmqConfig)
val redeliveryHook = streams.RedeliveryTracker.start(kafkaClients, kmqConfig)

val messages = (0 to 6).map(_.toString)
messages.foreach(msg => sendToKafka(kmqConfig.getMsgTopic, msg))
Expand All @@ -170,7 +174,7 @@ class IntegrationTest
eventually {
receivedMessages.sortBy(_.toInt) shouldBe expectedReceived
undeliveredMessages.sortBy(_.toInt) shouldBe expectedUndelivered
}(PatienceConfig(timeout = Span(15, Seconds)), implicitly, implicitly)
}(PatienceConfig(timeout = Span(20, Seconds)), implicitly, implicitly)

redeliveryHook.close()
control.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ class LearningTest extends TestKit(ActorSystem("test-system")) with AnyFlatSpecL
implicit val materializer: Materializer = akka.stream.Materializer.matFromSystem
implicit val ec: ExecutionContext = system.dispatcher

"FooStream" should "merge ticks" in {
it should "merge ticks" in {
Source
.tick(0.1.second, 0.1.second, "tick")
.merge(Source.tick(0.17.second, 0.17.second, "tack"))
.wireTap(println(_))
.runWith(TestSink[String]).request(10).expectNextN(10)
}

"FooStream" should "broadcast to sinks" in {
it should "broadcast to sinks" in {
val source = Source.fromIterator(() => Seq(1, 2, 3).iterator)

val multiplyBy2: Sink[Int, Future[Done]] = Flow[Int].map(_ * 2).wireTap(x => println(s"2: $x")).toMat(Sink.ignore)(Keep.right)
Expand All @@ -49,7 +49,7 @@ class LearningTest extends TestKit(ActorSystem("test-system")) with AnyFlatSpecL
.run()
}

"FooStream" should "async broadcast to sinks" in {
it should "async broadcast to sinks" in {
val source = Source.fromIterator(() => Seq(1, 2, 3).iterator)

val multiplyBy2: Sink[Int, Future[Done]] = Flow[Int].map(_ * 2).wireTap(x => println(s"2: $x")).toMat(Sink.ignore)(Keep.right)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ class RedeliveryTrackerStreamIntegrationTest extends TestKit(ActorSystem("test-s
val maxRedeliveryCount = 1
val redeliverAfterMs = 300

implicit val kmqConfig: KmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery",
val kmqConfig: KmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery",
1000, 1000, s"${uid}__undelivered", "kmq-redelivery-count", maxRedeliveryCount, Collections.emptyMap())
implicit val kafkaClients: KafkaClients = new KafkaClients(kmqConfig)
val kafkaClients: KafkaClients = new KafkaClients(kmqConfig)

val markerConsumerSettings = ConsumerSettings(system, markerKeyDeserializer, markerValueDeserializer)
.withBootstrapServers(bootstrapServer)
.withGroupId(kmqConfig.getRedeliveryConsumerGroupId)
.withProperties(kmqConfig.getConsumerProps)

val streamControl = new RedeliveryTrackerStream(markerConsumerSettings,
kmqConfig.getMarkerTopic, Int.MaxValue)
kafkaClients, kmqConfig, Int.MaxValue)
.run()

createTopic(kmqConfig.getMsgTopic)
Expand All @@ -74,17 +74,17 @@ class RedeliveryTrackerStreamIntegrationTest extends TestKit(ActorSystem("test-s
val uid = UUID.randomUUID().toString
val redeliverAfterMs = 300

implicit val kmqConfig: KmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery",
val kmqConfig: KmqConfig = new KmqConfig(bootstrapServer, s"$uid-queue", s"$uid-markers", "kmq_client", "kmq_redelivery",
1000, 1000)
implicit val kafkaClients: KafkaClients = new KafkaClients(kmqConfig)
val kafkaClients: KafkaClients = new KafkaClients(kmqConfig)

val markerConsumerSettings = ConsumerSettings(system, markerKeyDeserializer, markerValueDeserializer)
.withBootstrapServers(bootstrapServer)
.withGroupId(kmqConfig.getRedeliveryConsumerGroupId)
.withProperties(kmqConfig.getConsumerProps)

val streamControl = new RedeliveryTrackerStream(markerConsumerSettings,
kmqConfig.getMarkerTopic, Int.MaxValue)
kafkaClients, kmqConfig, Int.MaxValue)
.run()

createTopic(kmqConfig.getMarkerTopic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ object EmbeddedStream extends StrictLogging {
private val FAIL_RATIO = 0.5
private val PARTITIONS = 1

private implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig.defaultConfig
private implicit val kmqConfig: KmqConfig = new KmqConfig("localhost:" + kafkaConfig.kafkaPort, "queue", "markers", "kmq_client", "kmq_redelivery",
private val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig.defaultConfig
private val kmqConfig: KmqConfig = new KmqConfig("localhost:" + kafkaConfig.kafkaPort, "queue", "markers", "kmq_client", "kmq_redelivery",
Duration.ofSeconds(3).toMillis, 1000)
private implicit val clients: KafkaClients = new KafkaClients(kmqConfig)
private val clients: KafkaClients = new KafkaClients(kmqConfig)

private val random: Random = new Random(0)
private val processedMessages = new ConcurrentHashMap[Integer, Integer]
Expand All @@ -34,7 +34,7 @@ object EmbeddedStream extends StrictLogging {
EmbeddedKafka.createCustomTopic(kmqConfig.getMsgTopic, partitions = PARTITIONS)
logger.info("Kafka started")

val redelivery = RedeliveryTracker.start()
val redelivery = RedeliveryTracker.start(clients, kmqConfig)
sleep(1000) // Wait for the stream to warm up TODO: analyze

startInBackground(() => processMessages(clients, kmqConfig))
Expand Down