diff --git a/dub.sdl b/dub.sdl index 6fbf8f7..8a13fa1 100644 --- a/dub.sdl +++ b/dub.sdl @@ -17,7 +17,7 @@ configuration "unittest" { dependency "unit-threaded" version="*" targetType "executable" mainSourceFile "tests/ut/ut_runner.d" - dflags "-dip1000" + dflags "-dip1000" "-dip1008" sourcePaths "source" "tests/ut" importPaths "source" "tests/ut" } @@ -25,7 +25,7 @@ configuration "unittest-release" { dependency "unit-threaded" version="*" targetType "executable" mainSourceFile "tests/ut/ut_runner.d" - dflags "-dip1000" "-g" + dflags "-dip1000" "-g" "-dip1008" sourcePaths "source" "tests/ut" importPaths "source" "tests/ut" # buildOptions "unittests" "optimize" "inline" diff --git a/source/concurrency/error.d b/source/concurrency/error.d index 9324a46..908c29f 100644 --- a/source/concurrency/error.d +++ b/source/concurrency/error.d @@ -19,6 +19,8 @@ Throwable clone(Throwable t) nothrow @safe { return new ThrowableClone!RangeError(t.info, r.file, r.line, r.next); if (auto e = cast(Error)t) return new ThrowableClone!Error(t.info, t.msg, t.file, t.line, t.next); + if (auto e = cast(Exception)t) + return new ThrowableClone!Exception(t.info, t.msg, t.file, t.line, t.next); return new ThrowableClone!Throwable(t.info, t.msg, t.file, t.line, t.next); } @@ -53,3 +55,21 @@ class ClonedTraceInfo : Throwable.TraceInfo { return buf; } } + +Exception unscopeException(scope Exception e) @trusted nothrow { + if (e.refcount == 0) { + //TODO: what if exception is allocated on stack or TLS? + return cast(Exception)e; + } + + return cast(Exception)e.clone(); +} + +Throwable unscopeException(scope Throwable e) @trusted nothrow { + if (e.refcount == 0) { + //TODO: what if exception is allocated on stack or TLS? + return cast(Throwable)e; + } + + return e.clone(); +} diff --git a/source/concurrency/nursery.d b/source/concurrency/nursery.d index 8c8b0b1..bb9781c 100644 --- a/source/concurrency/nursery.d +++ b/source/concurrency/nursery.d @@ -105,7 +105,8 @@ class Nursery : StopSource { try { localReceiver.setValue(); } catch (Exception e) { - localReceiver.setError(e); + import concurrency.error; + localReceiver.setError(e.unscopeException); } } } diff --git a/source/concurrency/operations/onerror.d b/source/concurrency/operations/onerror.d index dfd56a4..08dca6f 100644 --- a/source/concurrency/operations/onerror.d +++ b/source/concurrency/operations/onerror.d @@ -34,7 +34,8 @@ private struct OnErrorReceiver(Value, SideEffect, Receiver) { try sideEffect(e); catch (Exception e2) { - receiver.setError(() @trusted { return Throwable.chainTogether(e2, e); } ()); + import concurrency.error; + receiver.setError(() @trusted { return Throwable.chainTogether(e2.unscopeException, e); } ()); return; } } diff --git a/source/concurrency/operations/onresult.d b/source/concurrency/operations/onresult.d index e474164..ac9c81f 100644 --- a/source/concurrency/operations/onresult.d +++ b/source/concurrency/operations/onresult.d @@ -43,8 +43,9 @@ private struct OnResultReceiver(Value, SideEffect, Receiver) { try { sideEffect(Result!(Value)(ex)); } catch (Exception e2) { + import concurrency.error; return receiver.setError(() @trusted { - return Throwable.chainTogether(e2, e); + return Throwable.chainTogether(e2.unscopeException, e); }()); } catch (Throwable t) { return receiver.setError(t); diff --git a/source/concurrency/operations/repeat.d b/source/concurrency/operations/repeat.d index d42cf9a..5f74d90 100644 --- a/source/concurrency/operations/repeat.d +++ b/source/concurrency/operations/repeat.d @@ -42,7 +42,8 @@ private struct RepeatOp(Receiver, Sender) { try { reset(); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); } } private void reset() @trusted scope { diff --git a/source/concurrency/operations/retry.d b/source/concurrency/operations/retry.d index b45d4a0..8288aa6 100644 --- a/source/concurrency/operations/retry.d +++ b/source/concurrency/operations/retry.d @@ -59,7 +59,8 @@ private struct RetryReceiver(Receiver, Sender, Logic) { // From what I gathered that is what libunifex does sender.connectHeap(this).start(); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); } } } diff --git a/source/concurrency/operations/toshared.d b/source/concurrency/operations/toshared.d index 96e9c78..7f8f35b 100644 --- a/source/concurrency/operations/toshared.d +++ b/source/concurrency/operations/toshared.d @@ -51,7 +51,8 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if (models!(Sender, try { localState.op = sender.connect(SharedSenderReceiver!(Sender, Scheduler, resetLogic)(&state, scheduler)); } catch (Exception e) { - state.process!(resetLogic)(InternalValue(e)); + import concurrency.error; + state.process!(resetLogic)(InternalValue(e.unscopeException)); } localState.op.start(); } else { @@ -158,10 +159,12 @@ private struct SharedSenderOp(Sender, Scheduler, ResetLogic resetLogic, Receiver /// this onValue can sometimes be called immediately, /// leaving no room to set cb.dispose... cb.dispose(); - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); } }, (Throwable e){ - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); }, (Done d){ receiver.setDone(); }); diff --git a/source/concurrency/receiver.d b/source/concurrency/receiver.d index 56ed378..259a7bf 100644 --- a/source/concurrency/receiver.d +++ b/source/concurrency/receiver.d @@ -72,7 +72,8 @@ void setValueOrError(Receiver)(auto ref Receiver receiver) @safe { try { receiver.setValue(); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); } } } @@ -85,7 +86,8 @@ void setValueOrError(Receiver, T)(auto ref Receiver receiver, auto ref T value) try { receiver.setValue(value); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); } } } diff --git a/source/concurrency/scheduler.d b/source/concurrency/scheduler.d index 9dfca8b..27f1969 100644 --- a/source/concurrency/scheduler.d +++ b/source/concurrency/scheduler.d @@ -77,7 +77,8 @@ struct SchedulerAdapter(Worker) { try { worker.schedule(cast(VoidDelegate)()=>receiver.setValueOrError()); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); } } } @@ -135,7 +136,8 @@ struct ScheduleAfterOp(Worker, Receiver) { try { timer = worker.addTimer(cast(void delegate(TimerTrigger) @safe shared)&trigger, dur); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); return; } diff --git a/source/concurrency/sender.d b/source/concurrency/sender.d index bbb6865..74c831d 100644 --- a/source/concurrency/sender.d +++ b/source/concurrency/sender.d @@ -137,7 +137,8 @@ struct JustFromSender(Fun) { try { set(); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); } } } @@ -463,14 +464,15 @@ struct PromiseSenderOp(T, Receiver) { // calling the receiver's termination functions. if (cb) cb.dispose(); - value.match!((Sender.ValueRep v){ + value.match!((Sender.ValueRep v) @safe { + import concurrency.error; try { static if (is(Sender.Value == void)) receiver.setValue(); else receiver.setValue(v); } catch (Exception e) { - receiver.setError(e); + receiver.setError(e.unscopeException); } }, (Throwable e){ receiver.setError(e); diff --git a/source/concurrency/stream/package.d b/source/concurrency/stream/package.d index fb0d51d..30e38d7 100644 --- a/source/concurrency/stream/package.d +++ b/source/concurrency/stream/package.d @@ -120,14 +120,16 @@ auto intervalStream(Duration duration, bool emitAtStart = false) { } op.load(); } catch (Exception e) { - op.receiver.setError(e); + import concurrency.error; + op.receiver.setError(e.unscopeException); } } void setDone() @safe nothrow { op.receiver.setDone(); } void setError(Throwable t) @safe nothrow { - op.receiver.setError(t); + import concurrency.error; + op.receiver.setError(t.unscopeException); } auto getStopToken() @safe { return op.receiver.getStopToken(); @@ -165,7 +167,8 @@ auto intervalStream(Duration duration, bool emitAtStart = false) { } load(); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); } } private void load() @trusted nothrow { @@ -173,7 +176,8 @@ auto intervalStream(Duration duration, bool emitAtStart = false) { op = receiver.getScheduler().scheduleAfter(duration).connect(ItemReceiver!(typeof(this))(&this)); op.start(); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); } } } @@ -294,9 +298,10 @@ shared struct SharedStream(T) { try { dg(element); } catch (Exception e) { + import concurrency.error; source.remove(&this.onItem); cb.dispose(); - receiver.setError(e); + receiver.setError(e.unscopeException); } } } diff --git a/source/concurrency/stream/stream.d b/source/concurrency/stream/stream.d index fb98f54..6d3860e 100644 --- a/source/concurrency/stream/stream.d +++ b/source/concurrency/stream/stream.d @@ -118,7 +118,8 @@ template loopStream(E) { try { t.loop(dg, receiver.getStopToken); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); } if (receiver.getStopToken().isStopRequested) receiver.setDone(); diff --git a/source/concurrency/stream/throttling.d b/source/concurrency/stream/throttling.d index ab0e38f..be00276 100644 --- a/source/concurrency/stream/throttling.d +++ b/source/concurrency/stream/throttling.d @@ -102,9 +102,10 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogi dg(); return true; } catch (Exception e) { + import concurrency.error; with (flags.lock(ThrottleFlags.doneOrError_produced)) { if ((oldState & ThrottleFlags.doneOrError_produced) == 0) { - throwable = e; + throwable = e.unscopeException; } release(); process(newState); @@ -136,9 +137,10 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogi dg(t); return true; } catch (Exception e) { + import concurrency.error; with (flags.lock(ThrottleFlags.doneOrError_produced)) { if ((oldState & ThrottleFlags.doneOrError_produced) == 0) { - throwable = e; + throwable = e.unscopeException; } release(); process(newState); @@ -198,7 +200,8 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogi else dg(); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); return; } } diff --git a/source/concurrency/syncwait.d b/source/concurrency/syncwait.d index 000816d..c67a6b2 100644 --- a/source/concurrency/syncwait.d +++ b/source/concurrency/syncwait.d @@ -32,7 +32,8 @@ package struct SyncWaitReceiver2(Value) { } void setError(Throwable e) nothrow @safe { - state.throwable = e; + import concurrency.error; + state.throwable = e.unscopeException; state.worker.stop(); } static if (is(Value == void)) diff --git a/source/concurrency/thread.d b/source/concurrency/thread.d index 93cabe0..c93c346 100644 --- a/source/concurrency/thread.d +++ b/source/concurrency/thread.d @@ -353,7 +353,8 @@ struct ThreadSender { try { receiver.setValue(); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); } catch (Throwable t) { receiver.setError(t.clone()); } @@ -435,7 +436,8 @@ private struct TaskPoolSender { try { receiver.setValue(); } catch (Exception e) { - receiver.setError(e); + import concurrency.error; + receiver.setError(e.unscopeException); } catch (Throwable t) { receiver.setError(t.clone); } @@ -454,7 +456,8 @@ private struct TaskPoolSender { try { pool.put(myTask); } catch (Exception e) { - myTask.args[0].setError(e); + import concurrency.error; + myTask.args[0].setError(e.unscopeException); } } } diff --git a/tests/ut/concurrency/operations.d b/tests/ut/concurrency/operations.d index 8e4ab84..5e571bf 100644 --- a/tests/ut/concurrency/operations.d +++ b/tests/ut/concurrency/operations.d @@ -72,6 +72,7 @@ struct OutOfBandValueSender(T) { @("race.exception.double") @safe unittest { + // TODO: this is flaky. Find another way. auto slow = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); }); auto fast = ThreadSender().then(() shared { throw new Exception("Fast"); }); race(slow, fast).syncWait.assumeOk.shouldThrowWithMessage("Fast"); diff --git a/tests/ut/concurrency/stream.d b/tests/ut/concurrency/stream.d index c17966a..c0b83a6 100644 --- a/tests/ut/concurrency/stream.d +++ b/tests/ut/concurrency/stream.d @@ -604,11 +604,11 @@ unittest { @("flatmap.concat.error") @safe unittest { - import concurrency.sender : just, ErrorSender; + import concurrency.sender : just, ThrowingSender; import concurrency.operations : via; [1,2,3].arrayStream - .flatMapConcat((int i) => ErrorSender()) + .flatMapConcat((int i) => ThrowingSender()) .collect(()shared{}) .syncWait .assumeOk @@ -659,11 +659,11 @@ unittest { @("flatmap.latest.error") @safe unittest { - import concurrency.sender : just, ErrorSender; + import concurrency.sender : just, ThrowingSender; import concurrency.operations : via; [1,2,3].arrayStream - .flatMapLatest((int i) => ErrorSender()) + .flatMapLatest((int i) => ThrowingSender()) .collect(()shared{}) .syncWait .assumeOk