Skip to content

Commit

Permalink
58 add futex support for coroutine (#60)
Browse files Browse the repository at this point in the history
* add babylon::coroutine::Futex
  • Loading branch information
oathdruid authored Oct 11, 2024
1 parent cbf3f2a commit 8568513
Show file tree
Hide file tree
Showing 12 changed files with 887 additions and 37 deletions.
137 changes: 131 additions & 6 deletions src/babylon/concurrent/deposit_box.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,46 @@

BABYLON_NAMESPACE_BEGIN

// DepositBox is a container design for multiple visitor compete ownership for
// item put into it beforehand
template <typename T>
class DepositBox {
public:
class Accessor;

inline static DepositBox& instance() noexcept;

// Construct a new item as T(args...) and get an id back. This id is used by
// take function, to retrieve the item back, like a receipt.
//
// Object is reused between emplace -> take -> emplace cycle. Each time
// reused, an id of same value but different version is returned. An already
// taken id of old version is safely used for take function, and ensured to
// get empty accessor back no matter the same slot is resued or not.
template <typename... Args>
inline VersionedValue<uint32_t> emplace(Args&&... args) noexcept;

inline ::absl::optional<T> take(VersionedValue<uint32_t> id) noexcept;
// Get an accessor by id returned by emplace. Same id can be safe to called
// multiple times from different thread concurrently. Only the first call will
// get a valid accessor back. Later ones all get empty accessor, even after
// the slot is released and reused again.
//
// Slot to store item is keep valid in accesor's scope. After the accessor
// is destructed, slot and stored item is recycled and should not be
// accessed again for this round.
inline Accessor take(VersionedValue<uint32_t> id) noexcept;

// Used to retrieve item back by id just like take function, in a non RAII
// way. Useful when take in batch.
inline T* take_released(VersionedValue<uint32_t> id) noexcept;
inline void finish_released(VersionedValue<uint32_t> id) noexcept;

// The **unsafe** way to get item instead of take it. Caller must ensure that
// no other visitor could exist right now.
//
// Useful when you need to modify item just after emplace, before the id
// is shared to others.
inline T& unsafe_get(VersionedValue<uint32_t> id) noexcept;

private:
struct Slot {
Expand All @@ -33,9 +64,86 @@ class DepositBox {
ConcurrentVector<Slot> _slots;
};

template <typename T>
class DepositBox<T>::Accessor {
public:
inline Accessor() noexcept = default;
inline Accessor(Accessor&&) noexcept;
inline Accessor(const Accessor&) noexcept = delete;
inline Accessor& operator=(Accessor&&) noexcept;
inline Accessor& operator=(const Accessor&) noexcept = delete;
inline ~Accessor() noexcept;

inline operator bool() const noexcept;
inline T* operator->() noexcept;
inline T& operator*() noexcept;

private:
inline Accessor(DepositBox* box, T* object,
VersionedValue<uint32_t> id) noexcept;

DepositBox* _box {nullptr};
T* _object {nullptr};
VersionedValue<uint32_t> _id {UINT64_MAX};

friend DepositBox;
};

////////////////////////////////////////////////////////////////////////////////
// DepositBox::Accessor begin
template <typename T>
inline DepositBox<T>::Accessor::Accessor(Accessor&& other) noexcept
: Accessor {other._box, ::std::exchange(other._object, nullptr),
other._id} {}

template <typename T>
inline typename DepositBox<T>::Accessor& DepositBox<T>::Accessor::operator=(
Accessor&& other) noexcept {
::std::swap(_box, other._box);
::std::swap(_object, other._object);
::std::swap(_id, other._id);
return *this;
}

template <typename T>
inline DepositBox<T>::Accessor::~Accessor() noexcept {
if (_object) {
_box->finish_released(_id);
}
}

template <typename T>
inline DepositBox<T>::Accessor::operator bool() const noexcept {
return _object;
}

template <typename T>
inline T* DepositBox<T>::Accessor::operator->() noexcept {
return _object;
}

template <typename T>
inline T& DepositBox<T>::Accessor::operator*() noexcept {
return *_object;
}

template <typename T>
inline DepositBox<T>::Accessor::Accessor(DepositBox* box, T* object,
VersionedValue<uint32_t> id) noexcept
: _box {box}, _object {object}, _id {id} {}
// DepositBox::Accessor end
////////////////////////////////////////////////////////////////////////////////

////////////////////////////////////////////////////////////////////////////////
// DepositBox begin
template <typename T>
inline DepositBox<T>& DepositBox<T>::instance() noexcept {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic ignored "-Wunknown-warning-option"
#pragma GCC diagnostic ignored "-Wexit-time-destructors"
static DepositBox<T> object;
#pragma GCC diagnostic pop
return object;
}

Expand All @@ -51,16 +159,33 @@ inline VersionedValue<uint32_t> DepositBox<T>::emplace(
}

template <typename T>
inline ::absl::optional<T> DepositBox<T>::take(
inline typename DepositBox<T>::Accessor DepositBox<T>::take(
VersionedValue<uint32_t> id) noexcept {
return {this, take_released(id), id};
}

template <typename T>
inline T* DepositBox<T>::take_released(VersionedValue<uint32_t> id) noexcept {
auto& slot = _slots.ensure(id.value);
if (slot.version.compare_exchange_strong(id.version, id.version + 1,
::std::memory_order_relaxed)) {
::absl::optional<T> result = ::std::move(slot.object);
_slot_id_allocator.deallocate(id.value);
return result;
return &*(slot.object);
}
return {};
return nullptr;
}

template <typename T>
inline void DepositBox<T>::finish_released(
VersionedValue<uint32_t> id) noexcept {
_slot_id_allocator.deallocate(id.value);
}

template <typename T>
inline T& DepositBox<T>::unsafe_get(VersionedValue<uint32_t> id) noexcept {
auto& slot = _slots.ensure(id.value);
return *slot.object;
}
// DepositBox end
////////////////////////////////////////////////////////////////////////////////

BABYLON_NAMESPACE_END
15 changes: 14 additions & 1 deletion src/babylon/coroutine/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ load('//:copts.bzl', 'BABYLON_COPTS')
cc_library(
name = 'coroutine',
deps = [
'cancelable', ':promise', ':task', 'traits',
'cancelable', ':futex', ':promise', ':task', 'traits',
],
)

Expand All @@ -17,6 +17,19 @@ cc_library(
copts = BABYLON_COPTS,
includes = ['//src'],
strip_include_prefix = '//src',
deps = [
':task',
'//src/babylon/concurrent:deposit_box',
],
)

cc_library(
name = 'futex',
srcs = ['futex.cpp'],
hdrs = ['futex.h'],
copts = BABYLON_COPTS,
includes = ['//src'],
strip_include_prefix = '//src',
deps = [
':promise',
'//src/babylon/concurrent:deposit_box',
Expand Down
50 changes: 41 additions & 9 deletions src/babylon/coroutine/cancelable.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class BasicCancellable {
struct Optional;
template <typename T>
using OptionalType = typename Optional<T>::type;
class Cancellation;

// Setup a coroutine_handle as a callback, will be resumed after finish or
// cancel.
Expand Down Expand Up @@ -62,7 +63,7 @@ struct BasicCancellable::Optional<void> {

// Handle get back from Cancellable::on_suspend, Use operator() to trigger
// cancellation.
class Cancellation {
class BasicCancellable::Cancellation {
public:
inline Cancellation() noexcept = default;
inline Cancellation(Cancellation&&) noexcept = default;
Expand All @@ -83,6 +84,28 @@ class Cancellation {
};

// Wrap an awaitable A to make it cancelable.
//
// Cancellable<A> can be co_await-ed just like A it self, but get an optional<T>
// instead. Additionally, a callback can be registered with on_suspend. When
// suspend happen in co_await, this callback will be called with a cancellation
// token.
//
// This cancellation token can be saved and used later to resume that
// suspension, before the inner awaitable A finished. If resumed by
// cancellation, the result optional<T> is empty.
//
// Also, it is safe to do cancellation after inner awaitable A finished. The
// resumption will happen only once.
//
// The typical usage of Cancellable is to add timeout support for co_await, by
// sending the cancellation token to a timer. E.g.
// replace:
// T result = co_await awaitable;
// to:
// optional<T> result = co_await
// Cancellable<A>(awaitable).on_suspend([](Cancellation token) {
// on_timer(token, 100ms);
// });
template <typename A>
class Cancellable : public BasicCancellable {
public:
Expand All @@ -91,11 +114,19 @@ class Cancellable : public BasicCancellable {

inline explicit Cancellable(A awaitable) noexcept;

// Called as callable(Cancellation) when co_await suspend current coroutine.
// Received Cancellation can be invoked to cancel this co_await, at **any**
// time, even inside the callback it self or long after awaitable finished.
//
// Usually it can be send to a timer and be called unconditionally after
// arbitrary period. The co_await will canceled if not finished after that
// period, or the cancellation will just no-op.
template <typename C>
inline Cancellable<A>& on_suspend(C&& callable) & noexcept;
template <typename C>
inline Cancellable<A>&& on_suspend(C&& callable) && noexcept;

// Cancellable it self is awaitable by proxy to awaitable A inside.
inline constexpr bool await_ready() noexcept;
template <typename P>
requires(::std::is_base_of<BasicCoroutinePromise, P>::value)
Expand Down Expand Up @@ -138,18 +169,18 @@ inline BasicCoroutinePromise* BasicCancellable::proxy_promise() noexcept {
}

inline bool BasicCancellable::cancel(VersionedValue<uint32_t> id) noexcept {
auto taken = DepositBox<BasicCancellable*>::instance().take(id);
if (taken) {
(*taken)->do_cancel();
auto accessor = DepositBox<BasicCancellable*>::instance().take(id);
if (accessor) {
(*accessor)->do_cancel();
return true;
}
return false;
}

inline bool BasicCancellable::resume(VersionedValue<uint32_t> id) noexcept {
auto taken = DepositBox<BasicCancellable*>::instance().take(id);
if (taken) {
(*taken)->do_resume();
auto accessor = DepositBox<BasicCancellable*>::instance().take(id);
if (accessor) {
(*accessor)->do_resume();
return true;
}
return false;
Expand All @@ -172,11 +203,12 @@ inline void BasicCancellable::do_resume() noexcept {

////////////////////////////////////////////////////////////////////////////////
// Cancellation begin
inline bool Cancellation::operator()() const noexcept {
inline bool BasicCancellable::Cancellation::operator()() const noexcept {
return BasicCancellable::cancel(_id);
}

inline Cancellation::Cancellation(VersionedValue<uint32_t> id) noexcept
inline BasicCancellable::Cancellation::Cancellation(
VersionedValue<uint32_t> id) noexcept
: _id {id} {}
// Cancellation end
////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 8568513

Please sign in to comment.