Skip to content

Commit

Permalink
QPTPool active -> claimed
Browse files Browse the repository at this point in the history
  • Loading branch information
calccrypto committed Aug 22, 2024
1 parent 0d4dc02 commit 14020b8
Showing 1 changed file with 29 additions and 29 deletions.
58 changes: 29 additions & 29 deletions src/QueuePerThreadPool.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ typedef enum {

/* The context for a single thread in QPTPool */
typedef struct QPTPoolThreadData {
sll_t active; /* work items that have already been claimed by this thread */
pthread_mutex_t active_mutex;
sll_t claimed; /* work items that have already been claimed by this thread */
pthread_mutex_t claimed_mutex;

sll_t waiting; /* generally push into this queue */
sll_t deferred; /* push into here if waiting queue is too big; pop when waiting queue is empty */
Expand Down Expand Up @@ -185,7 +185,7 @@ static uint64_t steal_work(QPTPool_t *ctx, const size_t id,
* waiting and deferred queues have been checked and found to be
* empty, and so should not be called frequently.
*/
static uint64_t steal_active(QPTPool_t *ctx, const size_t id,
static uint64_t steal_claimed(QPTPool_t *ctx, const size_t id,
const size_t start, const size_t end) {
QPTPoolThreadData_t *tw = &ctx->data[id];

Expand All @@ -196,16 +196,16 @@ static uint64_t steal_active(QPTPool_t *ctx, const size_t id,

QPTPoolThreadData_t *target = &ctx->data[i];

if (pthread_mutex_trylock(&target->active_mutex) == 0) {
if (target->active.size) {
if (pthread_mutex_trylock(&target->claimed_mutex) == 0) {
if (target->claimed.size) {
/* always take at least 1 */
const uint64_t active = max(target->active.size * ctx->steal.num / ctx->steal.denom, 1);
sll_move_first(&tw->waiting, &target->active, active);
pthread_mutex_unlock(&target->active_mutex);
return active;
const uint64_t claimed = max(target->claimed.size * ctx->steal.num / ctx->steal.denom, 1);
sll_move_first(&tw->waiting, &target->claimed, claimed);
pthread_mutex_unlock(&target->claimed_mutex);
return claimed;
}

pthread_mutex_unlock(&target->active_mutex);
pthread_mutex_unlock(&target->claimed_mutex);
}
}

Expand Down Expand Up @@ -244,15 +244,15 @@ static void *worker_function(void *args) {
if (steal_work(ctx, id, tw->steal_from, ctx->nthreads) == 0) {
if (steal_work(ctx, id, 0, tw->steal_from) == 0) {
/*
* if still can't find anything, try the active queue
* if still can't find anything, try the claimed queue
*
* this should only be called if there is some
* work that is taking so long that the rest of
* the threads have run out of work, so this
* should not happen too often
*/
if (steal_active(ctx, id, tw->steal_from, ctx->nthreads) == 0) {
steal_active(ctx, id, 0, tw->steal_from);
if (steal_claimed(ctx, id, tw->steal_from, ctx->nthreads) == 0) {
steal_claimed(ctx, id, 0, tw->steal_from);
}
}
}
Expand Down Expand Up @@ -289,20 +289,20 @@ static void *worker_function(void *args) {

/* move entire queue into work and clear out queue */
timestamp_create_start(wf_move_queue);
pthread_mutex_lock(&tw->active_mutex);
pthread_mutex_lock(&tw->claimed_mutex);
if (tw->waiting.size) {
sll_move(&tw->active, &tw->waiting);
sll_move(&tw->claimed, &tw->waiting);
}
else {
sll_move(&tw->active, &tw->deferred);
sll_move(&tw->claimed, &tw->deferred);
}
pthread_mutex_unlock(&tw->active_mutex);
pthread_mutex_unlock(&tw->claimed_mutex);
timestamp_set_end(wf_move_queue);

#if defined(DEBUG) && defined (QPTPOOL_QUEUE_SIZE)
pthread_mutex_lock(&ctx->mutex);
pthread_mutex_lock(&print_mutex);
tw->waiting.size = tw->active.size;
tw->waiting.size = tw->claimed.size;

struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
Expand Down Expand Up @@ -330,7 +330,7 @@ static void *worker_function(void *args) {

/*
* pop work item off before it is processed so that if another
* thread steals from the active queue, the current active
* thread steals from the claimed queue, the current claimed
* work will not be re-run
*
* this has the side effect of moving 2 frees into the loop
Expand All @@ -342,9 +342,9 @@ static void *worker_function(void *args) {
* lower memory utilization
*/
timestamp_create_start(wf_get_queue_head);
pthread_mutex_lock(&tw->active_mutex);
struct queue_item *qi = (struct queue_item *) sll_pop(&tw->active);
pthread_mutex_unlock(&tw->active_mutex);
pthread_mutex_lock(&tw->claimed_mutex);
struct queue_item *qi = (struct queue_item *) sll_pop(&tw->claimed);
pthread_mutex_unlock(&tw->claimed_mutex);
timestamp_end_print(ctx->debug_buffers, id, "wf_get_queue_head", wf_get_queue_head);

while (qi) {
Expand All @@ -354,9 +354,9 @@ static void *worker_function(void *args) {
timestamp_end_print(ctx->debug_buffers, id, "wf_process_work", wf_process_work);

timestamp_create_start(wf_next_work);
pthread_mutex_lock(&tw->active_mutex);
qi = (struct queue_item *) sll_pop(&tw->active);
pthread_mutex_unlock(&tw->active_mutex);
pthread_mutex_lock(&tw->claimed_mutex);
qi = (struct queue_item *) sll_pop(&tw->claimed);
pthread_mutex_unlock(&tw->claimed_mutex);
timestamp_end_print(ctx->debug_buffers, id, "wf_next_work", wf_next_work);

work_count++;
Expand Down Expand Up @@ -609,8 +609,8 @@ QPTPool_t *QPTPool_init_with_props(const size_t nthreads,
/* set up thread data, but not threads */
for(size_t i = 0; i < nthreads; i++) {
QPTPoolThreadData_t *data = &ctx->data[i];
sll_init(&data->active);
pthread_mutex_init(&data->active_mutex, NULL);
sll_init(&data->claimed);
pthread_mutex_init(&data->claimed_mutex, NULL);
sll_init(&data->waiting);
sll_init(&data->deferred);
pthread_mutex_init(&data->mutex, NULL);
Expand Down Expand Up @@ -824,8 +824,8 @@ void QPTPool_destroy(QPTPool_t *ctx) {
*/
sll_destroy(&data->deferred, free);
sll_destroy(&data->waiting, free);
pthread_mutex_destroy(&data->active_mutex);
sll_destroy(&data->active, free);
pthread_mutex_destroy(&data->claimed_mutex);
sll_destroy(&data->claimed, free);
}

pthread_cond_destroy(&ctx->cv);
Expand Down

0 comments on commit 14020b8

Please sign in to comment.