Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add withLatestFrom operator. #1315

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

zach-klippenstein
Copy link
Contributor

Use case: The processing of one flow (the operator's receiver) requires using the latest value emitted from another flow at the time of emission from the first flow. My specific use case involves combining a flow of events with a flow of event sinks ((Event) -> Unit). Whenever an event occurs (is emitted from the first flow), it should be sent to the most recent sink emitted from the second flow.

The resulting flow is entirely driven by the first flow. This operator conflates and caches values from the second flow and passes them to a lambda to be combined with values from the first flow whenever the first flow emits. The resulting flow will start collecting both flows immediately, but will not call the transform function or emit anything until both flows emit at least one value. The resulting flow will remain active until either flow throws an exception, or the first flow completes. If the first flow completes before the second, the second flow is cancelled. If the second flow completes before the first, the latest value will continue to be cached and passed to the transform function every time the first flow emits.

This operator is similar to combineLatest, but it always emits immediately and only when the first flow emits (values from the first flow are not cached). The implementation in this PR reuses as much of the infrastructure as possible from combineLatest. It collects the second flow using a CONFLATED channel since the operator only cares about caching the most recently emitted value, so the second flow does not need to have any backpressure applied.

This PR only includes a single overload of the operator, accepting a single other flow, since that is the only overload I have a concrete use case for.

@zach-klippenstein zach-klippenstein force-pushed the zachklipp/withlatestfrom branch from 0c66a10 to 80a0d5e Compare July 3, 2019 04:29
@qwwdfsad qwwdfsad added the flow label Jul 3, 2019
@zach-klippenstein zach-klippenstein force-pushed the zachklipp/withlatestfrom branch from 80a0d5e to 4fa6c5a Compare July 3, 2019 17:00
@zach-klippenstein zach-klippenstein force-pushed the zachklipp/withlatestfrom branch from 4fa6c5a to 0575808 Compare July 6, 2019 16:04
@zach-klippenstein
Copy link
Contributor Author

Gentle ping: Is the use case I've given adequate, or would more information be helpful?

@elizarov
Copy link
Contributor

elizarov commented Jul 22, 2019

This will not be part of 1.3.0 release. We'll look at when planning for the next batch of operations in subsequent versions. To clarify, the main problem we facing here is we are trying to keep our API minimal and orthogonal, so we're looking at more primitive operators that would let us express operators like withLatestFrom and to consistently name them, too.

@hrach
Copy link
Contributor

hrach commented Aug 27, 2019

Do I correctly understand that this is combineTransform with Latest extension? Actually, I'd love to see this.

My usecase is for example: loading places to map; user is moving with map, which cancels the previous fetching, also the loaded data depend on filter or other Flows.

@zach-klippenstein
Copy link
Contributor Author

I don't think you can implement this with combineLatest - that operator is triggered whenever any of it's upstream flows emits. This operator would cache some upstream values but only trigger on a subset of them.

It's closer to a zip, which requires all operators to emit, combined with a (non-existent) operator on some upstreams that would be similar to conflate: A.withLatestFrom(B) means "zip A with B, but if B emits again while waiting for A then use the new value from B instead". Breaking the operator down in this way is more flexible because you could "withLatestFrom" an arbitrary number of streams and have fine grained control over which ones trigger updates and which ones are just cached, but I'm not sure how this variant of conflate could be implemented because it would need to affect synchronization logic that currently exists within and is very specific to the zip operator. It could be done with custom operator fusion but that feels hacky and brittle.

@elizarov
Copy link
Contributor

A simple implementation using only stable Flow APIs can be found here: https://pl.kotl.in/IYfZx_sKY

@curioustechizen
Copy link

I'm having trouble writing unit tests for the implementation in #1315 (comment)

Here's what I've tried.

@Test
fun `When receiver flow emits then resulting flow emits`()  {

    val firstFlowChannel = ConflatedBroadcastChannel<Char>()
    val otherFlowChannel = ConflatedBroadcastChannel<Int>()

    val resultingFlow =
        firstFlowChannel.asFlow()
            .withLatestFrom(otherFlowChannel.asFlow()) { a, b -> "$a$b"}

    otherFlowChannel.offer(1)
    firstFlowChannel.offer('a')

    runBlocking {
        val receiveChannel = resultingFlow.produceIn(this)
        assertEquals("a1", receiveChannel.poll())
        receiveChannel.cancel()
    }
}

This always fails with

java.lang.AssertionError:
Expected :a1
Actual :null

I see receiveChannel is empty. If I try to add breakpoints it looks like the withLatestFrom is not even being called. I suspect I've misconfigured the test but I cannot spot the problem.

@elizarov
Copy link
Contributor

@curioustechizen Don't poll the receive channel. Use receive.

@curioustechizen
Copy link

@elizarov Thanks. That worked. If I change assertEquals(true, receiveChannel.isEmpty) the test passes.

Do I understand correctly that flow.produceIn() does not actually trigger the processing; but rather it is channel.receive() that triggers it?

@zach-klippenstein
Copy link
Contributor Author

@curioustechizen produceIn (may) launch a coroutine to collect upstream, but that coroutine probably won't actually start executing before produceIn returns and you call poll immediately on the next line (depending on dispatcher and thread scheduling details). If it doesn't, poll will return null.

@qwwdfsad qwwdfsad force-pushed the develop branch 3 times, most recently from 4a49830 to aff8202 Compare March 10, 2020 17:27
@Dimezis
Copy link

Dimezis commented Oct 21, 2021

Is there anything blocking this PR?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants