Skip to content

Commit

Permalink
Merge branch 'main' into iterable-operation
Browse files Browse the repository at this point in the history
  • Loading branch information
kravets-levko committed Apr 19, 2024
2 parents 8c8d1e2 + 1fd9e1e commit 884fdf7
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 44 deletions.
9 changes: 8 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@
"@typescript-eslint/no-throw-literal": "off",
"no-restricted-syntax": "off",
"no-case-declarations": "off",
"max-classes-per-file": "off"
"max-classes-per-file": "off",
"import/no-extraneous-dependencies": [
"error",
{
"devDependencies": true,
"optionalDependencies": true
}
]
}
}
]
Expand Down
21 changes: 11 additions & 10 deletions lib/DBSQLOperation.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { stringify, NIL, parse } from 'uuid';
import { stringify, NIL } from 'uuid';
import IOperation, {
FetchOptions,
FinishedOptions,
Expand Down Expand Up @@ -92,7 +92,7 @@ export default class DBSQLOperation implements IOperation {
useOnlyPrefetchedResults,
);
this.closeOperation = directResults?.closeOperation;
this.context.getLogger().log(LogLevel.debug, `Operation created with id: ${this.getId()}`);
this.context.getLogger().log(LogLevel.debug, `Operation created with id: ${this.id}`);
}

