Skip to content

Commit

Permalink
Allow filterMap to operate on sequences of empty data
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Sep 18, 2024
1 parent 2acd64f commit f315462
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 12 deletions.
45 changes: 34 additions & 11 deletions source/concurrency/sequence.d
Original file line number Diff line number Diff line change
Expand Up @@ -957,10 +957,18 @@ auto filterMap(Sequence, Fun)(Sequence s, Fun f) {
return FilterMapSequence!(Sequence, Fun)(s, f);
}

import std.typecons : Nullable;
alias NullabledType(P : Nullable!T, T) = T;

struct FilterMapSequence(Sequence, Fun) {
import std.traits : ReturnType;
alias Value = void;
alias Element = Sequence.Element;
alias FunReturn = ReturnType!Fun;
static if (is(FunReturn == void)) {
alias Element = void;
} else {
alias Element = NullabledType!(ReturnType!(Fun));
}
Sequence s;
Fun f;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
Expand Down Expand Up @@ -1029,16 +1037,31 @@ struct FilterMapSequenceNextState(Fun, NextReceiver, Receiver) {
struct FilterMapSequenceNextReceiver(Value, Fun, NextReceiver, Receiver) {
FilterMapSequenceNextState!(Fun, NextReceiver, Receiver)* state;

auto setValue(Value value) {
import concurrency : just;
import concurrency : connectHeap;
auto result = state.fun(value);
if (result.isNone) {
state.receiver.setValue();
} else {
auto sender = state.nextReceiver.setNext(just(result.getSome));
// TODO: put state in FilterMapSequenceNextOp
sender.connectHeap(state.receiver).start();
static if (is(Value == void)) {
auto setValue() {
import concurrency : just;
import concurrency : connectHeap;
auto result = state.fun();
if (result.isNone) {
state.receiver.setValue();
} else {
auto sender = state.nextReceiver.setNext(just(result.getSome));
// TODO: put state in FilterMapSequenceNextOp
sender.connectHeap(state.receiver).start();
}
}
} else {
auto setValue(Value value) {
import concurrency : just;
import concurrency : connectHeap;
auto result = state.fun(value);
if (result.isNone) {
state.receiver.setValue();
} else {
auto sender = state.nextReceiver.setNext(just(result.getSome));
// TODO: put state in FilterMapSequenceNextOp
sender.connectHeap(state.receiver).start();
}
}
}
auto setError(Throwable t) nothrow @safe {
Expand Down
11 changes: 10 additions & 1 deletion tests/ut/concurrency/sequence.d
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ import unit_threaded;
iotaSequence(5, 10).toList().syncWait.value.should == [5,6,7,8,9];
}

@("filterMap")
@("filterMap.int")
@safe unittest {
import std.typecons : Nullable;
[1,2,3,4].sequence.filterMap((int i) {
Expand All @@ -164,3 +164,12 @@ import unit_threaded;
return Nullable!(int).init;
}).toList().syncWait.value.should == [9,12];
}

@("filterMap.void")
@safe unittest {
import core.time : msecs;
import std.typecons : Nullable;
interval(1.msecs, false).filterMap(() {
return Nullable!int(1);
}).take(4).toList().syncWait.value.should == [1,1,1,1];
}

0 comments on commit f315462

Please sign in to comment.