Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

semaphore plus #667

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions thread/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1749,44 +1749,52 @@ R"(
{
if (count == 0) return 0;
splock.lock();
CURRENT->semaphore_count = count;
int ret = 0;
DEFER(splock.unlock());
auto& counter = CURRENT->semaphore_count;
counter = count;
DEFER(counter = 0);
while (!try_subtract(count)) {
ret = waitq::wait_defer(timeout, spinlock_unlock, &splock);
ERRNO err;
splock.lock();
if (ret < 0) {
CURRENT->semaphore_count = 0;
// when timeout, we need to try to resume next thread(s) in q
if (err.no == ETIMEDOUT) try_resume();
splock.unlock();
errno = err.no;
int ret = waitq::wait_defer(timeout, spinlock_unlock, &splock);
splock.lock(); // assuming errno NOT changed
if (unlikely(ret < 0)) { // got interrupted
uint64_t cnt;
if (!m_ooo_resume && (cnt = m_count.load())) {
auto eno = errno;
try_resume(cnt);
errno = eno;
}
return ret;
}
}
try_resume();
splock.unlock();
return 0;
}
void semaphore::try_resume()
{
auto cnt = m_count.load();
while(true)
{
void semaphore::try_resume(uint64_t cnt) {
assert(cnt);
while(true) {
ScopedLockHead h(this);
if (!h) return;
if (!h) break;
auto th = (thread*)h;
auto& qfcount = th->semaphore_count;
if (qfcount > cnt) break;
cnt -= qfcount;
qfcount = 0;
auto& c = th->semaphore_count;
if (c > cnt) break;
cnt -= c;
prelocked_thread_interrupt(th, -1);
}
if (!q.th || !cnt || !m_ooo_resume)
return;
SCOPED_LOCK(q.lock);
for (auto th = q.th->next();
th!= q.th && cnt;
th = th->next()) {
SCOPED_LOCK(th->lock);
auto& c = th->semaphore_count;
if (c <= cnt) {
cnt -= c;
prelocked_thread_interrupt(th, -1);
}
}
}
bool semaphore::try_subtract(uint64_t count)
{
while(true)
{
inline bool semaphore::try_subtract(uint64_t count) {
while(true) {
auto mc = m_count.load();
if (mc < count)
return false;
Expand Down
13 changes: 7 additions & 6 deletions thread/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,8 @@ namespace photon
class semaphore : protected waitq
{
public:
explicit semaphore(uint64_t count = 0) : m_count(count) { }
explicit semaphore(uint64_t count = 0, bool in_order_resume = true)
: m_count(count), m_ooo_resume(!in_order_resume) { }
int wait(uint64_t count, Timeout timeout = {}) {
int ret = 0;
do {
Expand All @@ -441,12 +442,11 @@ namespace photon
return ret;
}
int wait_interruptible(uint64_t count, Timeout timeout = {});
int signal(uint64_t count)
{
int signal(uint64_t count) {
if (count == 0) return 0;
SCOPED_LOCK(splock);
m_count.fetch_add(count);
resume_one();
auto cnt = m_count.fetch_add(count) + count;
try_resume(cnt);
return 0;
}
uint64_t count() const {
Expand All @@ -455,9 +455,10 @@ namespace photon

protected:
std::atomic<uint64_t> m_count;
bool m_ooo_resume;
spinlock splock;
bool try_subtract(uint64_t count);
void try_resume();
void try_resume(uint64_t count);
};

// to be different to timer flags
Expand Down
Loading