Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dip1008 #74

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dub.sdl
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ configuration "unittest" {
dependency "unit-threaded" version="*"
targetType "executable"
mainSourceFile "tests/ut/ut_runner.d"
dflags "-dip1000"
dflags "-dip1000" "-dip1008"
sourcePaths "source" "tests/ut"
importPaths "source" "tests/ut"
}
configuration "unittest-release" {
dependency "unit-threaded" version="*"
targetType "executable"
mainSourceFile "tests/ut/ut_runner.d"
dflags "-dip1000" "-g"
dflags "-dip1000" "-g" "-dip1008"
sourcePaths "source" "tests/ut"
importPaths "source" "tests/ut"
# buildOptions "unittests" "optimize" "inline"
Expand Down
20 changes: 20 additions & 0 deletions source/concurrency/error.d
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Throwable clone(Throwable t) nothrow @safe {
return new ThrowableClone!RangeError(t.info, r.file, r.line, r.next);
if (auto e = cast(Error)t)
return new ThrowableClone!Error(t.info, t.msg, t.file, t.line, t.next);
if (auto e = cast(Exception)t)
return new ThrowableClone!Exception(t.info, t.msg, t.file, t.line, t.next);
return new ThrowableClone!Throwable(t.info, t.msg, t.file, t.line, t.next);
}

Expand Down Expand Up @@ -53,3 +55,21 @@ class ClonedTraceInfo : Throwable.TraceInfo {
return buf;
}
}

Exception unscopeException(scope Exception e) @trusted nothrow {
if (e.refcount == 0) {
//TODO: what if exception is allocated on stack or TLS?
return cast(Exception)e;
}

return cast(Exception)e.clone();
}

