Skip to content

Commit

Permalink
Fix flatten operator's subscription to unsubscribe from all sources
Browse files Browse the repository at this point in the history
The `flatten` operator should unsubscribe from all flattened sources
  • Loading branch information
Junseong Jang committed Mar 12, 2019
1 parent 3e27861 commit a158200
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 4 deletions.
12 changes: 10 additions & 2 deletions rx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,8 @@ end
-- @returns {Observable}
function Observable:flatten()
return Observable.create(function(observer)
local subscriptions = {}

local function onError(message)
return observer:onError(message)
end
Expand All @@ -959,14 +961,20 @@ function Observable:flatten()
observer:onNext(...)
end

observable:subscribe(innerOnNext, onError, util.noop)
local subscription = observable:subscribe(innerOnNext, onError, util.noop)
subscriptions[#subscriptions + 1] = subscription
end

local function onCompleted()
return observer:onCompleted()
end

return self:subscribe(onNext, onError, onCompleted)
subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
return Subscription.create(function ()
for i = 1, #subscriptions do
subscriptions[i]:unsubscribe()
end
end)
end)
end

Expand Down
12 changes: 10 additions & 2 deletions src/operators/flatten.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ local util = require 'util'
-- @returns {Observable}
function Observable:flatten()
return Observable.create(function(observer)
local subscriptions = {}

local function onError(message)
return observer:onError(message)
end
Expand All @@ -15,13 +17,19 @@ function Observable:flatten()
observer:onNext(...)
end

observable:subscribe(innerOnNext, onError, util.noop)
local subscription = observable:subscribe(innerOnNext, onError, util.noop)
subscriptions[#subscriptions + 1] = subscription
end

local function onCompleted()
return observer:onCompleted()
end

return self:subscribe(onNext, onError, onCompleted)
subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
return Subscription.create(function ()
for i = 1, #subscriptions do
subscriptions[i]:unsubscribe()
end
end)
end)
end
21 changes: 21 additions & 0 deletions tests/flatten.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,25 @@ describe('flatten', function()

expect(observable).to.produce(1, 2, 3, 2, 3, 3)
end)

it('should unsubscribe from all source observables', function()
local unsubscribeA = spy()
local observableA = Rx.Observable.create(function(observer)
return Rx.Subscription.create(unsubscribeA)
end)

local unsubscribeB = spy()
local observableB = Rx.Observable.create(function(observer)
return Rx.Subscription.create(unsubscribeB)
end)

local subject = Rx.Subject.create()
local subscription = subject:flatten():subscribe()

subject:onNext(observableA)
subject:onNext(observableB)
subscription:unsubscribe()
expect(#unsubscribeA).to.equal(1)
expect(#unsubscribeB).to.equal(1)
end)
end)

0 comments on commit a158200

Please sign in to comment.