-
Notifications
You must be signed in to change notification settings - Fork 46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
36 fix streams drain control #37
Changes from all commits
69a2b25
a6cd587
af989af
e4ffc31
d5e1dc8
748c667
bbce587
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,37 +1,38 @@ | ||
package com.softwaremill.kmq.redelivery.streams | ||
|
||
import akka.Done | ||
import akka.actor.ActorSystem | ||
import akka.actor.{ActorSystem, Cancellable} | ||
import akka.kafka.ConsumerMessage.CommittableMessage | ||
import akka.kafka.scaladsl.Consumer | ||
import akka.kafka.scaladsl.Consumer.DrainingControl | ||
import akka.kafka.scaladsl.Consumer.{Control, DrainingControl} | ||
import akka.kafka.{ConsumerSettings, Subscriptions} | ||
import akka.stream.ClosedShape | ||
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink} | ||
import akka.stream._ | ||
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, RunnableGraph, Sink} | ||
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler} | ||
import com.softwaremill.kmq._ | ||
import com.typesafe.scalalogging.StrictLogging | ||
import org.apache.kafka.common.{Metric, MetricName} | ||
|
||
import java.time.Clock | ||
import scala.collection.mutable | ||
import scala.concurrent.{ExecutionContext, Future} | ||
|
||
/** | ||
* Combines functionality of [[RedeliverySink]] and [[CommitMarkerSink]]. | ||
*/ | ||
class RedeliveryTrackerStream(markerConsumerSettings: ConsumerSettings[MarkerKey, MarkerValue], | ||
markersTopic: String, maxPartitions: Int) | ||
(implicit system: ActorSystem, ec: ExecutionContext, | ||
kafkaClients: KafkaClients, kmqConfig: KmqConfig, clock: Clock) extends StrictLogging { | ||
kafkaClients: KafkaClients, kmqConfig: KmqConfig, maxPartitions: Int) | ||
(implicit system: ActorSystem, ec: ExecutionContext, clock: Clock) extends StrictLogging { | ||
|
||
def run(): DrainingControl[Done] = { | ||
Consumer.committablePartitionedSource(markerConsumerSettings, Subscriptions.topics(markersTopic)) | ||
Consumer.committablePartitionedSource(markerConsumerSettings, Subscriptions.topics(kmqConfig.getMarkerTopic)) | ||
.mapAsyncUnordered(maxPartitions) { | ||
case (topicPartition, source) => | ||
|
||
val redeliverySink = RedeliverySink(topicPartition.partition) | ||
val redeliverySink = RedeliverySink(kafkaClients, kmqConfig)(topicPartition.partition) | ||
val commitMarkerSink = CommitMarkerSink() | ||
|
||
RunnableGraph | ||
.fromGraph(GraphDSL.createGraph(redeliverySink, commitMarkerSink)(combineFutures) { | ||
val cancellable = RunnableGraph | ||
.fromGraph(GraphDSL.createGraph(redeliverySink, commitMarkerSink)(Keep.left) { | ||
implicit builder => (sink1, sink2) => | ||
import GraphDSL.Implicits._ | ||
val broadcast = builder.add(Broadcast[CommittableMessage[MarkerKey, MarkerValue]](2)) | ||
|
@@ -41,10 +42,64 @@ class RedeliveryTrackerStream(markerConsumerSettings: ConsumerSettings[MarkerKey | |
ClosedShape | ||
}) | ||
.run() | ||
|
||
Future.successful(cancellable) | ||
} | ||
.viaMat(Flow.fromGraph(new AggregatingToMatFlow()))(ControlWithCancellable.apply) | ||
.toMat(Sink.ignore)(DrainingControl.apply) | ||
.run() | ||
} | ||
} | ||
|
||
class AggregatingToMatFlow[T] extends GraphStageWithMaterializedValue[FlowShape[T, T], Iterable[T]] { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I hope it's interesting in a positive way :) It's certainly not the simplest in general. But here we have additional TickSource merged inside one of the nested sinks which doesn't stop when it's source stops. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, in a positive way :) I think the problem of stopping sub-streams is not that uncommon, as it emerges also when using the |
||
val in: Inlet[T] = Inlet("in") | ||
val out: Outlet[T] = Outlet("out") | ||
|
||
override def shape: FlowShape[T, T] = FlowShape(in, out) | ||
|
||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Iterable[T]) = { | ||
val logic = new GraphStageLogic(shape) { | ||
private val _values: mutable.Set[T] = mutable.Set() | ||
|
||
def values: Iterable[T] = _values | ||
|
||
setHandlers(in, out, new InHandler with OutHandler { | ||
override def onPush(): Unit = { | ||
val v = grab(in) | ||
_values += v | ||
push(out, v) | ||
} | ||
|
||
override def onPull(): Unit = { | ||
pull(in) | ||
} | ||
}) | ||
} | ||
|
||
logic -> logic.values | ||
} | ||
} | ||
|
||
class ControlWithCancellable(control: Consumer.Control, cancellables: Iterable[Cancellable]) extends Control { | ||
|
||
override def stop(): Future[Done] = { | ||
cancellables.foreach(_.cancel()) | ||
control.stop() | ||
} | ||
|
||
override def shutdown(): Future[Done] = { | ||
cancellables.foreach(_.cancel()) | ||
control.shutdown() | ||
} | ||
|
||
override def isShutdown: Future[Done] = | ||
control.isShutdown | ||
|
||
override def metrics: Future[Map[MetricName, Metric]] = | ||
control.metrics | ||
} | ||
|
||
private def combineFutures(l: Future[Done], r: Future[Done]): Future[Done] = l.flatMap(_ => r) | ||
} | ||
object ControlWithCancellable { | ||
def apply(control: Consumer.Control, cancellables: Iterable[Cancellable]): ControlWithCancellable = | ||
new ControlWithCancellable(control, cancellables) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't we need to cancel both sinks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's special in the RedeliverySink is an additional TickSource that is instantiated there. Without the TickSource both sinks appear to be closed correctly and the DrainingControl is able to shut down the stream.
Here I'm exposing a hook to the TickSource as Cancellable materialized view from the whole RedeliverySink.