diff --git a/src/QueuePerThreadPool.c b/src/QueuePerThreadPool.c
index 70d73d089..6646d0dca 100644
--- a/src/QueuePerThreadPool.c
+++ b/src/QueuePerThreadPool.c
@@ -77,8 +77,8 @@ typedef enum {
 
 /* The context for a single thread in QPTPool */
 typedef struct QPTPoolThreadData {
-    sll_t active;               /* work items that have already been claimed by this thread */
-    pthread_mutex_t active_mutex;
+    sll_t claimed;              /* work items that have already been claimed by this thread */
+    pthread_mutex_t claimed_mutex;
 
     sll_t waiting;              /* generally push into this queue */
     sll_t deferred;             /* push into here if waiting queue is too big; pop when waiting queue is empty */
@@ -185,7 +185,7 @@ static uint64_t steal_work(QPTPool_t *ctx, const size_t id,
  * waiting and deferred queues have been checked and found to be
  * empty, and so should not be called frequently.
  */
-static uint64_t steal_active(QPTPool_t *ctx, const size_t id,
-                             const size_t start, const size_t end) {
+static uint64_t steal_claimed(QPTPool_t *ctx, const size_t id,
+                              const size_t start, const size_t end) {
     QPTPoolThreadData_t *tw = &ctx->data[id];
 
@@ -196,16 +196,16 @@ static uint64_t steal_active(QPTPool_t *ctx, const size_t id,
 
         QPTPoolThreadData_t *target = &ctx->data[i];
 
-        if (pthread_mutex_trylock(&target->active_mutex) == 0) {
-            if (target->active.size) {
+        if (pthread_mutex_trylock(&target->claimed_mutex) == 0) {
+            if (target->claimed.size) {
                 /* always take at least 1 */
-                const uint64_t active = max(target->active.size * ctx->steal.num / ctx->steal.denom, 1);
-                sll_move_first(&tw->waiting, &target->active, active);
-                pthread_mutex_unlock(&target->active_mutex);
-                return active;
+                const uint64_t claimed = max(target->claimed.size * ctx->steal.num / ctx->steal.denom, 1);
+                sll_move_first(&tw->waiting, &target->claimed, claimed);
+                pthread_mutex_unlock(&target->claimed_mutex);
+                return claimed;
             }
 
-            pthread_mutex_unlock(&target->active_mutex);
+            pthread_mutex_unlock(&target->claimed_mutex);
         }
     }
 
@@ -244,15 +244,15 @@ static void *worker_function(void *args) {
                 if (steal_work(ctx, id, tw->steal_from, ctx->nthreads) == 0) {
                     if (steal_work(ctx, id, 0, tw->steal_from) == 0) {
                         /*
-                         * if still can't find anything, try the active queue
+                         * if still can't find anything, try the claimed queue
                          *
                          * this should only be called if there is some
                          * work that is taking so long that the rest of
                          * the threads have run out of work, so this
                         * should not happen too often
                          */
-                        if (steal_active(ctx, id, tw->steal_from, ctx->nthreads) == 0) {
-                            steal_active(ctx, id, 0, tw->steal_from);
+                        if (steal_claimed(ctx, id, tw->steal_from, ctx->nthreads) == 0) {
+                            steal_claimed(ctx, id, 0, tw->steal_from);
                         }
                     }
                 }
@@ -289,20 +289,20 @@ static void *worker_function(void *args) {
 
         /* move entire queue into work and clear out queue */
         timestamp_create_start(wf_move_queue);
-        pthread_mutex_lock(&tw->active_mutex);
+        pthread_mutex_lock(&tw->claimed_mutex);
         if (tw->waiting.size) {
-            sll_move(&tw->active, &tw->waiting);
+            sll_move(&tw->claimed, &tw->waiting);
         }
         else {
-            sll_move(&tw->active, &tw->deferred);
+            sll_move(&tw->claimed, &tw->deferred);
         }
-        pthread_mutex_unlock(&tw->active_mutex);
+        pthread_mutex_unlock(&tw->claimed_mutex);
         timestamp_set_end(wf_move_queue);
 
         #if defined(DEBUG) && defined (QPTPOOL_QUEUE_SIZE)
         pthread_mutex_lock(&ctx->mutex);
         pthread_mutex_lock(&print_mutex);
-        tw->waiting.size = tw->active.size;
+        tw->waiting.size = tw->claimed.size;
 
         struct timespec now;
         clock_gettime(CLOCK_MONOTONIC, &now);
@@ -330,7 +330,7 @@ static void *worker_function(void *args) {
 
         /*
          * pop work item off before it is processed so that if another
-         * thread steals from the active queue, the current active
+         * thread steals from the claimed queue, the current claimed
          * work will not be re-run
          *
          * this has the side effect of moving 2 frees into the loop
@@ -342,9 +342,9 @@ static void *worker_function(void *args) {
          * lower memory utilization
          */
         timestamp_create_start(wf_get_queue_head);
-        pthread_mutex_lock(&tw->active_mutex);
-        struct queue_item *qi = (struct queue_item *) sll_pop(&tw->active);
-        pthread_mutex_unlock(&tw->active_mutex);
+        pthread_mutex_lock(&tw->claimed_mutex);
+        struct queue_item *qi = (struct queue_item *) sll_pop(&tw->claimed);
+        pthread_mutex_unlock(&tw->claimed_mutex);
         timestamp_end_print(ctx->debug_buffers, id, "wf_get_queue_head", wf_get_queue_head);
 
         while (qi) {
@@ -354,9 +354,9 @@ static void *worker_function(void *args) {
             timestamp_end_print(ctx->debug_buffers, id, "wf_process_work", wf_process_work);
 
             timestamp_create_start(wf_next_work);
-            pthread_mutex_lock(&tw->active_mutex);
-            qi = (struct queue_item *) sll_pop(&tw->active);
-            pthread_mutex_unlock(&tw->active_mutex);
+            pthread_mutex_lock(&tw->claimed_mutex);
+            qi = (struct queue_item *) sll_pop(&tw->claimed);
+            pthread_mutex_unlock(&tw->claimed_mutex);
             timestamp_end_print(ctx->debug_buffers, id, "wf_next_work", wf_next_work);
 
             work_count++;
@@ -609,8 +609,8 @@ QPTPool_t *QPTPool_init_with_props(const size_t nthreads,
     /* set up thread data, but not threads */
     for(size_t i = 0; i < nthreads; i++) {
         QPTPoolThreadData_t *data = &ctx->data[i];
-        sll_init(&data->active);
-        pthread_mutex_init(&data->active_mutex, NULL);
+        sll_init(&data->claimed);
+        pthread_mutex_init(&data->claimed_mutex, NULL);
         sll_init(&data->waiting);
         sll_init(&data->deferred);
         pthread_mutex_init(&data->mutex, NULL);
@@ -824,8 +824,8 @@ void QPTPool_destroy(QPTPool_t *ctx) {
          */
         sll_destroy(&data->deferred, free);
         sll_destroy(&data->waiting, free);
-        pthread_mutex_destroy(&data->active_mutex);
-        sll_destroy(&data->active, free);
+        pthread_mutex_destroy(&data->claimed_mutex);
+        sll_destroy(&data->claimed, free);
     }
 
     pthread_cond_destroy(&ctx->cv);