Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

36 fix streams drain control #37

Closed
wants to merge 7 commits into from

Conversation

mkrzemien
Copy link
Contributor

Closes #36

@mkrzemien mkrzemien requested review from adamw and aludwiko August 17, 2022 12:49
@mkrzemien mkrzemien force-pushed the 36-fix-streams-drain-control branch 3 times, most recently from 6d766fd to 7a0c34d Compare August 18, 2022 17:31
@mkrzemien mkrzemien force-pushed the 36-fix-streams-drain-control branch from 7a0c34d to e4ffc31 Compare August 18, 2022 17:33
@mkrzemien mkrzemien requested a review from adamw August 19, 2022 10:38
RunnableGraph
.fromGraph(GraphDSL.createGraph(redeliverySink, commitMarkerSink)(combineFutures) {
val cancellable = RunnableGraph
.fromGraph(GraphDSL.createGraph(redeliverySink, commitMarkerSink)(Keep.left) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need to cancel both sinks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's special in the RedeliverySink is an additional TickSource that is instantiated there. Without the TickSource both sinks appear to be closed correctly and the DrainingControl is able to shut down the stream.

Here I'm exposing a hook to the TickSource as Cancellable materialized view from the whole RedeliverySink.

.toMat(Sink.ignore)(DrainingControl.apply)
.run()
}
}

class AggregatingToMatFlow[T] extends GraphStageWithMaterializedValue[FlowShape[T, T], Iterable[T]] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting solution :) @aludwiko or @rucek maybe you can take a look, is this the simplest way to cancel sub-streams created using .groupBy or here with Consumer.committablePartitionedSource?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope it's interesting in a positive way :)

It's certainly not the simplest in general. But here we have additional TickSource merged inside one of the nested sinks which doesn't stop when it's source stops.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in a positive way :)

I think the problem of stopping sub-streams is not that uncommon, as it emerges also when using the .groupBy which I mentioned. So either it's already solved in some way, or we have a solution which would be a great material for a short blog post :)

@mkrzemien mkrzemien deleted the branch release-0.3 December 21, 2022 15:29
@mkrzemien mkrzemien closed this Dec 21, 2022
@katlasik katlasik deleted the 36-fix-streams-drain-control branch April 5, 2023 09:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants