Skip to content

Commit

Permalink
Merge pull request #51 from bonzini/master
Browse files Browse the repository at this point in the history
flatten: avoid early completion
  • Loading branch information
bjornbytes authored Jan 15, 2020
2 parents 98f6eaa + 1cbd5b2 commit 16edbf9
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 10 deletions.
15 changes: 10 additions & 5 deletions rx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -963,24 +963,29 @@ end
function Observable:flatten()
return Observable.create(function(observer)
local subscriptions = {}
local remaining = 1

local function onError(message)
return observer:onError(message)
end

local function onCompleted()
remaining = remaining - 1
if remaining == 0 then
return observer:onCompleted()
end
end

local function onNext(observable)
local function innerOnNext(...)
observer:onNext(...)
end

local subscription = observable:subscribe(innerOnNext, onError, util.noop)
remaining = remaining + 1
local subscription = observable:subscribe(innerOnNext, onError, onCompleted)
subscriptions[#subscriptions + 1] = subscription
end

local function onCompleted()
return observer:onCompleted()
end

subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
return Subscription.create(function ()
for i = 1, #subscriptions do
Expand Down
15 changes: 10 additions & 5 deletions src/operators/flatten.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,29 @@ local util = require 'util'
function Observable:flatten()
return Observable.create(function(observer)
local subscriptions = {}
local remaining = 1

local function onError(message)
return observer:onError(message)
end

local function onCompleted()
remaining = remaining - 1
if remaining == 0 then
return observer:onCompleted()
end
end

local function onNext(observable)
local function innerOnNext(...)
observer:onNext(...)
end

local subscription = observable:subscribe(innerOnNext, onError, util.noop)
remaining = remaining + 1
local subscription = observable:subscribe(innerOnNext, onError, onCompleted)
subscriptions[#subscriptions + 1] = subscription
end

local function onCompleted()
return observer:onCompleted()
end

subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
return Subscription.create(function ()
for i = 1, #subscriptions do
Expand Down
13 changes: 13 additions & 0 deletions tests/flatMap.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,17 @@ describe('flatMap', function()

expect(observable).to.produce(1, 2, 3, 2, 3, 3)
end)

it('completes after all observables produced by its parent', function()
s = Rx.CooperativeScheduler.create()
local observable = Rx.Observable.fromRange(3):flatMap(function(i)
return Rx.Observable.fromRange(i, 3):delay(i, s)
end)

local onNext, onError, onCompleted, order = observableSpy(observable)
repeat s:update(1)
until s:isEmpty()
expect(#onNext).to.equal(6)
expect(#onCompleted).to.equal(1)
end)
end)
13 changes: 13 additions & 0 deletions tests/flatten.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ describe('flatten', function()
expect(observable).to.produce(1, 2, 3, 2, 3, 3)
end)

it('completes after all observables produced by its parent', function()
s = Rx.CooperativeScheduler.create()
local observable = Rx.Observable.fromRange(3):map(function(i)
return Rx.Observable.fromRange(i, 3):delay(i, s)
end):flatten()

local onNext, onError, onCompleted, order = observableSpy(observable)
repeat s:update(1)
until s:isEmpty()
expect(#onNext).to.equal(6)
expect(#onCompleted).to.equal(1)
end)

it('should unsubscribe from all source observables', function()
local unsubscribeA = spy()
local observableA = Rx.Observable.create(function(observer)
Expand Down

0 comments on commit 16edbf9

Please sign in to comment.