Skip to content

Commit

Permalink
flatten: avoid early completion
Browse files Browse the repository at this point in the history
The flatten operator currently calls onCompleted as soon as the
parent signals that it has completed.  However, there could be
more data incoming from the returned observables; in fact, the
flattened observable might even be infinite.

Track how many subscriptions are still active, and only call
onCompleted at the time of the last completion.
  • Loading branch information
bonzini committed Jan 14, 2020
1 parent 98f6eaa commit 1cbd5b2
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 10 deletions.
15 changes: 10 additions & 5 deletions rx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -963,24 +963,29 @@ end
function Observable:flatten()
return Observable.create(function(observer)
local subscriptions = {}
local remaining = 1

local function onError(message)
return observer:onError(message)
end

local function onCompleted()
remaining = remaining - 1
if remaining == 0 then
return observer:onCompleted()
end
end

local function onNext(observable)
local function innerOnNext(...)
observer:onNext(...)
end

local subscription = observable:subscribe(innerOnNext, onError, util.noop)
remaining = remaining + 1
local subscription = observable:subscribe(innerOnNext, onError, onCompleted)
subscriptions[#subscriptions + 1] = subscription
end

local function onCompleted()
return observer:onCompleted()
end

subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
return Subscription.create(function ()
for i = 1, #subscriptions do
Expand Down
15 changes: 10 additions & 5 deletions src/operators/flatten.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,29 @@ local util = require 'util'
function Observable:flatten()
return Observable.create(function(observer)
local subscriptions = {}
local remaining = 1

local function onError(message)
return observer:onError(message)
end

local function onCompleted()
remaining = remaining - 1
if remaining == 0 then
return observer:onCompleted()
end
end

local function onNext(observable)
local function innerOnNext(...)
observer:onNext(...)
end

local subscription = observable:subscribe(innerOnNext, onError, util.noop)
remaining = remaining + 1
local subscription = observable:subscribe(innerOnNext, onError, onCompleted)
subscriptions[#subscriptions + 1] = subscription
end

local function onCompleted()
return observer:onCompleted()
end

subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
return Subscription.create(function ()
for i = 1, #subscriptions do
Expand Down
13 changes: 13 additions & 0 deletions tests/flatMap.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,17 @@ describe('flatMap', function()

expect(observable).to.produce(1, 2, 3, 2, 3, 3)
end)

it('completes after all observables produced by its parent', function()
s = Rx.CooperativeScheduler.create()
local observable = Rx.Observable.fromRange(3):flatMap(function(i)
return Rx.Observable.fromRange(i, 3):delay(i, s)
end)

local onNext, onError, onCompleted, order = observableSpy(observable)
repeat s:update(1)
until s:isEmpty()
expect(#onNext).to.equal(6)
expect(#onCompleted).to.equal(1)
end)
end)
13 changes: 13 additions & 0 deletions tests/flatten.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ describe('flatten', function()
expect(observable).to.produce(1, 2, 3, 2, 3, 3)
end)

it('completes after all observables produced by its parent', function()
s = Rx.CooperativeScheduler.create()
local observable = Rx.Observable.fromRange(3):map(function(i)
return Rx.Observable.fromRange(i, 3):delay(i, s)
end):flatten()

local onNext, onError, onCompleted, order = observableSpy(observable)
repeat s:update(1)
until s:isEmpty()
expect(#onNext).to.equal(6)
expect(#onCompleted).to.equal(1)
end)

it('should unsubscribe from all source observables', function()
local unsubscribeA = spy()
local observableA = Rx.Observable.create(function(observer)
Expand Down

0 comments on commit 1cbd5b2

Please sign in to comment.