From 21c1bca3d063237d31dabe63dc7319445d65a6b9 Mon Sep 17 00:00:00 2001 From: Sebastiaan Koppe Date: Sat, 18 Jun 2022 16:52:28 +0200 Subject: [PATCH 1/3] Make StopToken shared --- source/concurrency/stoptoken.d | 85 ++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/source/concurrency/stoptoken.d b/source/concurrency/stoptoken.d index 9c2602a..9b3978d 100644 --- a/source/concurrency/stoptoken.d +++ b/source/concurrency/stoptoken.d @@ -4,7 +4,7 @@ module concurrency.stoptoken; // it is licensed under the Creative Commons Attribution 4.0 Internation License http://creativecommons.org/licenses/by/4.0 class StopSource { - private stop_state state; + private shared stop_state state; bool stop() nothrow @safe { return state.request_stop(); } @@ -51,28 +51,23 @@ struct NeverStopToken { enum isStopPossible = false; } -StopCallback onStop(StopSource stopSource, void delegate() nothrow @safe shared callback) nothrow @safe { - auto cb = new StopCallback(callback); - if (stopSource.state.try_add_callback(cb, true)) - cb.source = stopSource; - return cb; -} - -StopCallback onStop(StopSource stopSource, void function() nothrow @safe callback) nothrow @trusted { - import std.functional : toDelegate; - return stopSource.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate); -} - -StopCallback onStop(StopToken)(StopToken stopToken, void delegate() nothrow @safe shared callback) nothrow @safe { - if (stopToken.isStopPossible) { - return stopToken.source.onStop(callback); +StopCallback onStop(Source)(Source stopSource, void delegate() nothrow @safe shared callback) nothrow @trusted { + static if (is(shared Source == shared StopSource)) { + auto cb = new StopCallback(callback); + if (stopSource.state.try_add_callback(cb, true)) + cb.source = cast(shared)stopSource; + return cb; + } else { + if (stopSource.isStopPossible) { + return stopSource.source.onStop(callback); + } + return new StopCallback(callback); } - return new StopCallback(callback); } -StopCallback onStop(StopToken)(StopToken stopToken, void function() nothrow @safe callback) nothrow @trusted { +StopCallback onStop(Source)(Source stopSource, void function() nothrow @safe callback) nothrow @trusted { import std.functional : toDelegate; - return stopToken.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate); + return stopSource.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate); } class StopCallback { @@ -105,16 +100,20 @@ private: } void delegate() nothrow shared @safe callback; - StopSource source; + shared StopSource source; - StopCallback next_ = null; - StopCallback* prev_ = null; - bool* isRemoved_ = null; + shared StopCallback next_ = null; + shared StopCallback* prev_ = null; + shared bool* isRemoved_ = null; shared bool callbackFinishedExecuting = false; void execute() nothrow @safe { callback(); } + + void execute() nothrow @safe shared { + callback(); + } } deprecated("Use regular StopToken") alias StopTokenObject = StopToken; @@ -155,31 +154,31 @@ private struct stop_state { } public: - void add_token_reference() nothrow @safe @nogc { + void add_token_reference() nothrow @safe @nogc shared { // TODO: want to use atomicFetchAdd but (proper) support is only recent // state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment); state_.atomicOp!"+="(token_ref_increment); } - void remove_token_reference() nothrow @safe @nogc { + void remove_token_reference() nothrow @safe @nogc shared { // TODO: want to use atomicFetchSub but (proper) support is only recent // state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment); state_.atomicOp!"-="(token_ref_increment); } - void add_source_reference() nothrow @safe @nogc { + void add_source_reference() nothrow @safe @nogc shared { // TODO: want to use atomicFetchAdd but (proper) support is only recent // state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment); state_.atomicOp!"+="(source_ref_increment); } - void remove_source_reference() nothrow @safe @nogc { + void remove_source_reference() nothrow @safe @nogc shared { // TODO: want to use atomicFetchSub but (proper) support is only recent // state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment); state_.atomicOp!"-="(source_ref_increment); } - bool request_stop() nothrow @safe { + bool request_stop() nothrow @safe shared { if (!try_lock_and_signal_until_signalled()) { // Stop has already been requested. @@ -188,7 +187,7 @@ public: // Set the 'stop_requested' signal and acquired the lock. - signallingThread_ = Thread.getThis(); + signallingThread_ = getThis(); while (head_ !is null) { // Dequeue the head of the queue @@ -212,7 +211,7 @@ public: // If the destructor runs on some other thread then the other // thread will block waiting for this thread to signal that the // callback has finished executing. - bool isRemoved = false; + shared bool isRemoved = false; (() @trusted => cb.isRemoved_ = &isRemoved)(); // the pointer to the stack here is removed 3 lines down. cb.execute(); @@ -238,15 +237,15 @@ public: return true; } - bool is_stop_requested() nothrow @safe @nogc { + bool is_stop_requested() nothrow @safe @nogc shared { return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq)); } - bool is_stop_requestable() nothrow @safe @nogc { + bool is_stop_requestable() nothrow @safe @nogc shared { return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq)); } - bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe { + bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe shared { ulong oldState; do { goto load_state; @@ -272,7 +271,7 @@ public: cb.next_.prev_ = &cb.next_; } () @trusted { cb.prev_ = &head_; } (); - head_ = cb; + () @trusted { head_ = cast(shared)cb; } (); if (incrementRefCountIfSuccessful) { unlock_and_increment_token_ref_count(); @@ -285,7 +284,7 @@ public: return true; } - void remove_callback(StopCallback cb) nothrow @safe @nogc { + void remove_callback(StopCallback cb) nothrow @safe @nogc shared { lock(); if (cb.prev_ !is null) { @@ -306,7 +305,7 @@ public: // Callback has either already executed or is executing // concurrently on another thread. - if (signallingThread_ is Thread.getThis()) { + if (signallingThread_ is getThis()) { // Callback executed on this thread or is still currently executing // and is deregistering itself from within the callback. if (cb.isRemoved_ !is null) { @@ -342,7 +341,11 @@ private: return is_stop_requested(state) || (state >= source_ref_increment); } - bool try_lock_and_signal_until_signalled() nothrow @safe @nogc { + shared(Thread) getThis() @trusted @nogc nothrow shared { + return cast(shared)Thread.getThis(); + } + + bool try_lock_and_signal_until_signalled() nothrow @safe @nogc shared { ulong oldState; do { oldState = state_.atomicLoad!(MemoryOrder.acq); @@ -360,7 +363,7 @@ private: return true; } - void lock() nothrow @safe @nogc { + void lock() nothrow @safe @nogc shared { ulong oldState; do { oldState = state_.atomicLoad!(MemoryOrder.raw); @@ -373,19 +376,19 @@ private: oldState | locked_flag)); } - void unlock() nothrow @safe @nogc { + void unlock() nothrow @safe @nogc shared { // TODO: want to use atomicFetchSub but (proper) support is only recent // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag); state_.atomicOp!"-="(locked_flag); } - void unlock_and_increment_token_ref_count() nothrow @safe @nogc { + void unlock_and_increment_token_ref_count() nothrow @safe @nogc shared { // TODO: want to use atomicFetchSub but (proper) support is only recent // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment); state_.atomicOp!"-="(locked_flag - token_ref_increment); } - void unlock_and_decrement_token_ref_count() nothrow @safe @nogc { + void unlock_and_decrement_token_ref_count() nothrow @safe @nogc shared { // TODO: want to use atomicFetchSub but (proper) support is only recent // state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment); state_.atomicOp!"-="(locked_flag + token_ref_increment); From a9bb8c0f8aec7fb0ba145c97337e800d91b40177 Mon Sep 17 00:00:00 2001 From: Sebastiaan Koppe Date: Sat, 18 Jun 2022 16:53:34 +0200 Subject: [PATCH 2/3] Have asyncScope stop on StopSource trigger --- source/concurrency/asyncscope.d | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/concurrency/asyncscope.d b/source/concurrency/asyncscope.d index 98d1cb0..382165f 100644 --- a/source/concurrency/asyncscope.d +++ b/source/concurrency/asyncscope.d @@ -23,6 +23,7 @@ private: shared Promise!void completion; shared StopSource stopSource; Throwable throwable; + shared StopCallback cb; void forward() @trusted nothrow shared { import core.atomic : atomicLoad; @@ -61,9 +62,10 @@ public: cleanup.syncWait(); } - this(shared StopSource stopSource) @safe shared { + this(shared StopSource stopSource) @trusted shared { completion = new shared Promise!void; this.stopSource = stopSource; + cb = cast(shared)this.stopSource.onStop(() @safe shared nothrow => cast(void)this.stop()); } auto cleanup() @safe shared { @@ -76,6 +78,7 @@ public: } bool stop() nothrow @trusted shared { + cb.dispose(); import core.atomic : MemoryOrder; if ((flag.load!(MemoryOrder.acq) & Flag.stopped) > 0) return false; From 0e6739bcf0cb3d561038d5b168562c4c4fa48862 Mon Sep 17 00:00:00 2001 From: Sebastiaan Koppe Date: Sat, 18 Jun 2022 17:08:06 +0200 Subject: [PATCH 3/3] add onComplete to asyncScope --- source/concurrency/asyncscope.d | 10 ++++++++++ tests/ut/concurrency/asyncscope.d | 27 +++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/source/concurrency/asyncscope.d b/source/concurrency/asyncscope.d index 382165f..8c311fa 100644 --- a/source/concurrency/asyncscope.d +++ b/source/concurrency/asyncscope.d @@ -14,6 +14,12 @@ auto asyncScope() @safe { return as; } +auto asyncScope(shared StopSource source) @safe { + // ensure NRVO + auto as = shared AsyncScope(source); + return as; +} + struct AsyncScope { private: import concurrency.bitfield : SharedBitField; @@ -70,6 +76,10 @@ public: auto cleanup() @safe shared { stop(); + return onComplete(); + } + + auto onComplete() @safe shared { return completion.sender(); } diff --git a/tests/ut/concurrency/asyncscope.d b/tests/ut/concurrency/asyncscope.d index 6218585..5202264 100644 --- a/tests/ut/concurrency/asyncscope.d +++ b/tests/ut/concurrency/asyncscope.d @@ -182,3 +182,30 @@ auto waitingTask() { s.spawn(VoidSender().withScheduler(localThreadScheduler)); s.cleanup.syncWait.assumeOk; } + +@("onComplete.inline") +@safe unittest { + auto s = asyncScope(); + + s.stop(); + s.onComplete().syncWait.assumeOk; +} + +@("onComplete.wait") +@safe unittest { + import concurrency.thread : ThreadSender; + import concurrency.stoptoken : StopSource; + import concurrency.operations : then; + + auto source = new shared StopSource(); + auto s = asyncScope(source); + + s.spawn(ThreadSender().then(() shared @trusted { + import core.thread : Thread; + import core.time : msecs; + Thread.sleep(10.msecs); + source.stop(); + })); + + s.onComplete().syncWait.assumeOk; +}