public iterateChunks(options?: IteratorOptions): IOperationChunksIterator {
Expand All @@ -103,8 +103,9 @@ export default class DBSQLOperation implements IOperation {
return new OperationRowsIterator(this, options);
}

public getId() {
return stringify(this.operationHandle?.operationId?.guid || parse(NIL));
public get id() {
const operationId = this.operationHandle?.operationId?.guid;
return operationId ? stringify(operationId) : NIL;
}

/**
Expand All @@ -131,7 +132,7 @@ export default class DBSQLOperation implements IOperation {
const chunk = await this.fetchChunk(fetchChunkOptions);
data.push(chunk);
} while (await this.hasMoreRows()); // eslint-disable-line no-await-in-loop
this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.getId()}`);
this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.id}`);

return data.flat();
}
Expand Down Expand Up @@ -185,7 +186,7 @@ export default class DBSQLOperation implements IOperation {
.getLogger()
.log(
LogLevel.debug,
`Fetched chunk of size: ${options?.maxRows || defaultMaxRows} from operation with id: ${this.getId()}`,
`Fetched chunk of size: ${options?.maxRows || defaultMaxRows} from operation with id: ${this.id}`,
);
return result;
}
Expand All @@ -197,7 +198,7 @@ export default class DBSQLOperation implements IOperation {
*/
public async status(progress: boolean = false): Promise<TGetOperationStatusResp> {
await this.failIfClosed();
this.context.getLogger().log(LogLevel.debug, `Fetching status for operation with id: ${this.getId()}`);
this.context.getLogger().log(LogLevel.debug, `Fetching status for operation with id: ${this.id}`);

if (this.operationStatus) {
return this.operationStatus;
Expand All @@ -221,7 +222,7 @@ export default class DBSQLOperation implements IOperation {
return Status.success();
}

this.context.getLogger().log(LogLevel.debug, `Cancelling operation with id: ${this.getId()}`);
this.context.getLogger().log(LogLevel.debug, `Cancelling operation with id: ${this.id}`);

const driver = await this.context.getDriver();
const response = await driver.cancelOperation({
Expand All @@ -245,7 +246,7 @@ export default class DBSQLOperation implements IOperation {
return Status.success();
}

this.context.getLogger().log(LogLevel.debug, `Closing operation with id: ${this.getId()}`);
this.context.getLogger().log(LogLevel.debug, `Closing operation with id: ${this.id}`);

const driver = await this.context.getDriver();
const response =
Expand Down Expand Up @@ -286,7 +287,7 @@ export default class DBSQLOperation implements IOperation {

await this.waitUntilReady(options);

this.context.getLogger().log(LogLevel.debug, `Fetching schema for operation with id: ${this.getId()}`);
this.context.getLogger().log(LogLevel.debug, `Fetching schema for operation with id: ${this.id}`);
const metadata = await this.fetchMetadata();
return metadata.schema ?? null;
}
Expand Down
52 changes: 37 additions & 15 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import * as fs from 'fs';
import * as path from 'path';
import { stringify, NIL, parse } from 'uuid';
import stream from 'node:stream';
import util from 'node:util';
import { stringify, NIL } from 'uuid';
import fetch, { HeadersInit } from 'node-fetch';
import {
TSessionHandle,
Expand All @@ -27,7 +29,7 @@ import IOperation from './contracts/IOperation';
import DBSQLOperation from './DBSQLOperation';
import Status from './dto/Status';
import InfoValue from './dto/InfoValue';
import { definedOrError } from './utils';
import { definedOrError, LZ4 } from './utils';
import CloseableCollection from './utils/CloseableCollection';
import { LogLevel } from './contracts/IDBSQLLogger';
import HiveDriverError from './errors/HiveDriverError';
Expand All @@ -36,6 +38,9 @@ import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter';
import ParameterError from './errors/ParameterError';
import IClientContext, { ClientConfig } from './contracts/IClientContext';

// Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14
const pipeline = util.promisify(stream.pipeline);

const defaultMaxRows = 100000;

interface OperationResponseShape {
Expand Down Expand Up @@ -135,11 +140,12 @@ export default class DBSQLSession implements IDBSQLSession {
constructor({ handle, context }: DBSQLSessionConstructorOptions) {
this.sessionHandle = handle;
this.context = context;
this.context.getLogger().log(LogLevel.debug, `Session created with id: ${this.getId()}`);
this.context.getLogger().log(LogLevel.debug, `Session created with id: ${this.id}`);
}

public getId() {
return stringify(this.sessionHandle?.sessionId?.guid || parse(NIL));
public get id() {
const sessionId = this.sessionHandle?.sessionId?.guid;
return sessionId ? stringify(sessionId) : NIL;
}

/**
Expand Down Expand Up @@ -184,7 +190,7 @@ export default class DBSQLSession implements IDBSQLSession {
...getArrowOptions(clientConfig),
canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch,
parameters: getQueryParameters(this.sessionHandle, options.namedParameters, options.ordinalParameters),
canDecompressLZ4Result: clientConfig.useLZ4Compression,
canDecompressLZ4Result: clientConfig.useLZ4Compression && Boolean(LZ4),
});
const response = await this.handleResponse(operationPromise);
const operation = this.createOperation(response);
Expand Down Expand Up @@ -271,20 +277,25 @@ export default class DBSQLSession implements IDBSQLSession {
if (!response.ok) {
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
}
const buffer = await response.arrayBuffer();
fs.writeFileSync(localFile, Buffer.from(buffer));

const fileStream = fs.createWriteStream(localFile);
// `pipeline` will do all the dirty work for us, including error handling and closing all the streams properly
return pipeline(response.body, fileStream);
}

private async handleStagingRemove(presignedUrl: string, headers: HeadersInit): Promise<void> {
const connectionProvider = await this.context.getConnectionProvider();
const agent = await connectionProvider.getAgent();

const response = await fetch(presignedUrl, { method: 'DELETE', headers, agent });
// Looks that AWS and Azure have a different behavior of HTTP `DELETE` for non-existing files
// AWS assumes that - since file already doesn't exist - the goal is achieved, and returns HTTP 200
// Looks that AWS and Azure have a different behavior of HTTP `DELETE` for non-existing files.
// AWS assumes that - since file already doesn't exist - the goal is achieved, and returns HTTP 200.
// Azure, on the other hand, is somewhat stricter and checks if file exists before deleting it. And if
// file doesn't exist - Azure returns HTTP 404
if (!response.ok) {
// file doesn't exist - Azure returns HTTP 404.
//
// For us, it's totally okay if file didn't exist before removing. So when we get an HTTP 404 -
// just ignore it and report success. This way we can have a uniform library behavior for all clouds
if (!response.ok && response.status !== 404) {
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
}
}
Expand All @@ -301,8 +312,19 @@ export default class DBSQLSession implements IDBSQLSession {
const connectionProvider = await this.context.getConnectionProvider();
const agent = await connectionProvider.getAgent();

const data = fs.readFileSync(localFile);
const response = await fetch(presignedUrl, { method: 'PUT', headers, agent, body: data });
const fileStream = fs.createReadStream(localFile);
const fileInfo = fs.statSync(localFile, { bigint: true });

const response = await fetch(presignedUrl, {
method: 'PUT',
headers: {
...headers,
// This header is required by server
'Content-Length': fileInfo.size.toString(),
},
agent,
body: fileStream,
});
if (!response.ok) {
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
}
Expand Down Expand Up @@ -510,7 +532,7 @@ export default class DBSQLSession implements IDBSQLSession {
this.onClose?.();
this.isOpen = false;

this.context.getLogger().log(LogLevel.debug, `Session closed with id: ${this.getId()}`);
this.context.getLogger().log(LogLevel.debug, `Session closed with id: ${this.id}`);
return new Status(response.status);
}

Expand Down
5 changes: 5 additions & 0 deletions lib/contracts/IDBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ export type CrossReferenceRequest = {
};

export default interface IDBSQLSession {
/**
* Session identifier
*/
readonly id: string;

/**
* Returns general information about the data source
*
Expand Down
5 changes: 5 additions & 0 deletions lib/contracts/IOperation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ export interface IOperationRowsIterator extends AsyncIterableIterator<object> {
}

export default interface IOperation {
/**
* Operation identifier
*/
readonly id: string;

/**
* Fetch a portion of data
*/
Expand Down
9 changes: 7 additions & 2 deletions lib/result/ArrowResultHandler.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import LZ4 from 'lz4';
import { TGetResultSetMetadataResp, TRowSet } from '../../thrift/TCLIService_types';
import HiveDriverError from '../errors/HiveDriverError';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { ArrowBatch, hiveSchemaToArrowSchema } from './utils';
import { LZ4 } from '../utils';

export default class ArrowResultHandler implements IResultsProvider<ArrowBatch> {
protected readonly context: IClientContext;
Expand All @@ -24,6 +25,10 @@ export default class ArrowResultHandler implements IResultsProvider<ArrowBatch>
// so it's possible to infer Arrow schema from Hive schema ignoring `useArrowNativeTypes` option
this.arrowSchema = arrowSchema ?? hiveSchemaToArrowSchema(schema);
this.isLZ4Compressed = lz4Compressed ?? false;

if (this.isLZ4Compressed && !LZ4) {
throw new HiveDriverError('Cannot handle LZ4 compressed result: module `lz4` not installed');
}
}

public async hasMore() {
Expand All @@ -47,7 +52,7 @@ export default class ArrowResultHandler implements IResultsProvider<ArrowBatch>
let totalRowCount = 0;
rowSet?.arrowBatches?.forEach(({ batch, rowCount }) => {
if (batch) {
batches.push(this.isLZ4Compressed ? LZ4.decode(batch) : batch);
batches.push(this.isLZ4Compressed ? LZ4!.decode(batch) : batch);
totalRowCount += rowCount.toNumber(true);
}
});
Expand Down
9 changes: 7 additions & 2 deletions lib/result/CloudFetchResultHandler.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import LZ4 from 'lz4';
import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch';
import { TGetResultSetMetadataResp, TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
import HiveDriverError from '../errors/HiveDriverError';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { ArrowBatch } from './utils';
import { LZ4 } from '../utils';

export default class CloudFetchResultHandler implements IResultsProvider<ArrowBatch> {
protected readonly context: IClientContext;
Expand All @@ -24,6 +25,10 @@ export default class CloudFetchResultHandler implements IResultsProvider<ArrowBa
this.context = context;
this.source = source;
this.isLZ4Compressed = lz4Compressed ?? false;

if (this.isLZ4Compressed && !LZ4) {
throw new HiveDriverError('Cannot handle LZ4 compressed result: module `lz4` not installed');
}
}

public async hasMore() {
Expand Down Expand Up @@ -58,7 +63,7 @@ export default class CloudFetchResultHandler implements IResultsProvider<ArrowBa
}

if (this.isLZ4Compressed) {
batch.batches = batch.batches.map((buffer) => LZ4.decode(buffer));
batch.batches = batch.batches.map((buffer) => LZ4!.decode(buffer));
}
return batch;
}
Expand Down
3 changes: 2 additions & 1 deletion lib/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import definedOrError from './definedOrError';
import buildUserAgentString from './buildUserAgentString';
import formatProgress, { ProgressUpdateTransformer } from './formatProgress';
import LZ4 from './lz4';

export { definedOrError, buildUserAgentString, formatProgress, ProgressUpdateTransformer };
export { definedOrError, buildUserAgentString, formatProgress, ProgressUpdateTransformer, LZ4 };
16 changes: 16 additions & 0 deletions lib/utils/lz4.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import type LZ4Namespace from 'lz4';

type LZ4Module = typeof LZ4Namespace;

/**
 * Attempts to load the optional `lz4` dependency.
 *
 * `lz4` is a native addon, so there are two expected ways for it to be unavailable:
 * the package is not installed at all, or it is installed but its compiled binding
 * cannot be loaded by the current runtime. In both cases the library should keep working
 * without LZ4 support, so `undefined` is returned and callers check for it before use.
 *
 * @returns the loaded `lz4` module, or `undefined` if it is not available
 * @throws any other error raised while loading the module - it's re-thrown unchanged
 */
function tryLoadLZ4Module(): LZ4Module | undefined {
  try {
    return require('lz4'); // eslint-disable-line global-require
  } catch (err) {
    const errorCode = err instanceof Error && 'code' in err ? err.code : undefined;

    // Package is not installed - that's totally fine for an optional dependency
    if (errorCode === 'MODULE_NOT_FOUND') {
      return undefined;
    }

    // Package is installed, but its native binding failed to load (e.g. it was built for a different
    // Node.js version or CPU architecture). Since the dependency is optional, this should not make
    // the whole library unusable - report the problem and continue without LZ4 support
    if (errorCode === 'ERR_DLOPEN_FAILED') {
      // eslint-disable-next-line no-console
      console.warn('LZ4 module is installed but failed to load; LZ4 compression will be disabled', err);
      return undefined;
    }

    // Anything else is unexpected - do not hide it
    throw err;
  }
}

export default tryLoadLZ4Module();
Loading

0 comments on commit 884fdf7

Please sign in to comment.