diff --git a/source/concurrency/asyncscope.d b/source/concurrency/asyncscope.d index 98d1cb0..8c311fa 100644 --- a/source/concurrency/asyncscope.d +++ b/source/concurrency/asyncscope.d @@ -14,6 +14,12 @@ auto asyncScope() @safe { return as; } +auto asyncScope(shared StopSource source) @safe { + // ensure NRVO + auto as = shared AsyncScope(source); + return as; +} + struct AsyncScope { private: import concurrency.bitfield : SharedBitField; @@ -23,6 +29,7 @@ private: shared Promise!void completion; shared StopSource stopSource; Throwable throwable; + shared StopCallback cb; void forward() @trusted nothrow shared { import core.atomic : atomicLoad; @@ -61,13 +68,18 @@ public: cleanup.syncWait(); } - this(shared StopSource stopSource) @safe shared { + this(shared StopSource stopSource) @trusted shared { completion = new shared Promise!void; this.stopSource = stopSource; + cb = cast(shared)this.stopSource.onStop(() @safe shared nothrow => cast(void)this.stop()); } auto cleanup() @safe shared { stop(); + return onComplete(); + } + + auto onComplete() @safe shared { return completion.sender(); } @@ -76,6 +88,7 @@ public: } bool stop() nothrow @trusted shared { + cb.dispose(); import core.atomic : MemoryOrder; if ((flag.load!(MemoryOrder.acq) & Flag.stopped) > 0) return false; diff --git a/source/concurrency/stoptoken.d b/source/concurrency/stoptoken.d index 9c2602a..9b3978d 100644 --- a/source/concurrency/stoptoken.d +++ b/source/concurrency/stoptoken.d @@ -4,7 +4,7 @@ module concurrency.stoptoken; // it is licensed under the Creative Commons Attribution 4.0 Internation License http://creativecommons.org/licenses/by/4.0 class StopSource { - private stop_state state; + private shared stop_state state; bool stop() nothrow @safe { return state.request_stop(); } @@ -51,28 +51,23 @@ struct NeverStopToken { enum isStopPossible = false; } -StopCallback onStop(StopSource stopSource, void delegate() nothrow @safe shared callback) nothrow @safe { - auto cb = new StopCallback(callback); - if (stopSource.state.try_add_callback(cb, true)) - cb.source = stopSource; - return cb; -} - -StopCallback onStop(StopSource stopSource, void function() nothrow @safe callback) nothrow @trusted { - import std.functional : toDelegate; - return stopSource.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate); -} - -StopCallback onStop(StopToken)(StopToken stopToken, void delegate() nothrow @safe shared callback) nothrow @safe { - if (stopToken.isStopPossible) { - return stopToken.source.onStop(callback); +StopCallback onStop(Source)(Source stopSource, void delegate() nothrow @safe shared callback) nothrow @trusted { + static if (is(shared Source == shared StopSource)) { + auto cb = new StopCallback(callback); + if (stopSource.state.try_add_callback(cb, true)) + cb.source = cast(shared)stopSource; + return cb; + } else { + if (stopSource.isStopPossible) { + return stopSource.source.onStop(callback); + } + return new StopCallback(callback); } - return new StopCallback(callback); } -StopCallback onStop(StopToken)(StopToken stopToken, void function() nothrow @safe callback) nothrow @trusted { +StopCallback onStop(Source)(Source stopSource, void function() nothrow @safe callback) nothrow @trusted { import std.functional : toDelegate; - return stopToken.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate); + return stopSource.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate); } class StopCallback { @@ -105,16 +100,20 @@ private: } void delegate() nothrow shared @safe callback; - StopSource source; + shared StopSource source; - StopCallback next_ = null; - StopCallback* prev_ = null; - bool* isRemoved_ = null; + shared StopCallback next_ = null; + shared StopCallback* prev_ = null; + shared bool* isRemoved_ = null; shared bool callbackFinishedExecuting = false; void execute() nothrow @safe { callback(); } + + void execute() nothrow @safe shared { + callback(); + } } deprecated("Use regular StopToken") alias StopTokenObject = StopToken; @@ -155,31 +154,31 @@ private struct stop_state { } public: - void add_token_reference() nothrow @safe @nogc { + void add_token_reference() nothrow @safe @nogc shared { // TODO: want to use atomicFetchAdd but (proper) support is only recent // state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment); state_.atomicOp!"+="(token_ref_increment); } - void remove_token_reference() nothrow @safe @nogc { + void remove_token_reference() nothrow @safe @nogc shared { // TODO: want to use atomicFetchSub but (proper) support is only recent // state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment); state_.atomicOp!"-="(token_ref_increment); } - void add_source_reference() nothrow @safe @nogc { + void add_source_reference() nothrow @safe @nogc shared { // TODO: want to use atomicFetchAdd but (proper) support is only recent // state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment); state_.atomicOp!"+="(source_ref_increment); } - void remove_source_reference() nothrow @safe @nogc { + void remove_source_reference() nothrow @safe @nogc shared { // TODO: want to use atomicFetchSub but (proper) support is only recent // state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment); state_.atomicOp!"-="(source_ref_increment); } - bool request_stop() nothrow @safe { + bool request_stop() nothrow @safe shared { if (!try_lock_and_signal_until_signalled()) { // Stop has already been requested. @@ -188,7 +187,7 @@ public: // Set the 'stop_requested' signal and acquired the lock. - signallingThread_ = Thread.getThis(); + signallingThread_ = getThis(); while (head_ !is null) { // Dequeue the head of the queue @@ -212,7 +211,7 @@ public: // If the destructor runs on some other thread then the other // thread will block waiting for this thread to signal that the // callback has finished executing. - bool isRemoved = false; + shared bool isRemoved = false; (() @trusted => cb.isRemoved_ = &isRemoved)(); // the pointer to the stack here is removed 3 lines down. cb.execute(); @@ -238,15 +237,15 @@ public: return true; } - bool is_stop_requested() nothrow @safe @nogc { + bool is_stop_requested() nothrow @safe @nogc shared { return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq)); } - bool is_stop_requestable() nothrow @safe @nogc { + bool is_stop_requestable() nothrow @safe @nogc shared { return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq)); } - bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe { + bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe shared { ulong oldState; do { goto load_state; @@ -272,7 +271,7 @@ public: cb.next_.prev_ = &cb.next_; } () @trusted { cb.prev_ = &head_; } (); - head_ = cb; + () @trusted { head_ = cast(shared)cb; } (); if (incrementRefCountIfSuccessful) { unlock_and_increment_token_ref_count(); @@ -285,7 +284,7 @@ public: return true; } - void remove_callback(StopCallback cb) nothrow @safe @nogc { + void remove_callback(StopCallback cb) nothrow @safe @nogc shared { lock(); if (cb.prev_ !is null) { @@ -306,7 +305,7 @@ public: // Callback has either already executed or is executing // concurrently on another thread. - if (signallingThread_ is Thread.getThis()) { + if (signallingThread_ is getThis()) { // Callback executed on this thread or is still currently executing // and is deregistering itself from within the callback. if (cb.isRemoved_ !is null) { @@ -342,7 +341,11 @@ private: return is_stop_requested(state) || (state >= source_ref_increment); } - bool try_lock_and_signal_until_signalled() nothrow @safe @nogc { + shared(Thread) getThis() @trusted @nogc nothrow shared { + return cast(shared)Thread.getThis(); + } + + bool try_lock_and_signal_until_signalled() nothrow @safe @nogc shared { ulong oldState; do { oldState = state_.atomicLoad!(MemoryOrder.acq); @@ -360,7 +363,7 @@ private: return true; } - void lock() nothrow @safe @nogc { + void lock() nothrow @safe @nogc shared { ulong oldState; do { oldState = state_.atomicLoad!(MemoryOrder.raw); @@ -373,19 +376,19 @@ private: oldState | locked_flag)); } - void unlock() nothrow @safe @nogc { + void unlock() nothrow @safe @nogc shared { // TODO: want to use atomicFetchSub but (proper) support is only recent // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag); state_.atomicOp!"-="(locked_flag); } - void unlock_and_increment_token_ref_count() nothrow @safe @nogc { + void unlock_and_increment_token_ref_count() nothrow @safe @nogc shared { // TODO: want to use atomicFetchSub but (proper) support is only recent // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment); state_.atomicOp!"-="(locked_flag - token_ref_increment); } - void unlock_and_decrement_token_ref_count() nothrow @safe @nogc { + void unlock_and_decrement_token_ref_count() nothrow @safe @nogc shared { // TODO: want to use atomicFetchSub but (proper) support is only recent // state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment); state_.atomicOp!"-="(locked_flag + token_ref_increment); diff --git a/tests/ut/concurrency/asyncscope.d b/tests/ut/concurrency/asyncscope.d index 6218585..5202264 100644 --- a/tests/ut/concurrency/asyncscope.d +++ b/tests/ut/concurrency/asyncscope.d @@ -182,3 +182,30 @@ auto waitingTask() { s.spawn(VoidSender().withScheduler(localThreadScheduler)); s.cleanup.syncWait.assumeOk; } + +@("onComplete.inline") +@safe unittest { + auto s = asyncScope(); + + s.stop(); + s.onComplete().syncWait.assumeOk; +} + +@("onComplete.wait") +@safe unittest { + import concurrency.thread : ThreadSender; + import concurrency.stoptoken : StopSource; + import concurrency.operations : then; + + auto source = new shared StopSource(); + auto s = asyncScope(source); + + s.spawn(ThreadSender().then(() shared @trusted { + import core.thread : Thread; + import core.time : msecs; + Thread.sleep(10.msecs); + source.stop(); + })); + + s.onComplete().syncWait.assumeOk; +}