From 14e0487ebf444d06324b4456e7c3cdc74d5f57ee Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Sat, 28 Oct 2023 17:25:02 +0200 Subject: [PATCH] feat(query-orchestrator): Use real queueId in events for processing --- .../src/queue-driver.interface.ts | 7 +++--- .../src/CubeStoreQueueDriver.ts | 22 ++++++++++++------- .../src/orchestrator/LocalQueueDriver.js | 13 ++++++++++- .../src/orchestrator/QueryQueue.js | 20 ++++++++++++----- .../src/orchestrator/RedisQueueDriver.ts | 9 +++++--- .../test/unit/QueryQueue.abstract.ts | 2 +- 6 files changed, 51 insertions(+), 22 deletions(-) diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts index f02fa0a25f96b..831fdf7cedd56 100644 --- a/packages/cubejs-base-driver/src/queue-driver.interface.ts +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -10,7 +10,8 @@ export interface QueryKeyHash extends String { __type: 'QueryKeyHash' } -export type GetActiveAndToProcessResponse = [active: string[], toProcess: string[]]; +export type QueryKeysTuple = [keyHash: QueryKeyHash, queueId: QueueId | null /** Cube Store supports real QueueId */]; +export type GetActiveAndToProcessResponse = [active: QueryKeysTuple[], toProcess: QueryKeysTuple[]]; export type AddToQueueResponse = [added: number, queueId: QueueId | null, queueSize: number, addedToQueueTime: number]; export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record]; export type RetrieveForProcessingSuccess = [ @@ -73,8 +74,8 @@ export interface QueueDriverConnectionInterface { */ addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: number, queryHandler: string, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise; // Return query keys which was sorted by priority and time - getToProcessQueries(): Promise; - getActiveQueries(): Promise; + getToProcessQueries(): Promise; + getActiveQueries(): Promise; getQueryDef(hash: QueryKeyHash, queueId: QueueId | null): Promise; // Queries which was added to queue, but was not processed and not needed getOrphanedQueries(): Promise; diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts index 7c131171b4820..7b336011a006f 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -12,7 +12,7 @@ import { QueryKey, QueryKeyHash, ProcessingId, - QueueId, + QueueId, GetActiveAndToProcessResponse, QueryKeysTuple, } from '@cubejs-backend/base-driver'; import { getProcessUid } from '@cubejs-backend/shared'; @@ -106,33 +106,39 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { // nothing to do } - public async getActiveQueries(): Promise { + public async getActiveQueries(): Promise { const rows = await this.driver.query('QUEUE ACTIVE ?', [ this.options.redisQueuePrefix ]); return rows.map((row) => row.id); } - public async getToProcessQueries(): Promise { + public async getToProcessQueries(): Promise { const rows = await this.driver.query('QUEUE PENDING ?', [ this.options.redisQueuePrefix ]); return rows.map((row) => row.id); } - public async getActiveAndToProcess(): Promise<[active: string[], toProcess: string[]]> { + public async getActiveAndToProcess(): Promise { const rows = await this.driver.query('QUEUE LIST ?', [ this.options.redisQueuePrefix ]); if (rows.length) { - const active: string[] = []; - const toProcess: string[] = []; + const active: QueryKeysTuple[] = []; + const toProcess: QueryKeysTuple[] = []; for (const row of rows) { if (row.status === 'active') { - active.push(row.id); + active.push([ + row.id, + row.queue_id ? parseInt(row.queue_id, 10) : null, + ]); } else { - toProcess.push(row.id); + toProcess.push([ + row.id, + row.queue_id ? parseInt(row.queue_id, 10) : null, + ]); } } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js index a705f23152645..eae1d814f5d75 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js @@ -33,8 +33,17 @@ export class LocalQueueDriverConnection { return stalled.concat(orphaned); } + /** + * @returns {Promise} + */ async getActiveAndToProcess() { - return [this.queueArray(this.active), this.queueArray(this.toProcess)]; + const active = this.queueArray(this.active); + const toProcess = this.queueArray(this.toProcess); + + return [ + active.map((queryKeyHash) => [queryKeyHash, null]), + toProcess.map((queryKeyHash) => [queryKeyHash, null]) + ]; } getResultPromise(resultListKey) { @@ -247,12 +256,14 @@ export class LocalQueueDriverConnection { retrieveForProcessing(queryKey, processingId) { const key = this.redisHash(queryKey); let lockAcquired = false; + if (!this.processingLocks[key]) { this.processingLocks[key] = processingId; lockAcquired = true; } else { return null; } + let added = 0; if (Object.keys(this.active).length < this.concurrency && !this.active[key]) { this.active[key] = { key, order: processingId }; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 17cac3348d12a..6dab45411a509 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -1,7 +1,7 @@ import R from 'ramda'; import { EventEmitter } from 'events'; import { getEnv, getProcessUid } from '@cubejs-backend/shared'; -import { QueueDriverInterface, QueryKey, QueryKeyHash } from '@cubejs-backend/base-driver'; +import { QueueDriverInterface, QueryKey, QueryKeyHash, QueueId } from '@cubejs-backend/base-driver'; import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver'; import { TimeoutError } from './TimeoutError'; @@ -79,9 +79,9 @@ export class QueryQueue { /** * @protected - * @type {function(string): Promise} + * @type {function(QueryKeyHash, QueueId | null): Promise} */ - this.sendProcessMessageFn = options.sendProcessMessageFn || ((queryKey) => { this.processQuery(queryKey); }); + this.sendProcessMessageFn = options.sendProcessMessageFn || ((queryKey, queryId) => { this.processQuery(queryKey, queryId); }); /** * @protected @@ -635,6 +635,7 @@ export class QueryQueue { * Execute query without adding it to the queue. * * @param {*} query + * @param {QueueId} queueId * @returns {Promise<{ result: undefined | Object, error: string | undefined }>} */ async processQuerySkipQueue(query, queueId) { @@ -705,13 +706,13 @@ export class QueryQueue { * of the logic related with the queues updates, heartbeat, etc. * * @param {QueryKeyHash} queryKeyHashed + * @param {QueueId | null} queueId Real queue id, only for Cube Store * @return {Promise<{ result: undefined | Object, error: string | undefined }>} */ - async processQuery(queryKeyHashed) { + async processQuery(queryKeyHashed, queueId) { const queueConnection = await this.queueDriver.createConnection(); let insertedCount; - let queueId; let activeKeys; let queueSize; let query; @@ -722,7 +723,14 @@ export class QueryQueue { const retrieveResult = await queueConnection.retrieveForProcessing(queryKeyHashed, processingId); if (retrieveResult) { - [insertedCount, queueId, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult; + let retrieveQueueId; + + [insertedCount, retrieveQueueId, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult; + + // Backward compatibility for old Cube Store + if (retrieveQueueId) { + queueId = retrieveQueueId; + } } const activated = activeKeys && activeKeys.indexOf(queryKeyHashed) !== -1; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.ts b/packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.ts index 442c85fee5d84..7ea8f6887fb00 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.ts @@ -87,7 +87,10 @@ export class RedisQueueDriverConnection implements QueueDriverConnectionInterfac const active = await this.getActiveQueries(); const toProcess = await this.getToProcessQueries(); - return [active, toProcess]; + return [ + active.map((queryKeyHash) => [queryKeyHash, null]), + toProcess.map((queryKeyHash) => [queryKeyHash, null]) + ]; } public async addToQueue( @@ -142,11 +145,11 @@ export class RedisQueueDriverConnection implements QueueDriverConnectionInterfac } public getToProcessQueries() { - return this.redisClient.zrangeAsync([this.toProcessRedisKey(), 0, -1]); + return this.redisClient.zrangeAsync([this.toProcessRedisKey(), 0, -1]) as Promise; } public getActiveQueries() { - return this.redisClient.zrangeAsync([this.activeRedisKey(), 0, -1]); + return this.redisClient.zrangeAsync([this.activeRedisKey(), 0, -1]) as Promise; } public async getQueryAndRemove(queryKey: QueryKeyHash): Promise<[QueryDef]> { diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index a8d64d8b10829..e4bc88efd2312 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -324,7 +324,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} test('removed before reconciled', async () => { const query: QueryKey = ['select * from', []]; const key = queue.redisHash(query); - await queue.processQuery(key); + await queue.processQuery(key, null); const result = await queue.executeInQueue('foo', key, query); expect(result).toBe('select * from bar'); });