diff --git a/src/QueuePerThreadPool.c b/src/QueuePerThreadPool.c index 38c25b03b..5de499620 100644 --- a/src/QueuePerThreadPool.c +++ b/src/QueuePerThreadPool.c @@ -118,6 +118,10 @@ struct QPTPool { #if defined(DEBUG) && defined(PER_THREAD_STATS) struct OutputBuffers *debug_buffers; #endif + + #if defined(DEBUG) && defined(QPTPOOL_QUEUE_SIZE) + size_t queue_size_highwater; /* used when printing out queue size stats */ + #endif }; /* struct to pass into pthread_create */ @@ -272,21 +276,32 @@ static void claim_work(QPTPoolThreadData_t *tw) { static void dump_queue_size_stats(QPTPool_t *ctx, QPTPoolThreadData_t *tw) { pthread_mutex_lock(&ctx->mutex); pthread_mutex_lock(&print_mutex); - tw->waiting.size = tw->claimed.size; - struct timespec now; - clock_gettime(CLOCK_MONOTONIC, &now); - fprintf(stderr, "qptpool_size %" PRIu64 " ", since_epoch(&now) - epoch); + if (ctx->incomplete) { + tw->waiting.size = tw->claimed.size; - size_t sum = 0; - for(size_t i = 0; i < ctx->nthreads; i++) { - fprintf(stderr, "%zu ", ctx->data[i].waiting.size); - sum += ctx->data[i].waiting.size; - fprintf(stderr, "%zu ", ctx->data[i].deferred.size); - sum += ctx->data[i].deferred.size; + char buf[4096]; + size_t n = 0; + + size_t sum = 0; + for(size_t i = 0; i < ctx->nthreads; i++) { + n += SNPRINTF(buf + n, sizeof(buf) - n, "%zu ", ctx->data[i].waiting.size); + sum += ctx->data[i].waiting.size; + n += SNPRINTF(buf + n, sizeof(buf) - n, "%zu ", ctx->data[i].deferred.size); + sum += ctx->data[i].deferred.size; + } + + if (sum > ctx->queue_size_highwater) { + ctx->queue_size_highwater = sum; + + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + fprintf(stderr, "qptpool_size %" PRIu64 " %s total: %zu\n", since_epoch(&now) - epoch, buf, sum); + } + + tw->waiting.size = 0; } - fprintf(stderr, "%zu\n", sum); - tw->waiting.size = 0; + pthread_mutex_unlock(&print_mutex); pthread_mutex_unlock(&ctx->mutex); } @@ -634,6 +649,10 @@ QPTPool_t *QPTPool_init_with_props(const size_t nthreads, ctx->debug_buffers = debug_buffers; #endif + #if defined(DEBUG) && defined(QPTPOOL_QUEUE_SIZE) + ctx->queue_size_highwater = 0; + #endif + ctx->data = calloc(nthreads, sizeof(QPTPoolThreadData_t)); if (!ctx->data) { free(ctx);