Throwable unscopeException(scope Throwable e) @trusted nothrow {
if (e.refcount == 0) {
//TODO: what if exception is allocated on stack or TLS?
return cast(Throwable)e;
}

return e.clone();
}
3 changes: 2 additions & 1 deletion source/concurrency/nursery.d
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ class Nursery : StopSource {
try {
localReceiver.setValue();
} catch (Exception e) {
localReceiver.setError(e);
import concurrency.error;
localReceiver.setError(e.unscopeException);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion source/concurrency/operations/onerror.d
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ private struct OnErrorReceiver(Value, SideEffect, Receiver) {
try
sideEffect(e);
catch (Exception e2) {
receiver.setError(() @trusted { return Throwable.chainTogether(e2, e); } ());
import concurrency.error;
receiver.setError(() @trusted { return Throwable.chainTogether(e2.unscopeException, e); } ());
return;
}
}
Expand Down
3 changes: 2 additions & 1 deletion source/concurrency/operations/onresult.d
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ private struct OnResultReceiver(Value, SideEffect, Receiver) {
try {
sideEffect(Result!(Value)(ex));
} catch (Exception e2) {
import concurrency.error;
return receiver.setError(() @trusted {
return Throwable.chainTogether(e2, e);
return Throwable.chainTogether(e2.unscopeException, e);
}());
} catch (Throwable t) {
return receiver.setError(t);
Expand Down
3 changes: 2 additions & 1 deletion source/concurrency/operations/repeat.d
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ private struct RepeatOp(Receiver, Sender) {
try {
reset();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
private void reset() @trusted scope {
Expand Down
3 changes: 2 additions & 1 deletion source/concurrency/operations/retry.d
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ private struct RetryReceiver(Receiver, Sender, Logic) {
// From what I gathered that is what libunifex does
sender.connectHeap(this).start();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions source/concurrency/operations/toshared.d
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if (models!(Sender,
try {
localState.op = sender.connect(SharedSenderReceiver!(Sender, Scheduler, resetLogic)(&state, scheduler));
} catch (Exception e) {
state.process!(resetLogic)(InternalValue(e));
import concurrency.error;
state.process!(resetLogic)(InternalValue(e.unscopeException));
}
localState.op.start();
} else {
Expand Down Expand Up @@ -158,10 +159,12 @@ private struct SharedSenderOp(Sender, Scheduler, ResetLogic resetLogic, Receiver
/// this onValue can sometimes be called immediately,
/// leaving no room to set cb.dispose...
cb.dispose();
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}, (Throwable e){
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}, (Done d){
receiver.setDone();
});
Expand Down
6 changes: 4 additions & 2 deletions source/concurrency/receiver.d
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ void setValueOrError(Receiver)(auto ref Receiver receiver) @safe {
try {
receiver.setValue();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
}
Expand All @@ -85,7 +86,8 @@ void setValueOrError(Receiver, T)(auto ref Receiver receiver, auto ref T value)
try {
receiver.setValue(value);
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
}
6 changes: 4 additions & 2 deletions source/concurrency/scheduler.d
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ struct SchedulerAdapter(Worker) {
try {
worker.schedule(cast(VoidDelegate)()=>receiver.setValueOrError());
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
}
Expand Down Expand Up @@ -135,7 +136,8 @@ struct ScheduleAfterOp(Worker, Receiver) {
try {
timer = worker.addTimer(cast(void delegate(TimerTrigger) @safe shared)&trigger, dur);
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
return;
}

Expand Down
8 changes: 5 additions & 3 deletions source/concurrency/sender.d
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ struct JustFromSender(Fun) {
try {
set();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
}
Expand Down Expand Up @@ -463,14 +464,15 @@ struct PromiseSenderOp(T, Receiver) {
// calling the receiver's termination functions.
if (cb)
cb.dispose();
value.match!((Sender.ValueRep v){
value.match!((Sender.ValueRep v) @safe {
import concurrency.error;
try {
static if (is(Sender.Value == void))
receiver.setValue();
else
receiver.setValue(v);
} catch (Exception e) {
receiver.setError(e);
receiver.setError(e.unscopeException);
}
}, (Throwable e){
receiver.setError(e);
Expand Down
15 changes: 10 additions & 5 deletions source/concurrency/stream/package.d
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,16 @@ auto intervalStream(Duration duration, bool emitAtStart = false) {
}
op.load();
} catch (Exception e) {
op.receiver.setError(e);
import concurrency.error;
op.receiver.setError(e.unscopeException);
}
}
void setDone() @safe nothrow {
op.receiver.setDone();
}
void setError(Throwable t) @safe nothrow {
op.receiver.setError(t);
import concurrency.error;
op.receiver.setError(t.unscopeException);
}
auto getStopToken() @safe {
return op.receiver.getStopToken();
Expand Down Expand Up @@ -165,15 +167,17 @@ auto intervalStream(Duration duration, bool emitAtStart = false) {
}
load();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
private void load() @trusted nothrow {
try {
op = receiver.getScheduler().scheduleAfter(duration).connect(ItemReceiver!(typeof(this))(&this));
op.start();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
}
Expand Down Expand Up @@ -294,9 +298,10 @@ shared struct SharedStream(T) {
try {
dg(element);
} catch (Exception e) {
import concurrency.error;
source.remove(&this.onItem);
cb.dispose();
receiver.setError(e);
receiver.setError(e.unscopeException);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion source/concurrency/stream/stream.d
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ template loopStream(E) {
try {
t.loop(dg, receiver.getStopToken);
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
if (receiver.getStopToken().isStopRequested)
receiver.setDone();
Expand Down
9 changes: 6 additions & 3 deletions source/concurrency/stream/throttling.d
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogi
dg();
return true;
} catch (Exception e) {
import concurrency.error;
with (flags.lock(ThrottleFlags.doneOrError_produced)) {
if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
throwable = e;
throwable = e.unscopeException;
}
release();
process(newState);
Expand Down Expand Up @@ -136,9 +137,10 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogi
dg(t);
return true;
} catch (Exception e) {
import concurrency.error;
with (flags.lock(ThrottleFlags.doneOrError_produced)) {
if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
throwable = e;
throwable = e.unscopeException;
}
release();
process(newState);
Expand Down Expand Up @@ -198,7 +200,8 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogi
else
dg();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
return;
}
}
Expand Down
3 changes: 2 additions & 1 deletion source/concurrency/syncwait.d
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ package struct SyncWaitReceiver2(Value) {
}

void setError(Throwable e) nothrow @safe {
state.throwable = e;
import concurrency.error;
state.throwable = e.unscopeException;
state.worker.stop();
}
static if (is(Value == void))
Expand Down
9 changes: 6 additions & 3 deletions source/concurrency/thread.d
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ struct ThreadSender {
try {
receiver.setValue();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
} catch (Throwable t) {
receiver.setError(t.clone());
}
Expand Down Expand Up @@ -435,7 +436,8 @@ private struct TaskPoolSender {
try {
receiver.setValue();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
} catch (Throwable t) {
receiver.setError(t.clone);
}
Expand All @@ -454,7 +456,8 @@ private struct TaskPoolSender {
try {
pool.put(myTask);
} catch (Exception e) {
myTask.args[0].setError(e);
import concurrency.error;
myTask.args[0].setError(e.unscopeException);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions tests/ut/concurrency/operations.d
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct OutOfBandValueSender(T) {

@("race.exception.double")
@safe unittest {
// TODO: this is flaky. Find another way.
auto slow = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); });
auto fast = ThreadSender().then(() shared { throw new Exception("Fast"); });
race(slow, fast).syncWait.assumeOk.shouldThrowWithMessage("Fast");
Expand Down
8 changes: 4 additions & 4 deletions tests/ut/concurrency/stream.d
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,11 @@ unittest {

@("flatmap.concat.error")
@safe unittest {
import concurrency.sender : just, ErrorSender;
import concurrency.sender : just, ThrowingSender;
import concurrency.operations : via;

[1,2,3].arrayStream
.flatMapConcat((int i) => ErrorSender())
.flatMapConcat((int i) => ThrowingSender())
.collect(()shared{})
.syncWait
.assumeOk
Expand Down Expand Up @@ -659,11 +659,11 @@ unittest {

@("flatmap.latest.error")
@safe unittest {
import concurrency.sender : just, ErrorSender;
import concurrency.sender : just, ThrowingSender;
import concurrency.operations : via;

[1,2,3].arrayStream
.flatMapLatest((int i) => ErrorSender())
.flatMapLatest((int i) => ThrowingSender())
.collect(()shared{})
.syncWait
.assumeOk
Expand Down