Skip to content

Commit

Permalink
chore(query-orchestrator): Added metadata to cache and queue logs (#6751
Browse files Browse the repository at this point in the history
)
  • Loading branch information
RusovDmitriy authored Jul 20, 2023
1 parent c609048 commit 8d297ee
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 17 deletions.
1 change: 1 addition & 0 deletions packages/cubejs-base-driver/src/queue-driver.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export interface AddToQueueOptions {
stageQueryKey: string,
requestId: string,
orphanedTimeout?: number,
queueId?: QueueId,
}

export interface QueueDriverOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export class LocalQueueDriverConnection {
*/
addToQueue(keyScore, queryKey, orphanedTime, queryHandler, query, priority, options) {
const queryQueueObj = {
queueId: options.queueId,
queryHandler,
query,
queryKey,
Expand Down Expand Up @@ -142,8 +143,7 @@ export class LocalQueueDriverConnection {

return [
added,
// this driver doesnt support queue id
null,
queryQueueObj.queueId,
Object.keys(this.toProcess).length,
queryQueueObj.addedToQueueTime
];
Expand Down Expand Up @@ -271,8 +271,7 @@ export class LocalQueueDriverConnection {

return [
added,
// this driver doesnt support queue id
null,
this.queryDef[key]?.queueId,
this.queueArray(this.active),
Object.keys(this.toProcess).length,
this.queryDef[key],
Expand Down
30 changes: 20 additions & 10 deletions packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import crypto from 'crypto';
import csvWriter from 'csv-write-stream';
import LRUCache from 'lru-cache';
import { pipeline } from 'stream';
Expand Down Expand Up @@ -426,6 +427,7 @@ export class QueryCache {
external,
priority,
requestId,
spanId,
inlineTables,
useCsvQuery,
lambdaTypes,
Expand All @@ -437,6 +439,7 @@ export class QueryCache {
external: boolean,
priority?: number,
requestId?: string,
spanId?: string,
inlineTables?: InlineTables,
useCsvQuery?: boolean,
lambdaTypes?: TableStructure,
Expand All @@ -461,6 +464,7 @@ export class QueryCache {
const opt = {
stageQueryKey: cacheKey,
requestId,
spanId,
};

if (!persistent) {
Expand Down Expand Up @@ -834,6 +838,7 @@ export class QueryCache {
persistent?: boolean,
}
) {
const spanId = crypto.randomBytes(16).toString('hex');
options = options || { dataSource: 'default' };
const { renewalThreshold } = options;
const renewalKey = options.renewalKey && this.queryRedisKey(options.renewalKey);
Expand All @@ -844,6 +849,7 @@ export class QueryCache {
priority: options.priority,
external: options.external,
requestId: options.requestId,
spanId,
persistent: options.persistent,
dataSource: options.dataSource,
useCsvQuery: options.useCsvQuery,
Expand All @@ -858,21 +864,23 @@ export class QueryCache {
.cacheDriver
.set(redisKey, result, expiration)
.then(({ bytes }) => {
this.logger('Renewed', { cacheKey, requestId: options.requestId });
this.logger('Renewed', { cacheKey, requestId: options.requestId, spanId });
this.logger('Outgoing network usage', {
service: 'cache',
requestId: options.requestId,
spanId,
bytes,
cacheKey,
});
return res;
});
}).catch(e => {
if (!(e instanceof ContinueWaitError)) {
this.logger('Dropping Cache', { cacheKey, error: e.stack || e, requestId: options.requestId });
this.logger('Dropping Cache', { cacheKey, error: e.stack || e, requestId: options.requestId, spanId });
this.cacheDriver.remove(redisKey)
.catch(err => this.logger('Error removing key', {
cacheKey,
spanId,
error: err.stack || err,
requestId: options.requestId
}));
Expand All @@ -882,7 +890,7 @@ export class QueryCache {
);

if (options.forceNoCache) {
this.logger('Force no cache for', { cacheKey, requestId: options.requestId });
this.logger('Force no cache for', { cacheKey, requestId: options.requestId, spanId });
return fetchNew();
}

Expand Down Expand Up @@ -914,7 +922,8 @@ export class QueryCache {
renewalKey: inMemoryValue.renewalKey,
newRenewalKey: renewalKey,
renewalThreshold,
requestId: options.requestId
requestId: options.requestId,
spanId,
});
res = inMemoryValue;
}
Expand All @@ -935,7 +944,8 @@ export class QueryCache {
renewalKey: parsedResult.renewalKey,
newRenewalKey: renewalKey,
renewalThreshold,
requestId: options.requestId
requestId: options.requestId,
spanId,
});
if (
renewalKey && (
Expand All @@ -946,24 +956,24 @@ export class QueryCache {
)
) {
if (options.waitForRenew) {
this.logger('Waiting for renew', { cacheKey, renewalThreshold, requestId: options.requestId });
this.logger('Waiting for renew', { cacheKey, renewalThreshold, requestId: options.requestId, spanId });
return fetchNew();
} else {
this.logger('Renewing existing key', { cacheKey, renewalThreshold, requestId: options.requestId });
this.logger('Renewing existing key', { cacheKey, renewalThreshold, requestId: options.requestId, spanId });
fetchNew().catch(e => {
if (!(e instanceof ContinueWaitError)) {
this.logger('Error renewing', { cacheKey, error: e.stack || e, requestId: options.requestId });
this.logger('Error renewing', { cacheKey, error: e.stack || e, requestId: options.requestId, spanId });
}
});
}
}
this.logger('Using cache for', { cacheKey, requestId: options.requestId });
this.logger('Using cache for', { cacheKey, requestId: options.requestId, spanId });
if (options.useInMemory && renewedAgo + inMemoryCacheDisablePeriod <= renewalThreshold * 1000) {
this.memoryCache.set(redisKey, parsedResult);
}
return parsedResult.result;
} else {
this.logger('Missing cache for', { cacheKey, requestId: options.requestId });
this.logger('Missing cache for', { cacheKey, requestId: options.requestId, spanId });
return fetchNew();
}
}
Expand Down
32 changes: 29 additions & 3 deletions packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ export class QueryQueue {
return stream;
}

counter = 0;

generateQueueId() {
return this.counter++;
}

/**
* Push query to the queue and call `QueryQueue.reconcileQueue()` method if
* `options.skipQueue` is set to `false`, execute query skipping queue
Expand All @@ -189,6 +195,7 @@ export class QueryQueue {
options,
) {
options = options || {};
options.queueId = this.generateQueueId();
if (this.skipQueue) {
const queryDef = {
queryHandler,
Expand All @@ -200,6 +207,8 @@ export class QueryQueue {
addedToQueueTime: new Date().getTime(),
};
this.logger('Waiting for query', {
queueId: options.queueId,
spanId: options.spanId,
queueSize: 0,
queryKey: queryDef.queryKey,
queuePrefix: this.redisQueuePrefix,
Expand All @@ -209,7 +218,7 @@ export class QueryQueue {
if (queryHandler === 'stream') {
throw new Error('Streaming queries to Cube Store aren\'t supported');
}
const result = await this.processQuerySkipQueue(queryDef);
const result = await this.processQuerySkipQueue(queryDef, options.queueId);
return this.parseResult(result);
}

Expand Down Expand Up @@ -251,6 +260,8 @@ export class QueryQueue {

if (added > 0) {
this.logger('Added to queue', {
queueId,
spanId: options.spanId,
priority,
queueSize,
queryKey,
Expand All @@ -273,6 +284,8 @@ export class QueryQueue {

if (queryDef) {
this.logger('Waiting for query', {
queueId,
spanId: options.spanId,
queueSize,
queryKey: queryDef.queryKey,
queuePrefix: this.redisQueuePrefix,
Expand Down Expand Up @@ -614,9 +627,10 @@ export class QueryQueue {
* @param {*} query
* @returns {Promise<{ result: undefined | Object, error: string | undefined }>}
*/
async processQuerySkipQueue(query) {
async processQuerySkipQueue(query, queueId) {
const startQueryTime = (new Date()).getTime();
this.logger('Performing query', {
queueId,
queueSize: 0,
queryKey: query.queryKey,
queuePrefix: this.redisQueuePrefix,
Expand All @@ -639,6 +653,7 @@ export class QueryQueue {
)
};
this.logger('Performing query completed', {
queueId,
queueSize: 0,
duration: ((new Date()).getTime() - startQueryTime),
queryKey: query.queryKey,
Expand All @@ -651,6 +666,7 @@ export class QueryQueue {
error: (e.message || e).toString() // TODO error handling
};
this.logger('Error while querying', {
queueId,
queueSize: 0,
duration: ((new Date()).getTime() - startQueryTime),
queryKey: query.queryKey,
Expand All @@ -662,9 +678,10 @@ export class QueryQueue {
if (e instanceof TimeoutError) {
if (handler) {
this.logger('Cancelling query due to timeout', {
queueId,
queryKey: query.queryKey,
queuePrefix: this.redisQueuePrefix,
requestId: query.requestId
requestId: query.requestId,
});
await handler(query);
}
Expand Down Expand Up @@ -708,6 +725,7 @@ export class QueryQueue {
const startQueryTime = (new Date()).getTime();
const timeInQueue = (new Date()).getTime() - query.addedToQueueTime;
this.logger('Performing query', {
queueId,
processingId,
queueSize,
queryKey: query.queryKey,
Expand Down Expand Up @@ -755,6 +773,7 @@ export class QueryQueue {
return queueConnection.optimisticQueryUpdate(queryKeyHashed, { cancelHandler }, processingId, queueId);
} catch (e) {
this.logger('Error while query update', {
queueId,
queryKey: query.queryKey,
error: e.stack || e,
queuePrefix: this.redisQueuePrefix,
Expand All @@ -775,6 +794,7 @@ export class QueryQueue {
}

this.logger('Performing query completed', {
queueId,
processingId,
queueSize,
duration: ((new Date()).getTime() - startQueryTime),
Expand All @@ -793,6 +813,7 @@ export class QueryQueue {
error: (e.message || e).toString() // TODO error handling
};
this.logger('Error while querying', {
queueId,
processingId,
queueSize,
duration: ((new Date()).getTime() - startQueryTime),
Expand All @@ -811,6 +832,7 @@ export class QueryQueue {
const queryWithCancelHandle = await queueConnection.getQueryDef(queryKeyHashed);
if (queryWithCancelHandle) {
this.logger('Cancelling query due to timeout', {
queueId,
processingId,
queryKey: queryWithCancelHandle.queryKey,
queuePrefix: this.redisQueuePrefix,
Expand All @@ -830,6 +852,7 @@ export class QueryQueue {

if (!(await queueConnection.setResultAndRemoveQuery(queryKeyHashed, executionResult, processingId, queueId))) {
this.logger('Orphaned execution result', {
queueId,
processingId,
warn: 'Result for query was not set due to processing lock wasn\'t acquired',
queryKey: query.queryKey,
Expand All @@ -855,6 +878,7 @@ export class QueryQueue {
// }

this.logger('Skip processing', {
queueId,
processingId,
queryKey: query && query.queryKey || queryKeyHashed,
requestId: query && query.requestId,
Expand All @@ -869,6 +893,7 @@ export class QueryQueue {
const currentProcessingId = await queueConnection.freeProcessingLock(queryKeyHashed, processingId, activated);
if (currentProcessingId) {
this.logger('Skipping free processing lock', {
queueId,
processingId,
currentProcessingId,
queryKey: query && query.queryKey || queryKeyHashed,
Expand All @@ -885,6 +910,7 @@ export class QueryQueue {
}
} catch (e) {
this.logger('Queue storage error', {
queueId,
queryKey: query && query.queryKey || queryKeyHashed,
requestId: query && query.requestId,
error: (e.stack || e).toString(),
Expand Down

0 comments on commit 8d297ee

Please sign in to comment.