From 1adcb94043a204048e99cc3090ac0fbfb4f0eb70 Mon Sep 17 00:00:00 2001 From: Levko Kravets Date: Tue, 22 Oct 2024 22:06:21 +0300 Subject: [PATCH] [PECO-983] Support streaming query results via Node.js streams (#262) * [PECO-983] Support streaming query results via Node.js streams Signed-off-by: Levko Kravets * Add tests Signed-off-by: Levko Kravets * CR1 Signed-off-by: Levko Kravets --------- Signed-off-by: Levko Kravets --- lib/DBSQLOperation.ts | 19 ++++++++++ lib/contracts/IOperation.ts | 9 +++++ tests/e2e/iterators.test.ts | 60 ++++++++++++++++++++++++++++++ tests/unit/.stubs/OperationStub.ts | 6 +++ 4 files changed, 94 insertions(+) diff --git a/lib/DBSQLOperation.ts b/lib/DBSQLOperation.ts index e7ab4bb..634749b 100644 --- a/lib/DBSQLOperation.ts +++ b/lib/DBSQLOperation.ts @@ -1,4 +1,5 @@ import { stringify, NIL } from 'uuid'; +import { Readable } from 'node:stream'; import IOperation, { FetchOptions, FinishedOptions, @@ -7,6 +8,7 @@ import IOperation, { IteratorOptions, IOperationChunksIterator, IOperationRowsIterator, + NodeStreamOptions, } from './contracts/IOperation'; import { TGetOperationStatusResp, @@ -101,6 +103,23 @@ export default class DBSQLOperation implements IOperation { return new OperationRowsIterator(this, options); } + public toNodeStream(options?: NodeStreamOptions): Readable { + let iterable: IOperationChunksIterator | IOperationRowsIterator | undefined; + + switch (options?.mode ?? 'chunks') { + case 'chunks': + iterable = this.iterateChunks(options?.iteratorOptions); + break; + case 'rows': + iterable = this.iterateRows(options?.iteratorOptions); + break; + default: + throw new Error(`IOperation.toNodeStream: unsupported mode ${options?.mode}`); + } + + return Readable.from(iterable, options?.streamOptions); + } + public get id() { const operationId = this.operationHandle?.operationId?.guid; return operationId ? stringify(operationId) : NIL; diff --git a/lib/contracts/IOperation.ts b/lib/contracts/IOperation.ts index 35382a5..1d0bb9a 100644 --- a/lib/contracts/IOperation.ts +++ b/lib/contracts/IOperation.ts @@ -1,3 +1,4 @@ +import { Readable, ReadableOptions } from 'node:stream'; import { TGetOperationStatusResp, TTableSchema } from '../../thrift/TCLIService_types'; import Status from '../dto/Status'; @@ -35,6 +36,12 @@ export interface IOperationRowsIterator extends AsyncIterableIterator { readonly operation: IOperation; } +export interface NodeStreamOptions { + mode?: 'chunks' | 'rows'; // defaults to 'chunks' + iteratorOptions?: IteratorOptions; + streamOptions?: ReadableOptions; +} + export default interface IOperation { /** * Operation identifier @@ -86,4 +93,6 @@ export default interface IOperation { iterateChunks(options?: IteratorOptions): IOperationChunksIterator; iterateRows(options?: IteratorOptions): IOperationRowsIterator; + + toNodeStream(options?: NodeStreamOptions): Readable; } diff --git a/tests/e2e/iterators.test.ts b/tests/e2e/iterators.test.ts index aa0e475..a07cb6f 100644 --- a/tests/e2e/iterators.test.ts +++ b/tests/e2e/iterators.test.ts @@ -88,4 +88,64 @@ describe('Iterators', () => { await session.close(); } }); + + it('should get all chunks via Nodejs stream', async () => { + const session = await openSession({ arrowEnabled: false }); + // @ts-expect-error TS2339: Property context does not exist on type IDBSQLSession + sinon.spy(session.context.driver, 'fetchResults'); + try { + const expectedRowsCount = 10; + + // set `maxRows` to null to disable direct results so all the data are fetched through `driver.fetchResults` + const operation = await session.executeStatement(`SELECT * FROM range(0, ${expectedRowsCount})`, { + maxRows: null, + }); + + const expectedRows = Array.from({ length: expectedRowsCount }, (_, id) => ({ id })); + const chunkSize = 4; + const expectedChunks = arrayChunks(expectedRows, chunkSize); + + const stream = operation.toNodeStream({ + mode: 'chunks', + iteratorOptions: { maxRows: chunkSize }, + }); + + let index = 0; + for await (const chunk of stream) { + expect(chunk).to.deep.equal(expectedChunks[index]); + index += 1; + } + + expect(index).to.equal(expectedChunks.length); + } finally { + await session.close(); + } + }); + + it('should get all rows via Nodejs stream', async () => { + const session = await openSession({ arrowEnabled: false }); + // @ts-expect-error TS2339: Property context does not exist on type IDBSQLSession + sinon.spy(session.context.driver, 'fetchResults'); + try { + const expectedRowsCount = 10; + + const operation = await session.executeStatement(`SELECT * FROM range(0, ${expectedRowsCount})`); + + const expectedRows = Array.from({ length: expectedRowsCount }, (_, id) => ({ id })); + + const stream = operation.toNodeStream({ + mode: 'rows', + }); + + let index = 0; + for await (const row of stream) { + expect(row).to.deep.equal(expectedRows[index]); + index += 1; + } + + expect(index).to.equal(expectedRows.length); + } finally { + await session.close(); + } + }); }); diff --git a/tests/unit/.stubs/OperationStub.ts b/tests/unit/.stubs/OperationStub.ts index 19a9087..cd82714 100644 --- a/tests/unit/.stubs/OperationStub.ts +++ b/tests/unit/.stubs/OperationStub.ts @@ -2,9 +2,11 @@ import IOperation, { IOperationChunksIterator, IOperationRowsIterator, IteratorOptions, + NodeStreamOptions, } from '../../../lib/contracts/IOperation'; import Status from '../../../lib/dto/Status'; import { OperationChunksIterator, OperationRowsIterator } from '../../../lib/utils/OperationIterator'; +import { Readable } from 'node:stream'; export default class OperationStub implements IOperation { public readonly id: string = ''; @@ -59,4 +61,8 @@ export default class OperationStub implements IOperation { public iterateRows(options?: IteratorOptions): IOperationRowsIterator { return new OperationRowsIterator(this, options); } + + public toNodeStream(options?: NodeStreamOptions): Readable { + throw new Error('Not implemented'); + } }