diff --git a/rx.lua b/rx.lua index dd32be3..bef15ac 100644 --- a/rx.lua +++ b/rx.lua @@ -1813,6 +1813,7 @@ function Observable.zip(...) return Observable.create(function(observer) local values = {} local active = {} + local subscriptions = {} for i = 1, count do values[i] = {n = 0} active[i] = true @@ -1858,8 +1859,14 @@ function Observable.zip(...) end for i = 1, count do - sources[i]:subscribe(onNext(i), onError, onCompleted(i)) + subscriptions[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i)) end + + return Subscription.create(function() + for i = 1, count do + if subscriptions[i] then subscriptions[i]:unsubscribe() end + end + end) end) end diff --git a/src/operators/zip.lua b/src/operators/zip.lua index a005b96..62ff6a9 100644 --- a/src/operators/zip.lua +++ b/src/operators/zip.lua @@ -15,6 +15,7 @@ function Observable.zip(...) return Observable.create(function(observer) local values = {} local active = {} + local subscriptions = {} for i = 1, count do values[i] = {n = 0} active[i] = true @@ -60,7 +61,13 @@ function Observable.zip(...) end for i = 1, count do - sources[i]:subscribe(onNext(i), onError, onCompleted(i)) + subscriptions[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i)) end + + return Subscription.create(function() + for i = 1, count do + if subscriptions[i] then subscriptions[i]:unsubscribe() end + end + end) end) end diff --git a/tests/zip.lua b/tests/zip.lua index 3e7c0ef..fa8b540 100644 --- a/tests/zip.lua +++ b/tests/zip.lua @@ -3,6 +3,25 @@ describe('zip', function() expect(Rx.Observable.fromRange(1, 5):zip()).to.produce(1, 2, 3, 4, 5) end) + it('unsubscribes from all input observables', function() + local unsubscribeA = spy() + local subscriptionA = Rx.Subscription.create(unsubscribeA) + local observableA = Rx.Observable.create(function(observer) + return subscriptionA + end) + + local unsubscribeB = spy() + local subscriptionB = Rx.Subscription.create(unsubscribeB) + local observableB = Rx.Observable.create(function(observer) + return subscriptionB + end) + + local subscription = Rx.Observable.zip(observableA, observableB):subscribe() + subscription:unsubscribe() + expect(#unsubscribeA).to.equal(1) + expect(#unsubscribeB).to.equal(1) + end) + it('groups values produced by the sources by their index', function() local observableA = Rx.Observable.fromRange(1, 3) local observableB = Rx.Observable.fromRange(2, 4)