From a15820023132ebf77e1c2c5df1a2be30a6e0a355 Mon Sep 17 00:00:00 2001 From: Junseong Jang Date: Tue, 12 Mar 2019 17:58:11 +0900 Subject: [PATCH] Fix `flatten` operator's subscription to unsubscribe from all sources The `flatten` operator should unsubscribe from all flattened sources --- rx.lua | 12 ++++++++++-- src/operators/flatten.lua | 12 ++++++++++-- tests/flatten.lua | 21 +++++++++++++++++++++ 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/rx.lua b/rx.lua index 36cabfc..b25219b 100644 --- a/rx.lua +++ b/rx.lua @@ -950,6 +950,8 @@ end -- @returns {Observable} function Observable:flatten() return Observable.create(function(observer) + local subscriptions = {} + local function onError(message) return observer:onError(message) end @@ -959,14 +961,20 @@ function Observable:flatten() observer:onNext(...) end - observable:subscribe(innerOnNext, onError, util.noop) + local subscription = observable:subscribe(innerOnNext, onError, util.noop) + subscriptions[#subscriptions + 1] = subscription end local function onCompleted() return observer:onCompleted() end - return self:subscribe(onNext, onError, onCompleted) + subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted) + return Subscription.create(function () + for i = 1, #subscriptions do + subscriptions[i]:unsubscribe() + end + end) end) end diff --git a/src/operators/flatten.lua b/src/operators/flatten.lua index d7be92d..381ce29 100644 --- a/src/operators/flatten.lua +++ b/src/operators/flatten.lua @@ -6,6 +6,8 @@ local util = require 'util' -- @returns {Observable} function Observable:flatten() return Observable.create(function(observer) + local subscriptions = {} + local function onError(message) return observer:onError(message) end @@ -15,13 +17,19 @@ function Observable:flatten() observer:onNext(...) end - observable:subscribe(innerOnNext, onError, util.noop) + local subscription = observable:subscribe(innerOnNext, onError, util.noop) + subscriptions[#subscriptions + 1] = subscription end local function onCompleted() return observer:onCompleted() end - return self:subscribe(onNext, onError, onCompleted) + subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted) + return Subscription.create(function () + for i = 1, #subscriptions do + subscriptions[i]:unsubscribe() + end + end) end) end diff --git a/tests/flatten.lua b/tests/flatten.lua index efdc6c1..03f50d3 100644 --- a/tests/flatten.lua +++ b/tests/flatten.lua @@ -12,4 +12,25 @@ describe('flatten', function() expect(observable).to.produce(1, 2, 3, 2, 3, 3) end) + + it('should unsubscribe from all source observables', function() + local unsubscribeA = spy() + local observableA = Rx.Observable.create(function(observer) + return Rx.Subscription.create(unsubscribeA) + end) + + local unsubscribeB = spy() + local observableB = Rx.Observable.create(function(observer) + return Rx.Subscription.create(unsubscribeB) + end) + + local subject = Rx.Subject.create() + local subscription = subject:flatten():subscribe() + + subject:onNext(observableA) + subject:onNext(observableB) + subscription:unsubscribe() + expect(#unsubscribeA).to.equal(1) + expect(#unsubscribeB).to.equal(1) + end) end)