diff --git a/.eslintrc b/.eslintrc index 88b52f9b..33499af2 100644 --- a/.eslintrc +++ b/.eslintrc @@ -15,7 +15,14 @@ "@typescript-eslint/no-throw-literal": "off", "no-restricted-syntax": "off", "no-case-declarations": "off", - "max-classes-per-file": "off" + "max-classes-per-file": "off", + "import/no-extraneous-dependencies": [ + "error", + { + "devDependencies": true, + "optionalDependencies": true + } + ] } } ] diff --git a/lib/DBSQLOperation.ts b/lib/DBSQLOperation.ts index 2bdcb724..305e4c81 100644 --- a/lib/DBSQLOperation.ts +++ b/lib/DBSQLOperation.ts @@ -1,4 +1,4 @@ -import { stringify, NIL, parse } from 'uuid'; +import { stringify, NIL } from 'uuid'; import IOperation, { FetchOptions, FinishedOptions, @@ -92,7 +92,7 @@ export default class DBSQLOperation implements IOperation { useOnlyPrefetchedResults, ); this.closeOperation = directResults?.closeOperation; - this.context.getLogger().log(LogLevel.debug, `Operation created with id: ${this.getId()}`); + this.context.getLogger().log(LogLevel.debug, `Operation created with id: ${this.id}`); } public iterateChunks(options?: IteratorOptions): IOperationChunksIterator { @@ -103,8 +103,9 @@ export default class DBSQLOperation implements IOperation { return new OperationRowsIterator(this, options); } - public getId() { - return stringify(this.operationHandle?.operationId?.guid || parse(NIL)); + public get id() { + const operationId = this.operationHandle?.operationId?.guid; + return operationId ? stringify(operationId) : NIL; } /** @@ -131,7 +132,7 @@ export default class DBSQLOperation implements IOperation { const chunk = await this.fetchChunk(fetchChunkOptions); data.push(chunk); } while (await this.hasMoreRows()); // eslint-disable-line no-await-in-loop - this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.getId()}`); + this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.id}`); return data.flat(); } @@ -185,7 +186,7 @@ export default class DBSQLOperation implements IOperation { .getLogger() .log( LogLevel.debug, - `Fetched chunk of size: ${options?.maxRows || defaultMaxRows} from operation with id: ${this.getId()}`, + `Fetched chunk of size: ${options?.maxRows || defaultMaxRows} from operation with id: ${this.id}`, ); return result; } @@ -197,7 +198,7 @@ export default class DBSQLOperation implements IOperation { */ public async status(progress: boolean = false): Promise { await this.failIfClosed(); - this.context.getLogger().log(LogLevel.debug, `Fetching status for operation with id: ${this.getId()}`); + this.context.getLogger().log(LogLevel.debug, `Fetching status for operation with id: ${this.id}`); if (this.operationStatus) { return this.operationStatus; @@ -221,7 +222,7 @@ export default class DBSQLOperation implements IOperation { return Status.success(); } - this.context.getLogger().log(LogLevel.debug, `Cancelling operation with id: ${this.getId()}`); + this.context.getLogger().log(LogLevel.debug, `Cancelling operation with id: ${this.id}`); const driver = await this.context.getDriver(); const response = await driver.cancelOperation({ @@ -245,7 +246,7 @@ export default class DBSQLOperation implements IOperation { return Status.success(); } - this.context.getLogger().log(LogLevel.debug, `Closing operation with id: ${this.getId()}`); + this.context.getLogger().log(LogLevel.debug, `Closing operation with id: ${this.id}`); const driver = await this.context.getDriver(); const response = @@ -286,7 +287,7 @@ export default class DBSQLOperation implements IOperation { await this.waitUntilReady(options); - this.context.getLogger().log(LogLevel.debug, `Fetching schema for operation with id: ${this.getId()}`); + this.context.getLogger().log(LogLevel.debug, `Fetching schema for operation with id: ${this.id}`); const metadata = await this.fetchMetadata(); return metadata.schema ?? null; } diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts index 9863bc0c..f49e9651 100644 --- a/lib/DBSQLSession.ts +++ b/lib/DBSQLSession.ts @@ -1,6 +1,8 @@ import * as fs from 'fs'; import * as path from 'path'; -import { stringify, NIL, parse } from 'uuid'; +import stream from 'node:stream'; +import util from 'node:util'; +import { stringify, NIL } from 'uuid'; import fetch, { HeadersInit } from 'node-fetch'; import { TSessionHandle, @@ -27,7 +29,7 @@ import IOperation from './contracts/IOperation'; import DBSQLOperation from './DBSQLOperation'; import Status from './dto/Status'; import InfoValue from './dto/InfoValue'; -import { definedOrError } from './utils'; +import { definedOrError, LZ4 } from './utils'; import CloseableCollection from './utils/CloseableCollection'; import { LogLevel } from './contracts/IDBSQLLogger'; import HiveDriverError from './errors/HiveDriverError'; @@ -36,6 +38,9 @@ import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter'; import ParameterError from './errors/ParameterError'; import IClientContext, { ClientConfig } from './contracts/IClientContext'; +// Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14 +const pipeline = util.promisify(stream.pipeline); + const defaultMaxRows = 100000; interface OperationResponseShape { @@ -135,11 +140,12 @@ export default class DBSQLSession implements IDBSQLSession { constructor({ handle, context }: DBSQLSessionConstructorOptions) { this.sessionHandle = handle; this.context = context; - this.context.getLogger().log(LogLevel.debug, `Session created with id: ${this.getId()}`); + this.context.getLogger().log(LogLevel.debug, `Session created with id: ${this.id}`); } - public getId() { - return stringify(this.sessionHandle?.sessionId?.guid || parse(NIL)); + public get id() { + const sessionId = this.sessionHandle?.sessionId?.guid; + return sessionId ? stringify(sessionId) : NIL; } /** @@ -184,7 +190,7 @@ export default class DBSQLSession implements IDBSQLSession { ...getArrowOptions(clientConfig), canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch, parameters: getQueryParameters(this.sessionHandle, options.namedParameters, options.ordinalParameters), - canDecompressLZ4Result: clientConfig.useLZ4Compression, + canDecompressLZ4Result: clientConfig.useLZ4Compression && Boolean(LZ4), }); const response = await this.handleResponse(operationPromise); const operation = this.createOperation(response); @@ -271,8 +277,10 @@ export default class DBSQLSession implements IDBSQLSession { if (!response.ok) { throw new StagingError(`HTTP error ${response.status} ${response.statusText}`); } - const buffer = await response.arrayBuffer(); - fs.writeFileSync(localFile, Buffer.from(buffer)); + + const fileStream = fs.createWriteStream(localFile); + // `pipeline` will do all the dirty job for us, including error handling and closing all the streams properly + return pipeline(response.body, fileStream); } private async handleStagingRemove(presignedUrl: string, headers: HeadersInit): Promise { @@ -280,11 +288,14 @@ export default class DBSQLSession implements IDBSQLSession { const agent = await connectionProvider.getAgent(); const response = await fetch(presignedUrl, { method: 'DELETE', headers, agent }); - // Looks that AWS and Azure have a different behavior of HTTP `DELETE` for non-existing files - // AWS assumes that - since file already doesn't exist - the goal is achieved, and returns HTTP 200 + // Looks that AWS and Azure have a different behavior of HTTP `DELETE` for non-existing files. + // AWS assumes that - since file already doesn't exist - the goal is achieved, and returns HTTP 200. // Azure, on the other hand, is somewhat stricter and check if file exists before deleting it. And if - // file doesn't exist - Azure returns HTTP 404 - if (!response.ok) { + // file doesn't exist - Azure returns HTTP 404. + // + // For us, it's totally okay if file didn't exist before removing. So when we get an HTTP 404 - + // just ignore it and report success. This way we can have a uniform library behavior for all clouds + if (!response.ok && response.status !== 404) { throw new StagingError(`HTTP error ${response.status} ${response.statusText}`); } } @@ -301,8 +312,19 @@ export default class DBSQLSession implements IDBSQLSession { const connectionProvider = await this.context.getConnectionProvider(); const agent = await connectionProvider.getAgent(); - const data = fs.readFileSync(localFile); - const response = await fetch(presignedUrl, { method: 'PUT', headers, agent, body: data }); + const fileStream = fs.createReadStream(localFile); + const fileInfo = fs.statSync(localFile, { bigint: true }); + + const response = await fetch(presignedUrl, { + method: 'PUT', + headers: { + ...headers, + // This header is required by server + 'Content-Length': fileInfo.size.toString(), + }, + agent, + body: fileStream, + }); if (!response.ok) { throw new StagingError(`HTTP error ${response.status} ${response.statusText}`); } @@ -510,7 +532,7 @@ export default class DBSQLSession implements IDBSQLSession { this.onClose?.(); this.isOpen = false; - this.context.getLogger().log(LogLevel.debug, `Session closed with id: ${this.getId()}`); + this.context.getLogger().log(LogLevel.debug, `Session closed with id: ${this.id}`); return new Status(response.status); } diff --git a/lib/contracts/IDBSQLSession.ts b/lib/contracts/IDBSQLSession.ts index 25cf251a..ab5509ef 100644 --- a/lib/contracts/IDBSQLSession.ts +++ b/lib/contracts/IDBSQLSession.ts @@ -112,6 +112,11 @@ export type CrossReferenceRequest = { }; export default interface IDBSQLSession { + /** + * Session identifier + */ + readonly id: string; + /** * Returns general information about the data source * diff --git a/lib/contracts/IOperation.ts b/lib/contracts/IOperation.ts index a8659efb..35382a5e 100644 --- a/lib/contracts/IOperation.ts +++ b/lib/contracts/IOperation.ts @@ -36,6 +36,11 @@ export interface IOperationRowsIterator extends AsyncIterableIterator { } export default interface IOperation { + /** + * Operation identifier + */ + readonly id: string; + /** * Fetch a portion of data */ diff --git a/lib/result/ArrowResultHandler.ts b/lib/result/ArrowResultHandler.ts index 601432e8..108f3365 100644 --- a/lib/result/ArrowResultHandler.ts +++ b/lib/result/ArrowResultHandler.ts @@ -1,8 +1,9 @@ -import LZ4 from 'lz4'; import { TGetResultSetMetadataResp, TRowSet } from '../../thrift/TCLIService_types'; +import HiveDriverError from '../errors/HiveDriverError'; import IClientContext from '../contracts/IClientContext'; import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider'; import { ArrowBatch, hiveSchemaToArrowSchema } from './utils'; +import { LZ4 } from '../utils'; export default class ArrowResultHandler implements IResultsProvider { protected readonly context: IClientContext; @@ -24,6 +25,10 @@ export default class ArrowResultHandler implements IResultsProvider // so it's possible to infer Arrow schema from Hive schema ignoring `useArrowNativeTypes` option this.arrowSchema = arrowSchema ?? hiveSchemaToArrowSchema(schema); this.isLZ4Compressed = lz4Compressed ?? false; + + if (this.isLZ4Compressed && !LZ4) { + throw new HiveDriverError('Cannot handle LZ4 compressed result: module `lz4` not installed'); + } } public async hasMore() { @@ -47,7 +52,7 @@ export default class ArrowResultHandler implements IResultsProvider let totalRowCount = 0; rowSet?.arrowBatches?.forEach(({ batch, rowCount }) => { if (batch) { - batches.push(this.isLZ4Compressed ? LZ4.decode(batch) : batch); + batches.push(this.isLZ4Compressed ? LZ4!.decode(batch) : batch); totalRowCount += rowCount.toNumber(true); } }); diff --git a/lib/result/CloudFetchResultHandler.ts b/lib/result/CloudFetchResultHandler.ts index 39ef6f94..c0450aef 100644 --- a/lib/result/CloudFetchResultHandler.ts +++ b/lib/result/CloudFetchResultHandler.ts @@ -1,9 +1,10 @@ -import LZ4 from 'lz4'; import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch'; import { TGetResultSetMetadataResp, TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types'; +import HiveDriverError from '../errors/HiveDriverError'; import IClientContext from '../contracts/IClientContext'; import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider'; import { ArrowBatch } from './utils'; +import { LZ4 } from '../utils'; export default class CloudFetchResultHandler implements IResultsProvider { protected readonly context: IClientContext; @@ -24,6 +25,10 @@ export default class CloudFetchResultHandler implements IResultsProvider LZ4.decode(buffer)); + batch.batches = batch.batches.map((buffer) => LZ4!.decode(buffer)); } return batch; } diff --git a/lib/utils/index.ts b/lib/utils/index.ts index 4603277a..963f6b05 100644 --- a/lib/utils/index.ts +++ b/lib/utils/index.ts @@ -1,5 +1,6 @@ import definedOrError from './definedOrError'; import buildUserAgentString from './buildUserAgentString'; import formatProgress, { ProgressUpdateTransformer } from './formatProgress'; +import LZ4 from './lz4'; -export { definedOrError, buildUserAgentString, formatProgress, ProgressUpdateTransformer }; +export { definedOrError, buildUserAgentString, formatProgress, ProgressUpdateTransformer, LZ4 }; diff --git a/lib/utils/lz4.ts b/lib/utils/lz4.ts new file mode 100644 index 00000000..8186024d --- /dev/null +++ b/lib/utils/lz4.ts @@ -0,0 +1,16 @@ +import type LZ4Namespace from 'lz4'; + +type LZ4Module = typeof LZ4Namespace; + +function tryLoadLZ4Module(): LZ4Module | undefined { + try { + return require('lz4'); // eslint-disable-line global-require + } catch (err) { + const isModuleNotFoundError = err instanceof Error && 'code' in err && err.code === 'MODULE_NOT_FOUND'; + if (!isModuleNotFoundError) { + throw err; + } + } +} + +export default tryLoadLZ4Module(); diff --git a/package-lock.json b/package-lock.json index 3259493a..ce4ee92a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,7 +11,6 @@ "dependencies": { "apache-arrow": "^13.0.0", "commander": "^9.3.0", - "lz4": "^0.6.5", "node-fetch": "^2.6.12", "node-int64": "^0.4.0", "open": "^8.4.2", @@ -48,6 +47,9 @@ }, "engines": { "node": ">=14.0.0" + }, + "optionalDependencies": { + "lz4": "^0.6.5" } }, "node_modules/@75lb/deep-merge": { @@ -1486,7 +1488,8 @@ "type": "consulting", "url": "https://feross.org/support" } - ] + ], + "optional": true }, "node_modules/basic-ftp": { "version": "5.0.3", @@ -1584,6 +1587,7 @@ "url": "https://feross.org/support" } ], + "optional": true, "dependencies": { "base64-js": "^1.3.1", "ieee754": "^1.1.13" @@ -1941,7 +1945,8 @@ "node_modules/cuint": { "version": "0.2.2", "resolved": "https://registry.npmjs.org/cuint/-/cuint-0.2.2.tgz", - "integrity": "sha512-d4ZVpCW31eWwCMe1YT3ur7mUDnTXbgwyzaL320DrcRT45rfjYxkt5QWLrmOJ+/UEAI2+fQgKe/fCjR8l4TpRgw==" + "integrity": "sha512-d4ZVpCW31eWwCMe1YT3ur7mUDnTXbgwyzaL320DrcRT45rfjYxkt5QWLrmOJ+/UEAI2+fQgKe/fCjR8l4TpRgw==", + "optional": true }, "node_modules/damerau-levenshtein": { "version": "1.0.8", @@ -3335,7 +3340,8 @@ "type": "consulting", "url": "https://feross.org/support" } - ] + ], + "optional": true }, "node_modules/ignore": { "version": "5.2.0", @@ -4093,6 +4099,7 @@ "resolved": "https://registry.npmjs.org/lz4/-/lz4-0.6.5.tgz", "integrity": "sha512-KSZcJU49QZOlJSItaeIU3p8WoAvkTmD9fJqeahQXNu1iQ/kR0/mQLdbrK8JY9MY8f6AhJoMrihp1nu1xDbscSQ==", "hasInstallScript": true, + "optional": true, "dependencies": { "buffer": "^5.2.1", "cuint": "^0.2.2", @@ -4278,7 +4285,8 @@ "node_modules/nan": { "version": "2.18.0", "resolved": "https://registry.npmjs.org/nan/-/nan-2.18.0.tgz", - "integrity": "sha512-W7tfG7vMOGtD30sHoZSSc/JVYiyDPEyQVso/Zz+/uQd0B0L46gtC+pHha5FFMRpil6fm/AoEcRWyOVi4+E/f8w==" + "integrity": "sha512-W7tfG7vMOGtD30sHoZSSc/JVYiyDPEyQVso/Zz+/uQd0B0L46gtC+pHha5FFMRpil6fm/AoEcRWyOVi4+E/f8w==", + "optional": true }, "node_modules/nanoid": { "version": "3.3.3", @@ -6106,6 +6114,7 @@ "version": "0.2.2", "resolved": "https://registry.npmjs.org/xxhashjs/-/xxhashjs-0.2.2.tgz", "integrity": "sha512-AkTuIuVTET12tpsVIQo+ZU6f/qDmKuRUcjaqR+OIvm+aCBsZ95i7UVY5WJ9TMsSaZ0DA2WxoZ4acu0sPH+OKAw==", + "optional": true, "dependencies": { "cuint": "^0.2.2" } @@ -7294,7 +7303,8 @@ "base64-js": { "version": "1.5.1", "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", - "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==" + "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", + "optional": true }, "basic-ftp": { "version": "5.0.3", @@ -7353,6 +7363,7 @@ "version": "5.7.1", "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", + "optional": true, "requires": { "base64-js": "^1.3.1", "ieee754": "^1.1.13" @@ -7630,7 +7641,8 @@ "cuint": { "version": "0.2.2", "resolved": "https://registry.npmjs.org/cuint/-/cuint-0.2.2.tgz", - "integrity": "sha512-d4ZVpCW31eWwCMe1YT3ur7mUDnTXbgwyzaL320DrcRT45rfjYxkt5QWLrmOJ+/UEAI2+fQgKe/fCjR8l4TpRgw==" + "integrity": "sha512-d4ZVpCW31eWwCMe1YT3ur7mUDnTXbgwyzaL320DrcRT45rfjYxkt5QWLrmOJ+/UEAI2+fQgKe/fCjR8l4TpRgw==", + "optional": true }, "damerau-levenshtein": { "version": "1.0.8", @@ -8644,7 +8656,8 @@ "ieee754": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", - "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==" + "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", + "optional": true }, "ignore": { "version": "5.2.0", @@ -9207,6 +9220,7 @@ "version": "0.6.5", "resolved": "https://registry.npmjs.org/lz4/-/lz4-0.6.5.tgz", "integrity": "sha512-KSZcJU49QZOlJSItaeIU3p8WoAvkTmD9fJqeahQXNu1iQ/kR0/mQLdbrK8JY9MY8f6AhJoMrihp1nu1xDbscSQ==", + "optional": true, "requires": { "buffer": "^5.2.1", "cuint": "^0.2.2", @@ -9349,7 +9363,8 @@ "nan": { "version": "2.18.0", "resolved": "https://registry.npmjs.org/nan/-/nan-2.18.0.tgz", - "integrity": "sha512-W7tfG7vMOGtD30sHoZSSc/JVYiyDPEyQVso/Zz+/uQd0B0L46gtC+pHha5FFMRpil6fm/AoEcRWyOVi4+E/f8w==" + "integrity": "sha512-W7tfG7vMOGtD30sHoZSSc/JVYiyDPEyQVso/Zz+/uQd0B0L46gtC+pHha5FFMRpil6fm/AoEcRWyOVi4+E/f8w==", + "optional": true }, "nanoid": { "version": "3.3.3", @@ -10702,6 +10717,7 @@ "version": "0.2.2", "resolved": "https://registry.npmjs.org/xxhashjs/-/xxhashjs-0.2.2.tgz", "integrity": "sha512-AkTuIuVTET12tpsVIQo+ZU6f/qDmKuRUcjaqR+OIvm+aCBsZ95i7UVY5WJ9TMsSaZ0DA2WxoZ4acu0sPH+OKAw==", + "optional": true, "requires": { "cuint": "^0.2.2" } diff --git a/package.json b/package.json index 4c1b0ec8..87e6833a 100644 --- a/package.json +++ b/package.json @@ -74,7 +74,6 @@ "dependencies": { "apache-arrow": "^13.0.0", "commander": "^9.3.0", - "lz4": "^0.6.5", "node-fetch": "^2.6.12", "node-int64": "^0.4.0", "open": "^8.4.2", @@ -83,5 +82,8 @@ "thrift": "^0.16.0", "uuid": "^9.0.0", "winston": "^3.8.2" + }, + "optionalDependencies": { + "lz4": "^0.6.5" } } diff --git a/tests/e2e/staging_ingestion.test.js b/tests/e2e/staging_ingestion.test.js index d8feb423..b9cd3fea 100644 --- a/tests/e2e/staging_ingestion.test.js +++ b/tests/e2e/staging_ingestion.test.js @@ -114,17 +114,25 @@ describe('Staging Test', () => { }); const stagingFileName = `/Volumes/${catalog}/${schema}/${volume}/${uuid.v4()}.csv`; + const localFile = path.join(localPath, `${uuid.v4()}.csv`); + // File should not exist before removing try { - await session.executeStatement(`REMOVE '${stagingFileName}'`, { stagingAllowedLocalPath: [localPath] }); - // In some cases, `REMOVE` may silently succeed for non-existing files (see comment in relevant - // part of `DBSQLSession` code). But if it fails - it has to be an HTTP 404 error + await session.executeStatement(`GET '${stagingFileName}' TO '${localFile}'`, { + stagingAllowedLocalPath: [localPath], + }); + expect.fail('It should throw HTTP 404 error'); } catch (error) { if (error instanceof StagingError) { expect(error.message).to.contain('404'); } else { throw error; } + } finally { + fs.rmSync(localFile, { force: true }); } + + // Try to remove the file - it should succeed and not throw any errors + await session.executeStatement(`REMOVE '${stagingFileName}'`, { stagingAllowedLocalPath: [localPath] }); }); });