From c9fef6d80ac996243827f91d6b5bc14bd2477a44 Mon Sep 17 00:00:00 2001 From: Patrick Pelissier Date: Tue, 26 Mar 2024 23:11:57 +0100 Subject: [PATCH] Fix issues in reworked lock: * starving of writer threads by reader threads * monopolization of memory bus by reader threads. --- m-concurrent.h | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/m-concurrent.h b/m-concurrent.h index 3568e972..52d3485c 100644 --- a/m-concurrent.h +++ b/m-concurrent.h @@ -778,6 +778,7 @@ struct M_F(name, _s) *self; \ m_mutex_t lock; \ atomic_uint num_reader; /* number of reader threads */ \ + atomic_uint num_waiting_writer; /* number of waiting writer threads */ \ m_cond_t there_is_data; /* condition raised when there is data */ \ type data; \ } concurrent_t[1]; \ @@ -795,6 +796,7 @@ { \ m_mutex_init(out->lock); \ atomic_store(&out->num_reader, 0); \ + atomic_store(&out->num_waiting_writer, 0); \ m_cond_init(out->there_is_data); \ out->self = out; \ M_C0NCURRENT_CONTRACT(out); \ @@ -814,19 +816,28 @@ { \ M_C0NCURRENT_CONTRACT(out); \ struct M_F(name, _s) *self = out->self; \ + m_core_backoff_ct backoff; \ + m_core_backoff_init(backoff); \ while (true) { \ unsigned int num = atomic_load(&self->num_reader); \ - /* FIXME: this lock is unfair for writers as readers may starve then */ \ + /* To avoid reader threads that starve writer threads \ + Disable sharing of lock for reader threads if some writer thread \ + waits */ \ + /* Otherwise try to reuse the lock opened by another reader thread */ \ if (num != 0 \ + && atomic_load(&self->num_waiting_writer) == 0 \ && atomic_compare_exchange_strong(&self->num_reader, &num, num+1)) { \ break; \ } \ + /* Otherwise try to get the lock ourself */ \ if (m_mutex_trylock(self->lock)) { \ atomic_store(&self->num_reader, 1); \ break; \ } \ - /* TODO: exponential backoff */ \ + /* Perform exponential backoff to avoid monopolizing the memory bus */ \ + m_core_backoff_wait(backoff); \ } \ + m_core_backoff_clear(backoff); \ } \ \ M_INLINE void \ @@ -836,6 +847,7 @@ struct M_F(name, _s) *self = out->self; \ unsigned int num = atomic_fetch_sub(&self->num_reader, 1); \ if (num == 1) { \ + /* We are the last reader thread: release the lock */ \ m_mutex_unlock(self->lock); \ } \ } \ @@ -844,7 +856,9 @@ M_F(name, _write_lock)(concurrent_t out) \ { \ M_C0NCURRENT_CONTRACT(out); \ + atomic_fetch_add(&out->num_waiting_writer, 1); \ m_mutex_lock (out->lock); \ + atomic_fetch_sub(&out->num_waiting_writer, 1); \ M_ASSERT(atomic_load(&out->num_reader) == 0); \ } \ \