Skip to content

Commit

Permalink
update claimed work stealing test
Browse files Browse the repository at this point in the history
Previously, the initial work item could be stolen before it ran,
leading to blocking because the work items were being processed in the
wrong threads. Now, both threads start with work that block, so the
first work items in each thread must run in the expected threads.
  • Loading branch information
calccrypto committed Sep 13, 2024
1 parent c85ce26 commit 456efe8
Showing 1 changed file with 49 additions and 30 deletions.
79 changes: 49 additions & 30 deletions test/unit/googletest/QueuePerThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,55 +539,74 @@ TEST(QueuePerThreadPool, steal_deferred) {
test_steal(QPTPool_enqueue_DEFERRED, true, 1);
}

TEST(QueuePerThreadPool, steal_active) {
std::size_t counter = 0;
TEST(QueuePerThreadPool, steal_claimed) {
struct work_item {
work_item(std::size_t &counter)
: mutex(PTHREAD_MUTEX_INITIALIZER),
tid(0),
counter(counter)
{}

// macOS doesn't seem to like std::mutex
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// macOS doesn't seem to like std::mutex
pthread_mutex_t mutex;
std::size_t tid;
std::size_t &counter;
};

QPTPool_t *pool = QPTPool_init(2, &mutex);
std::size_t counter = 0;
work_item wi[2] = {counter, counter};

QPTPool_t *pool = QPTPool_init(2, nullptr);
ASSERT_NE(pool, nullptr);
ASSERT_EQ(QPTPool_set_steal(pool, 1, 2), 0);

// prevent thread 0 from completing
pthread_mutex_lock(&mutex);

// this work item does not complete until the end
EXPECT_EQ(QPTPool_enqueue_here(pool, 0, QPTPool_enqueue_WAIT,
[](QPTPool_t *ctx, const std::size_t id, void *data, void *args) -> int {
pthread_mutex_t *mutex = static_cast<pthread_mutex_t *>(args);

increment_counter(ctx, id, data, args);

pthread_mutex_lock(mutex);
pthread_mutex_unlock(mutex);
return 0;
}, &counter), QPTPool_enqueue_WAIT);
for(std::size_t i = 0; i < 2; i++) {
work_item &w = wi[i];

// prevent first work item in each thread from completing
pthread_mutex_lock(&w.mutex);

// each work item should be run by the target thread because
// both threads start having work and both will block, so
// nothing should be stolen
EXPECT_EQ(QPTPool_enqueue_here(pool, i, QPTPool_enqueue_WAIT,
[](QPTPool_t *ctx, const std::size_t id, void *data, void *) -> int {
work_item *w = static_cast<work_item *>(data);
w->tid = id;
increment_counter(ctx, id, &w->counter, nullptr);

pthread_mutex_lock(&w->mutex);
pthread_mutex_unlock(&w->mutex);
return 0;
}, &w), QPTPool_enqueue_WAIT);
}

// trap this work item after the first work item
EXPECT_EQ(QPTPool_enqueue_here(pool, 0, QPTPool_enqueue_WAIT, increment_counter, &counter),
QPTPool_enqueue_WAIT);
EXPECT_EQ(QPTPool_enqueue_here(pool, 0, QPTPool_enqueue_WAIT, increment_counter, &counter),
QPTPool_enqueue_WAIT);

// start the thread pool, causing thread 0 to take all work items, but get stuck on the
// first work item, not letting the second work items run
ASSERT_EQ(QPTPool_start(pool), 0);

// need to make sure the first work item started
while (counter < 1) {
// need to make sure the first work item in each thread started
while (counter < 2) {
sched_yield();
}

// push to thread 1, so after it completes, it steals the first item from thread 0's active queue
EXPECT_EQ(QPTPool_enqueue_here(pool, 1, QPTPool_enqueue_WAIT, increment_counter, &counter),
QPTPool_enqueue_WAIT);
for(std::size_t i = 0; i < 2; i++) {
EXPECT_EQ(wi[i].tid, i);
}

// wait for thread 1's original work item and the stolen work item to finish
// let the work item in thread 1 complete so that thread 1
// steals the first item from thread 0's claimed queue
pthread_mutex_unlock(&wi[1].mutex);

// wait for the stolen work item to finish
while (counter < 3) {
sched_yield();
}

// let thread 0 finish
pthread_mutex_unlock(&mutex);
pthread_mutex_unlock(&wi[0].mutex);

QPTPool_stop(pool);

Expand Down

0 comments on commit 456efe8

Please sign in to comment.