Skip to content

Commit

Permalink
Merge pull request #32 from naxxster/fix-switch-subscription
Browse files Browse the repository at this point in the history
Fix switch operator subscription to unsubscribe from inner subscription
  • Loading branch information
bjornbytes authored Dec 2, 2018
2 parents ebe4eb8 + 2445071 commit 66af07f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
19 changes: 14 additions & 5 deletions rx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,7 @@ end
-- @returns {Observable}
function Observable:switch()
return Observable.create(function(observer)
local subscription
local innerSubscription

local function onNext(...)
return observer:onNext(...)
Expand All @@ -1474,14 +1474,23 @@ function Observable:switch()
end

local function switch(source)
if subscription then
subscription:unsubscribe()
if innerSubscription then
innerSubscription:unsubscribe()
end

subscription = source:subscribe(onNext, onError, nil)
innerSubscription = source:subscribe(onNext, onError, nil)
end

return self:subscribe(switch, onError, onCompleted)
local subscription = self:subscribe(switch, onError, onCompleted)
return Subscription.create(function()
if innerSubscription then
innerSubscription:unsubscribe()
end

if subscription then
subscription:unsubscribe()
end
end)
end)
end

Expand Down
19 changes: 14 additions & 5 deletions src/operators/switch.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ local Observable = require 'observable'
-- @returns {Observable}
function Observable:switch()
return Observable.create(function(observer)
local subscription
local innerSubscription

local function onNext(...)
return observer:onNext(...)
Expand All @@ -20,13 +20,22 @@ function Observable:switch()
end

local function switch(source)
if subscription then
subscription:unsubscribe()
if innerSubscription then
innerSubscription:unsubscribe()
end

subscription = source:subscribe(onNext, onError, nil)
innerSubscription = source:subscribe(onNext, onError, nil)
end

return self:subscribe(switch, onError, onCompleted)
local subscription = self:subscribe(switch, onError, onCompleted)
return Subscription.create(function()
if innerSubscription then
innerSubscription:unsubscribe()
end

if subscription then
subscription:unsubscribe()
end
end)
end)
end
13 changes: 13 additions & 0 deletions tests/switch.lua
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,17 @@ describe('switch', function()
expect(#onError).to.equal(0)
expect(#onCompleted).to.equal(1)
end)

it('should unsubscribe from inner subscription too', function()
local unsubscribeA = spy()
local observableA = Rx.Observable.create(function(observer)
return Rx.Subscription.create(unsubscribeA)
end)

local subject = Rx.Subject.create()
local subscription = subject:switch():subscribe()
subject:onNext(observableA)
subscription:unsubscribe()
expect(#unsubscribeA).to.equal(1)
end)
end)

0 comments on commit 66af07f

Please sign in to comment.