diff --git a/source/concurrency/sequence.d b/source/concurrency/sequence.d index c60be93..cc04595 100644 --- a/source/concurrency/sequence.d +++ b/source/concurrency/sequence.d @@ -957,10 +957,18 @@ auto filterMap(Sequence, Fun)(Sequence s, Fun f) { return FilterMapSequence!(Sequence, Fun)(s, f); } +import std.typecons : Nullable; +alias NullabledType(P : Nullable!T, T) = T; + struct FilterMapSequence(Sequence, Fun) { import std.traits : ReturnType; alias Value = void; - alias Element = Sequence.Element; + alias FunReturn = ReturnType!Fun; + static if (is(FunReturn == void)) { + alias Element = void; + } else { + alias Element = NullabledType!(ReturnType!(Fun)); + } Sequence s; Fun f; auto connect(Receiver)(return Receiver receiver) @safe return scope { @@ -1029,16 +1037,31 @@ struct FilterMapSequenceNextState(Fun, NextReceiver, Receiver) { struct FilterMapSequenceNextReceiver(Value, Fun, NextReceiver, Receiver) { FilterMapSequenceNextState!(Fun, NextReceiver, Receiver)* state; - auto setValue(Value value) { - import concurrency : just; - import concurrency : connectHeap; - auto result = state.fun(value); - if (result.isNone) { - state.receiver.setValue(); - } else { - auto sender = state.nextReceiver.setNext(just(result.getSome)); - // TODO: put state in FilterMapSequenceNextOp - sender.connectHeap(state.receiver).start(); + static if (is(Value == void)) { + auto setValue() { + import concurrency : just; + import concurrency : connectHeap; + auto result = state.fun(); + if (result.isNone) { + state.receiver.setValue(); + } else { + auto sender = state.nextReceiver.setNext(just(result.getSome)); + // TODO: put state in FilterMapSequenceNextOp + sender.connectHeap(state.receiver).start(); + } + } + } else { + auto setValue(Value value) { + import concurrency : just; + import concurrency : connectHeap; + auto result = state.fun(value); + if (result.isNone) { + state.receiver.setValue(); + } else { + auto sender = state.nextReceiver.setNext(just(result.getSome)); + // TODO: put state in FilterMapSequenceNextOp + sender.connectHeap(state.receiver).start(); + } } } auto setError(Throwable t) nothrow @safe { diff --git a/tests/ut/concurrency/sequence.d b/tests/ut/concurrency/sequence.d index a9721ca..b980ea1 100644 --- a/tests/ut/concurrency/sequence.d +++ b/tests/ut/concurrency/sequence.d @@ -154,7 +154,7 @@ import unit_threaded; iotaSequence(5, 10).toList().syncWait.value.should == [5,6,7,8,9]; } -@("filterMap") +@("filterMap.int") @safe unittest { import std.typecons : Nullable; [1,2,3,4].sequence.filterMap((int i) { @@ -164,3 +164,12 @@ import unit_threaded; return Nullable!(int).init; }).toList().syncWait.value.should == [9,12]; } + +@("filterMap.void") +@safe unittest { + import core.time : msecs; + import std.typecons : Nullable; + interval(1.msecs, false).filterMap(() { + return Nullable!int(1); + }).take(4).toList().syncWait.value.should == [1,1,1,1]; +}