From 0e36bb2ecb3cd7ef4a3b591630aa23220b181b27 Mon Sep 17 00:00:00 2001 From: lihuiba Date: Tue, 17 Dec 2024 16:34:40 +0800 Subject: [PATCH] semaphore plus: (1) add an option for in-order resume of threads (by default not); (2) avoid thundering herd of threads whose needs are not met; --- thread/thread.cpp | 62 ++++++++++++++++++++++++++--------------------- thread/thread.h | 13 +++++----- 2 files changed, 42 insertions(+), 33 deletions(-) diff --git a/thread/thread.cpp b/thread/thread.cpp index a79a2a46..3d06790c 100644 --- a/thread/thread.cpp +++ b/thread/thread.cpp @@ -1749,44 +1749,52 @@ R"( { if (count == 0) return 0; splock.lock(); - CURRENT->semaphore_count = count; - int ret = 0; + DEFER(splock.unlock()); + auto& counter = CURRENT->semaphore_count; + counter = count; + DEFER(counter = 0); while (!try_subtract(count)) { - ret = waitq::wait_defer(timeout, spinlock_unlock, &splock); - ERRNO err; - splock.lock(); - if (ret < 0) { - CURRENT->semaphore_count = 0; - // when timeout, we need to try to resume next thread(s) in q - if (err.no == ETIMEDOUT) try_resume(); - splock.unlock(); - errno = err.no; + int ret = waitq::wait_defer(timeout, spinlock_unlock, &splock); + splock.lock(); // assuming errno NOT changed + if (unlikely(ret < 0)) { // got interrupted + uint64_t cnt; + if (!m_ooo_resume && (cnt = m_count.load())) { + auto eno = errno; + try_resume(cnt); + errno = eno; + } return ret; } } - try_resume(); - splock.unlock(); return 0; } - void semaphore::try_resume() - { - auto cnt = m_count.load(); - while(true) - { + void semaphore::try_resume(uint64_t cnt) { + assert(cnt); + while(true) { ScopedLockHead h(this); - if (!h) return; + if (!h) break; auto th = (thread*)h; - auto& qfcount = th->semaphore_count; - if (qfcount > cnt) break; - cnt -= qfcount; - qfcount = 0; + auto& c = th->semaphore_count; + if (c > cnt) break; + cnt -= c; prelocked_thread_interrupt(th, -1); } + if (!q.th || !cnt || !m_ooo_resume) + return; + SCOPED_LOCK(q.lock); + for (auto th = 
q.th->next(); th != q.th && cnt; th = th->next()) { SCOPED_LOCK(th->lock); auto& c = th->semaphore_count; if (c <= cnt) { cnt -= c; prelocked_thread_interrupt(th, -1); } } } - bool semaphore::try_subtract(uint64_t count) - { - while(true) - { + inline bool semaphore::try_subtract(uint64_t count) { + while(true) { auto mc = m_count.load(); if (mc < count) return false; diff --git a/thread/thread.h b/thread/thread.h index 59e9a7b8..e5766351 100644 --- a/thread/thread.h +++ b/thread/thread.h @@ -432,7 +432,8 @@ namespace photon class semaphore : protected waitq { public: - explicit semaphore(uint64_t count = 0) : m_count(count) { } + explicit semaphore(uint64_t count = 0, bool in_order_resume = true) + : m_count(count), m_ooo_resume(!in_order_resume) { } int wait(uint64_t count, Timeout timeout = {}) { int ret = 0; do { @@ -441,12 +442,11 @@ namespace photon return ret; } int wait_interruptible(uint64_t count, Timeout timeout = {}); - int signal(uint64_t count) - { + int signal(uint64_t count) { if (count == 0) return 0; SCOPED_LOCK(splock); - m_count.fetch_add(count); - resume_one(); + auto cnt = m_count.fetch_add(count) + count; + try_resume(cnt); return 0; } uint64_t count() const { @@ -455,9 +455,10 @@ protected: std::atomic<uint64_t> m_count; + bool m_ooo_resume; spinlock splock; bool try_subtract(uint64_t count); - void try_resume(); + void try_resume(uint64_t count); }; // to be different to timer flags