Skip to content

Commit

Permalink
[PECO-238] Initial CloudFetch implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko committed Aug 3, 2023
1 parent 6de01cb commit 1731abd
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 25 deletions.
51 changes: 39 additions & 12 deletions lib/DBSQLOperation/FetchResultsHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export default class FetchResultsHelper {

private fetchOrientation: TFetchOrientation = TFetchOrientation.FETCH_FIRST;

private prefetchedResults: TFetchResultsResp[] = [];
private pendingResults: TFetchResultsResp[] = [];

private readonly returnOnlyPrefetchedResults: boolean;

Expand All @@ -58,7 +58,7 @@ export default class FetchResultsHelper {
this.operationHandle = operationHandle;
prefetchedResults.forEach((item) => {
if (item) {
this.prefetchedResults.push(item);
this.prepareCloudFetchChunks(item);
}
});
this.returnOnlyPrefetchedResults = returnOnlyPrefetchedResults;
Expand All @@ -68,7 +68,7 @@ export default class FetchResultsHelper {
Status.assert(response.status);
this.fetchOrientation = TFetchOrientation.FETCH_NEXT;

if (this.prefetchedResults.length > 0) {
if (this.pendingResults.length > 0) {
this.hasMoreRows = true;
} else if (this.returnOnlyPrefetchedResults) {
this.hasMoreRows = false;
Expand All @@ -80,18 +80,45 @@ export default class FetchResultsHelper {
}

public async fetch(maxRows: number) {
const prefetchedResponse = this.prefetchedResults.shift();
if (prefetchedResponse) {
return this.processFetchResponse(prefetchedResponse);
if (this.pendingResults.length == 0) {
const results = await this.driver.fetchResults({
operationHandle: this.operationHandle,
orientation: this.fetchOrientation,
maxRows: new Int64(maxRows),
fetchType: FetchType.Data,
});

this.prepareCloudFetchChunks(results);
}

const response = await this.driver.fetchResults({
operationHandle: this.operationHandle,
orientation: this.fetchOrientation,
maxRows: new Int64(maxRows),
fetchType: FetchType.Data,
});
const response = this.pendingResults.shift();
// This check is rather for safety and to make TS happy. In practice, such a case should not happen
if (!response) {
throw new Error('Unexpected error: no more data');
}

return this.processFetchResponse(response);
}

private prepareCloudFetchChunks(response: TFetchResultsResp) {
// TODO: Make it configurable. Effectively, this is a concurrent downloads limit for an operation
const maxLinkCount = 1;

if (response.results && response.results.resultLinks && response.results.resultLinks.length > 0) {
const allLinks = [...response.results.resultLinks];
while (allLinks.length > 0) {
// Shallow clone the original response object, but rewrite cloud fetch links array
// to contain the only entry
const responseFragment = {
...response,
results: {
...response.results,
resultLinks: allLinks.splice(0, maxLinkCount),
},
};

this.pendingResults.push(responseFragment);
}
}
}
}
31 changes: 24 additions & 7 deletions lib/DBSQLOperation/SchemaHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import Status from '../dto/Status';
import IOperationResult from '../result/IOperationResult';
import JsonResult from '../result/JsonResult';
import ArrowResult from '../result/ArrowResult';
import CloudFetchResult from '../result/CloudFetchResult';
import HiveDriverError from '../errors/HiveDriverError';
import { definedOrError } from '../utils';

Expand All @@ -14,6 +15,8 @@ export default class SchemaHelper {

private metadata?: TGetResultSetMetadataResp;

private resultHandler?: IOperationResult;

constructor(driver: HiveDriver, operationHandle: TOperationHandle, metadata?: TGetResultSetMetadataResp) {
this.driver = driver;
this.operationHandle = operationHandle;
Expand Down Expand Up @@ -41,13 +44,27 @@ export default class SchemaHelper {
const metadata = await this.fetchMetadata();
const resultFormat = definedOrError(metadata.resultFormat);

switch (resultFormat) {
case TSparkRowSetType.COLUMN_BASED_SET:
return new JsonResult(metadata.schema);
case TSparkRowSetType.ARROW_BASED_SET:
return new ArrowResult(metadata.schema, metadata.arrowSchema);
default:
throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`);
if (!this.resultHandler) {
switch (resultFormat) {
case TSparkRowSetType.COLUMN_BASED_SET:
this.resultHandler = new JsonResult(metadata.schema);
break;
case TSparkRowSetType.ARROW_BASED_SET:
this.resultHandler = new ArrowResult(metadata.schema, metadata.arrowSchema);
break;
case TSparkRowSetType.URL_BASED_SET:
this.resultHandler = new CloudFetchResult(metadata.schema, metadata.arrowSchema);
break;
default:
this.resultHandler = undefined;
break;
}
}

if (!this.resultHandler) {
throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`);
}

return this.resultHandler;
}
}
2 changes: 1 addition & 1 deletion lib/DBSQLOperation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export default class DBSQLOperation implements IOperation {
this._data.fetch(options?.maxRows || defaultMaxRows),
]);

const result = resultHandler.getValue(data ? [data] : []);
const result = await resultHandler.getValue(data ? [data] : []);
this.logger?.log(
LogLevel.debug,
`Fetched chunk of size: ${options?.maxRows || defaultMaxRows} from operation with id: ${this.getId()}`,
Expand Down
1 change: 1 addition & 0 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export default class DBSQLSession implements IDBSQLSession {
runAsync: options.runAsync || false,
...getDirectResultsOptions(options.maxRows),
...getArrowOptions(),
canDownloadResult: options.useCloudFetch ?? false,
});

return this.createOperation(response);
Expand Down
1 change: 1 addition & 0 deletions lib/contracts/IDBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export type ExecuteStatementOptions = {
queryTimeout?: Int64;
runAsync?: boolean;
maxRows?: number | null;
useCloudFetch?: boolean;
};

export type TypeInfoRequest = {
Expand Down
10 changes: 7 additions & 3 deletions lib/result/ArrowResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ export default class ArrowResult implements IOperationResult {
this.arrowSchema = arrowSchema;
}

getValue(data?: Array<TRowSet>) {
async getValue(data?: Array<TRowSet>) {
if (this.schema.length === 0 || !this.arrowSchema || !data) {
return [];
}

const batches = this.getBatches(data);
const batches = await this.getBatches(data);
return this.batchesToRows(batches);
}

protected batchesToRows(batches: Array<Buffer>) {
if (batches.length === 0) {
return [];
}
Expand All @@ -44,7 +48,7 @@ export default class ArrowResult implements IOperationResult {
return this.getRows(table.schema, table.toArray());
}

private getBatches(data: Array<TRowSet>): Array<Buffer> {
protected async getBatches(data: Array<TRowSet>): Promise<Array<Buffer>> {
const result: Array<Buffer> = [];

data.forEach((rowSet) => {
Expand Down
43 changes: 43 additions & 0 deletions lib/result/CloudFetchResult.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { Buffer } from 'buffer';
import fetch from 'node-fetch';
import { TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
import ArrowResult from './ArrowResult';

export default class CloudFetchResult extends ArrowResult {
protected batchesToRows(batches: Array<Buffer>) {
if (batches.length === 1) {
return super.batchesToRows(batches);
}

const results: Array<Array<any>> = [];

for (const batch of batches) {
results.push(super.batchesToRows([batch]));
}

return results.flat(1);
}

protected async getBatches(data: Array<TRowSet>): Promise<Array<Buffer>> {
const tasks: Array<Promise<Buffer>> = [];

data?.forEach((item) => {
item.resultLinks?.forEach((link) => {
tasks.push(this.downloadLink(link));
});
});

return await Promise.all(tasks);
}

private async downloadLink(link: TSparkArrowResultLink): Promise<Buffer> {
// TODO: Process expired links
const response = await fetch(link.fileLink);
if (!response.ok) {
throw new Error(`CloudFetch HTTP error ${response.status} ${response.statusText}`);
}

const result = await response.arrayBuffer();
return Buffer.from(result);
}
}
2 changes: 1 addition & 1 deletion lib/result/IOperationResult.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { TRowSet } from '../../thrift/TCLIService_types';

export default interface IOperationResult {
getValue(data?: Array<TRowSet>): any;
getValue(data?: Array<TRowSet>): Promise<any>;
}
2 changes: 1 addition & 1 deletion lib/result/JsonResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export default class JsonResult implements IOperationResult {
this.schema = getSchemaColumns(schema);
}

getValue(data?: Array<TRowSet>): Array<object> {
async getValue(data?: Array<TRowSet>): Promise<Array<object>> {
if (this.schema.length === 0 || !data) {
return [];
}
Expand Down
Loading

0 comments on commit 1731abd

Please sign in to comment.