From 8d297ee2861eea1ae0d7ece00aeddb403ecec802 Mon Sep 17 00:00:00 2001 From: Dmitriy Rusov Date: Thu, 20 Jul 2023 09:01:03 +0200 Subject: [PATCH] chore(query-orchestrator): Added metadata to cache and queue logs (#6751) --- .../src/queue-driver.interface.ts | 1 + .../src/orchestrator/LocalQueueDriver.js | 7 ++-- .../src/orchestrator/QueryCache.ts | 30 +++++++++++------ .../src/orchestrator/QueryQueue.js | 32 +++++++++++++++++-- 4 files changed, 53 insertions(+), 17 deletions(-) diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts index d8812f1a73cfe..f02fa0a25f96b 100644 --- a/packages/cubejs-base-driver/src/queue-driver.interface.ts +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -42,6 +42,7 @@ export interface AddToQueueOptions { stageQueryKey: string, requestId: string, orphanedTimeout?: number, + queueId?: QueueId, } export interface QueueDriverOptions { diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js index 6d86b47de48fe..a705f23152645 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js @@ -109,6 +109,7 @@ export class LocalQueueDriverConnection { */ addToQueue(keyScore, queryKey, orphanedTime, queryHandler, query, priority, options) { const queryQueueObj = { + queueId: options.queueId, queryHandler, query, queryKey, @@ -142,8 +143,7 @@ export class LocalQueueDriverConnection { return [ added, - // this driver doesnt support queue id - null, + queryQueueObj.queueId, Object.keys(this.toProcess).length, queryQueueObj.addedToQueueTime ]; @@ -271,8 +271,7 @@ export class LocalQueueDriverConnection { return [ added, - // this driver doesnt support queue id - null, + this.queryDef[key]?.queueId, this.queueArray(this.active), Object.keys(this.toProcess).length, this.queryDef[key], diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index c30b88fec9a9c..95063c6609e6c 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -1,3 +1,4 @@ +import crypto from 'crypto'; import csvWriter from 'csv-write-stream'; import LRUCache from 'lru-cache'; import { pipeline } from 'stream'; @@ -426,6 +427,7 @@ export class QueryCache { external, priority, requestId, + spanId, inlineTables, useCsvQuery, lambdaTypes, @@ -437,6 +439,7 @@ export class QueryCache { external: boolean, priority?: number, requestId?: string, + spanId?: string, inlineTables?: InlineTables, useCsvQuery?: boolean, lambdaTypes?: TableStructure, @@ -461,6 +464,7 @@ export class QueryCache { const opt = { stageQueryKey: cacheKey, requestId, + spanId, }; if (!persistent) { @@ -834,6 +838,7 @@ export class QueryCache { persistent?: boolean, } ) { + const spanId = crypto.randomBytes(16).toString('hex'); options = options || { dataSource: 'default' }; const { renewalThreshold } = options; const renewalKey = options.renewalKey && this.queryRedisKey(options.renewalKey); @@ -844,6 +849,7 @@ export class QueryCache { priority: options.priority, external: options.external, requestId: options.requestId, + spanId, persistent: options.persistent, dataSource: options.dataSource, useCsvQuery: options.useCsvQuery, @@ -858,10 +864,11 @@ export class QueryCache { .cacheDriver .set(redisKey, result, expiration) .then(({ bytes }) => { - this.logger('Renewed', { cacheKey, requestId: options.requestId }); + this.logger('Renewed', { cacheKey, requestId: options.requestId, spanId }); this.logger('Outgoing network usage', { service: 'cache', requestId: options.requestId, + spanId, bytes, cacheKey, }); @@ -869,10 +876,11 @@ export class QueryCache { }); }).catch(e => { if (!(e instanceof ContinueWaitError)) { - this.logger('Dropping Cache', { cacheKey, error: e.stack || e, requestId: options.requestId }); + this.logger('Dropping Cache', { cacheKey, error: e.stack || e, requestId: options.requestId, spanId }); this.cacheDriver.remove(redisKey) .catch(err => this.logger('Error removing key', { cacheKey, + spanId, error: err.stack || err, requestId: options.requestId })); @@ -882,7 +890,7 @@ export class QueryCache { ); if (options.forceNoCache) { - this.logger('Force no cache for', { cacheKey, requestId: options.requestId }); + this.logger('Force no cache for', { cacheKey, requestId: options.requestId, spanId }); return fetchNew(); } @@ -914,7 +922,8 @@ export class QueryCache { renewalKey: inMemoryValue.renewalKey, newRenewalKey: renewalKey, renewalThreshold, - requestId: options.requestId + requestId: options.requestId, + spanId, }); res = inMemoryValue; } @@ -935,7 +944,8 @@ export class QueryCache { renewalKey: parsedResult.renewalKey, newRenewalKey: renewalKey, renewalThreshold, - requestId: options.requestId + requestId: options.requestId, + spanId, }); if ( renewalKey && ( @@ -946,24 +956,24 @@ export class QueryCache { ) ) { if (options.waitForRenew) { - this.logger('Waiting for renew', { cacheKey, renewalThreshold, requestId: options.requestId }); + this.logger('Waiting for renew', { cacheKey, renewalThreshold, requestId: options.requestId, spanId }); return fetchNew(); } else { - this.logger('Renewing existing key', { cacheKey, renewalThreshold, requestId: options.requestId }); + this.logger('Renewing existing key', { cacheKey, renewalThreshold, requestId: options.requestId, spanId }); fetchNew().catch(e => { if (!(e instanceof ContinueWaitError)) { - this.logger('Error renewing', { cacheKey, error: e.stack || e, requestId: options.requestId }); + this.logger('Error renewing', { cacheKey, error: e.stack || e, requestId: options.requestId, spanId }); } }); } } - this.logger('Using cache for', { cacheKey, requestId: options.requestId }); + this.logger('Using cache for', { cacheKey, requestId: options.requestId, spanId }); if (options.useInMemory && renewedAgo + inMemoryCacheDisablePeriod <= renewalThreshold * 1000) { this.memoryCache.set(redisKey, parsedResult); } return parsedResult.result; } else { - this.logger('Missing cache for', { cacheKey, requestId: options.requestId }); + this.logger('Missing cache for', { cacheKey, requestId: options.requestId, spanId }); return fetchNew(); } } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 17eda53f934af..939a2a9a191f9 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -167,6 +167,12 @@ export class QueryQueue { return stream; } + counter = 0; + + generateQueueId() { + return this.counter++; + } + /** * Push query to the queue and call `QueryQueue.reconcileQueue()` method if * `options.skipQueue` is set to `false`, execute query skipping queue @@ -189,6 +195,7 @@ export class QueryQueue { options, ) { options = options || {}; + options.queueId = this.generateQueueId(); if (this.skipQueue) { const queryDef = { queryHandler, @@ -200,6 +207,8 @@ export class QueryQueue { addedToQueueTime: new Date().getTime(), }; this.logger('Waiting for query', { + queueId: options.queueId, + spanId: options.spanId, queueSize: 0, queryKey: queryDef.queryKey, queuePrefix: this.redisQueuePrefix, @@ -209,7 +218,7 @@ export class QueryQueue { if (queryHandler === 'stream') { throw new Error('Streaming queries to Cube Store aren\'t supported'); } - const result = await this.processQuerySkipQueue(queryDef); + const result = await this.processQuerySkipQueue(queryDef, options.queueId); return this.parseResult(result); } @@ -251,6 +260,8 @@ export class QueryQueue { if (added > 0) { this.logger('Added to queue', { + queueId, + spanId: options.spanId, priority, queueSize, queryKey, @@ -273,6 +284,8 @@ export class QueryQueue { if (queryDef) { this.logger('Waiting for query', { + queueId, + spanId: options.spanId, queueSize, queryKey: queryDef.queryKey, queuePrefix: this.redisQueuePrefix, @@ -614,9 +627,10 @@ export class QueryQueue { * @param {*} query * @returns {Promise<{ result: undefined | Object, error: string | undefined }>} */ - async processQuerySkipQueue(query) { + async processQuerySkipQueue(query, queueId) { const startQueryTime = (new Date()).getTime(); this.logger('Performing query', { + queueId, queueSize: 0, queryKey: query.queryKey, queuePrefix: this.redisQueuePrefix, @@ -639,6 +653,7 @@ export class QueryQueue { ) }; this.logger('Performing query completed', { + queueId, queueSize: 0, duration: ((new Date()).getTime() - startQueryTime), queryKey: query.queryKey, @@ -651,6 +666,7 @@ export class QueryQueue { error: (e.message || e).toString() // TODO error handling }; this.logger('Error while querying', { + queueId, queueSize: 0, duration: ((new Date()).getTime() - startQueryTime), queryKey: query.queryKey, @@ -662,9 +678,10 @@ export class QueryQueue { if (e instanceof TimeoutError) { if (handler) { this.logger('Cancelling query due to timeout', { + queueId, queryKey: query.queryKey, queuePrefix: this.redisQueuePrefix, - requestId: query.requestId + requestId: query.requestId, }); await handler(query); } @@ -708,6 +725,7 @@ export class QueryQueue { const startQueryTime = (new Date()).getTime(); const timeInQueue = (new Date()).getTime() - query.addedToQueueTime; this.logger('Performing query', { + queueId, processingId, queueSize, queryKey: query.queryKey, @@ -755,6 +773,7 @@ export class QueryQueue { return queueConnection.optimisticQueryUpdate(queryKeyHashed, { cancelHandler }, processingId, queueId); } catch (e) { this.logger('Error while query update', { + queueId, queryKey: query.queryKey, error: e.stack || e, queuePrefix: this.redisQueuePrefix, @@ -775,6 +794,7 @@ export class QueryQueue { } this.logger('Performing query completed', { + queueId, processingId, queueSize, duration: ((new Date()).getTime() - startQueryTime), @@ -793,6 +813,7 @@ export class QueryQueue { error: (e.message || e).toString() // TODO error handling }; this.logger('Error while querying', { + queueId, processingId, queueSize, duration: ((new Date()).getTime() - startQueryTime), @@ -811,6 +832,7 @@ export class QueryQueue { const queryWithCancelHandle = await queueConnection.getQueryDef(queryKeyHashed); if (queryWithCancelHandle) { this.logger('Cancelling query due to timeout', { + queueId, processingId, queryKey: queryWithCancelHandle.queryKey, queuePrefix: this.redisQueuePrefix, @@ -830,6 +852,7 @@ export class QueryQueue { if (!(await queueConnection.setResultAndRemoveQuery(queryKeyHashed, executionResult, processingId, queueId))) { this.logger('Orphaned execution result', { + queueId, processingId, warn: 'Result for query was not set due to processing lock wasn\'t acquired', queryKey: query.queryKey, @@ -855,6 +878,7 @@ export class QueryQueue { // } this.logger('Skip processing', { + queueId, processingId, queryKey: query && query.queryKey || queryKeyHashed, requestId: query && query.requestId, @@ -869,6 +893,7 @@ export class QueryQueue { const currentProcessingId = await queueConnection.freeProcessingLock(queryKeyHashed, processingId, activated); if (currentProcessingId) { this.logger('Skipping free processing lock', { + queueId, processingId, currentProcessingId, queryKey: query && query.queryKey || queryKeyHashed, @@ -885,6 +910,7 @@ export class QueryQueue { } } catch (e) { this.logger('Queue storage error', { + queueId, queryKey: query && query.queryKey || queryKeyHashed, requestId: query && query.requestId, error: (e.stack || e).toString(),