Skip to content

Commit

Permalink
Allow scan over value-less sequences
Browse files Browse the repository at this point in the history
While not obviously useful, it becomes useful iff the delegate have
side-effects.
  • Loading branch information
skoppe committed Sep 18, 2024
1 parent b6ef86e commit 2acd64f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
24 changes: 18 additions & 6 deletions source/concurrency/sequence.d
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,14 @@ auto nextTransform(Sequence, NextTransformer)(Sequence s, NextTransformer t) {
}

struct SequenceNextTransform(Sequence, NextTransformer) {
import concurrency.sender : VoidSender;
import concurrency : just;
alias Value = void;
alias Element = typeof(t.setNext(just(Sequence.Element.init))).Value;
static if (is(Sequence.Element == void)) {
alias Element = typeof(t.setNext(VoidSender())).Value;
} else {
alias Element = typeof(t.setNext(just(Sequence.Element.init))).Value;
}
Sequence s;
NextTransformer t;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
Expand Down Expand Up @@ -506,7 +511,7 @@ struct SequenceTakeReceiver(Receiver) {
return Result!(Sender.Value)(Cancelled());
else {
state.n--;
return Result!(Sender.Value)();
return Result!(Sender.Value)(Completed());
}
}));
} else {
Expand Down Expand Up @@ -923,10 +928,17 @@ struct ScanSequenceTransformer(Fun, Seed) {
Seed seed;
auto setNext(Sender)(Sender sender) {
import concurrency.operations : then;
return sender.then((Sender.Value value) @safe shared {
seed = fun(value, seed);
return seed;
});
static if (is(Sender.Value == void)) {
return sender.then(() @safe shared {
seed = fun(seed);
return seed;
});
} else {
return sender.then((Sender.Value value) @safe shared {
seed = fun(value, seed);
return seed;
});
}
}
}

Expand Down
11 changes: 10 additions & 1 deletion tests/ut/concurrency/sequence.d
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import unit_threaded;
@("take")
@safe unittest {
[1,2,3,4].sequence.take(3).toList().syncWait.value.should == [1,2,3];
[1,2,3,4].sequence.take(3).transform((int i) => i*2).toList().syncWait.value.should == [2,4,6];
[1,2,3,4].sequence.transform((int i) => i*2).take(3).toList().syncWait.value.should == [2,4,6];
}

@("deferSequence.function")
Expand Down Expand Up @@ -135,11 +137,18 @@ import unit_threaded;
just([1,2].sequence).flatten.nextTransform(Transformer()).toList.syncWait.value.should == [1,2];
}

@("scan")
@("scan.value")
@safe unittest {
[1,1,1,1].sequence.scan((int i, int acc) => acc + i, 0).toList().syncWait.value.should == [1,2,3,4];
}

@("scan.void")
@safe unittest {
import core.time : msecs;
interval(1.msecs, false).scan((int acc) => acc + 1, 0).take(4).toList.syncWait.value.should == [1,2,3,4];
interval(1.msecs, false).take(4).scan((int acc) => acc + 1, 0).toList.syncWait.value.should == [1,2,3,4];
}

@("iotaSequence.basic")
@safe unittest {
iotaSequence(5, 10).toList().syncWait.value.should == [5,6,7,8,9];
Expand Down

0 comments on commit 2acd64f

Please sign in to comment.