Skip to content

Commit

Permalink
[PECO-618] Better handling of in-progress/subsequent action taken on …
Browse files Browse the repository at this point in the history
…closed session/operation (#129)

* Explicitly close sessions when closing connection

Signed-off-by: Levko Kravets <[email protected]>

* Close operations when closing session

Signed-off-by: Levko Kravets <[email protected]>

* Extract closable objects logic to a dedicated class

Signed-off-by: Levko Kravets <[email protected]>

* Add error code to OperationStateError

Signed-off-by: Levko Kravets <[email protected]>

* Handle closed/cancelled state in DBSQLOperation

Signed-off-by: Levko Kravets <[email protected]>

* Convert DBSQLSession to async/await syntax

Signed-off-by: Levko Kravets <[email protected]>

* Handle closed state in DBSQLSession

Signed-off-by: Levko Kravets <[email protected]>

* DBSQLOperation/CompleteOperationHelper: handle direct results close response only when operation is being closed

Signed-off-by: Levko Kravets <[email protected]>

* When all records were returned via direct results and operation was closed, it still attempted to fetch data

Signed-off-by: Levko Kravets <[email protected]>

* Fix tests (wrong test conditions, running operations on closed session)

Signed-off-by: Levko Kravets <[email protected]>

* Fix lint errors

Signed-off-by: Levko Kravets <[email protected]>

* Fix after merge conflicts

Signed-off-by: Levko Kravets <[email protected]>

* Tidy up code

Signed-off-by: Levko Kravets <[email protected]>

* Add tests

Signed-off-by: Levko Kravets <[email protected]>

* Fix/update tests

Signed-off-by: Levko Kravets <[email protected]>

---------

Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko authored Aug 17, 2023
1 parent f265368 commit c9e27f4
Show file tree
Hide file tree
Showing 10 changed files with 585 additions and 48 deletions.
11 changes: 10 additions & 1 deletion lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
import DatabricksOAuth from './connection/auth/DatabricksOAuth';
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
import DBSQLLogger from './DBSQLLogger';
import CloseableCollection from './utils/CloseableCollection';

function prependSlash(str: string): string {
if (str.length > 0 && str.charAt(0) !== '/') {
Expand Down Expand Up @@ -52,6 +53,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {

private readonly thrift = thrift;

private sessions = new CloseableCollection<DBSQLSession>();

constructor(options?: ClientOptions) {
super();
this.logger = options?.logger || new DBSQLLogger();
Expand Down Expand Up @@ -147,7 +150,11 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
});

Status.assert(response.status);
return new DBSQLSession(driver, definedOrError(response.sessionHandle), this.logger);
const session = new DBSQLSession(driver, definedOrError(response.sessionHandle), {
logger: this.logger,
});
this.sessions.add(session);
return session;
}

private async getClient() {
Expand Down Expand Up @@ -206,6 +213,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
}

public async close(): Promise<void> {
await this.sessions.closeAll();

this.client = null;
this.authProvider = null;
this.connectionOptions = null;
Expand Down
14 changes: 7 additions & 7 deletions lib/DBSQLOperation/OperationStatusHelper.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { TOperationHandle, TOperationState, TGetOperationStatusResp } from '../../thrift/TCLIService_types';
import { TGetOperationStatusResp, TOperationHandle, TOperationState } from '../../thrift/TCLIService_types';
import HiveDriver from '../hive/HiveDriver';
import Status from '../dto/Status';
import { WaitUntilReadyOptions } from '../contracts/IOperation';
import OperationStateError from '../errors/OperationStateError';
import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError';

async function delay(ms?: number): Promise<void> {
return new Promise((resolve) => {
Expand Down Expand Up @@ -92,16 +92,16 @@ export default class OperationStatusHelper {
case TOperationState.FINISHED_STATE:
return true;
case TOperationState.CANCELED_STATE:
throw new OperationStateError('The operation was canceled by a client', response);
throw new OperationStateError(OperationStateErrorCode.Canceled, response);
case TOperationState.CLOSED_STATE:
throw new OperationStateError('The operation was closed by a client', response);
throw new OperationStateError(OperationStateErrorCode.Closed, response);
case TOperationState.ERROR_STATE:
throw new OperationStateError('The operation failed due to an error', response);
throw new OperationStateError(OperationStateErrorCode.Error, response);
case TOperationState.TIMEDOUT_STATE:
throw new OperationStateError('The operation is in a timed out state', response);
throw new OperationStateError(OperationStateErrorCode.Timeout, response);
case TOperationState.UKNOWN_STATE:
default:
throw new OperationStateError('The operation is in an unrecognized state', response);
throw new OperationStateError(OperationStateErrorCode.Unknown, response);
}
}

Expand Down
75 changes: 67 additions & 8 deletions lib/DBSQLOperation/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { stringify, NIL, parse } from 'uuid';
import IOperation, { FetchOptions, GetSchemaOptions, FinishedOptions } from '../contracts/IOperation';
import IOperation, {
FetchOptions,
FinishedOptions,
GetSchemaOptions,
WaitUntilReadyOptions,
} from '../contracts/IOperation';
import HiveDriver from '../hive/HiveDriver';
import {
TGetOperationStatusResp,
Expand All @@ -8,22 +13,28 @@ import {
TSparkDirectResults,
} from '../../thrift/TCLIService_types';
import Status from '../dto/Status';

import OperationStatusHelper from './OperationStatusHelper';
import SchemaHelper from './SchemaHelper';
import FetchResultsHelper from './FetchResultsHelper';
import CompleteOperationHelper from './CompleteOperationHelper';
import IDBSQLLogger, { LogLevel } from '../contracts/IDBSQLLogger';
import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError';

const defaultMaxRows = 100000;

interface DBSQLOperationConstructorOptions {
logger: IDBSQLLogger;
}

export default class DBSQLOperation implements IOperation {
private readonly driver: HiveDriver;

private readonly operationHandle: TOperationHandle;

private readonly logger: IDBSQLLogger;

public onClose?: () => void;

private readonly _status: OperationStatusHelper;

private readonly _schema: SchemaHelper;
Expand All @@ -35,7 +46,7 @@ export default class DBSQLOperation implements IOperation {
constructor(
driver: HiveDriver,
operationHandle: TOperationHandle,
logger: IDBSQLLogger,
{ logger }: DBSQLOperationConstructorOptions,
directResults?: TSparkDirectResults,
) {
this.driver = driver;
Expand Down Expand Up @@ -95,17 +106,21 @@ export default class DBSQLOperation implements IOperation {
* const result = await queryOperation.fetchChunk({maxRows: 1000});
*/
public async fetchChunk(options?: FetchOptions): Promise<Array<object>> {
await this.failIfClosed();

if (!this._status.hasResultSet) {
return [];
}

await this._status.waitUntilReady(options);
await this.waitUntilReady(options);

const [resultHandler, data] = await Promise.all([
this._schema.getResultHandler(),
this._data.fetch(options?.maxRows || defaultMaxRows),
]);

await this.failIfClosed();

const result = await resultHandler.getValue(data ? [data] : []);
this.logger?.log(
LogLevel.debug,
Expand All @@ -120,6 +135,7 @@ export default class DBSQLOperation implements IOperation {
* @throws {StatusError}
*/
public async status(progress: boolean = false): Promise<TGetOperationStatusResp> {
await this.failIfClosed();
this.logger?.log(LogLevel.debug, `Fetching status for operation with id: ${this.getId()}`);
return this._status.status(progress);
}
Expand All @@ -129,21 +145,37 @@ export default class DBSQLOperation implements IOperation {
* @throws {StatusError}
*/
public async cancel(): Promise<Status> {
if (this._completeOperation.closed || this._completeOperation.cancelled) {
return Status.success();
}

this.logger?.log(LogLevel.debug, `Cancelling operation with id: ${this.getId()}`);
return this._completeOperation.cancel();
const result = this._completeOperation.cancel();

// Cancelled operation becomes unusable, similarly to being closed
this.onClose?.();
return result;
}

/**
* Closes operation
* @throws {StatusError}
*/
public async close(): Promise<Status> {
if (this._completeOperation.closed || this._completeOperation.cancelled) {
return Status.success();
}

this.logger?.log(LogLevel.debug, `Closing operation with id: ${this.getId()}`);
return this._completeOperation.close();
const result = await this._completeOperation.close();

this.onClose?.();
return result;
}

public async finished(options?: FinishedOptions): Promise<void> {
await this._status.waitUntilReady(options);
await this.failIfClosed();
await this.waitUntilReady(options);
}

public async hasMoreRows(): Promise<boolean> {
Expand All @@ -163,13 +195,40 @@ export default class DBSQLOperation implements IOperation {
}

public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {
await this.failIfClosed();

if (!this._status.hasResultSet) {
return null;
}

await this._status.waitUntilReady(options);
await this.waitUntilReady(options);

this.logger?.log(LogLevel.debug, `Fetching schema for operation with id: ${this.getId()}`);
return this._schema.fetch();
}

private async failIfClosed(): Promise<void> {
if (this._completeOperation.closed) {
throw new OperationStateError(OperationStateErrorCode.Closed);
}
if (this._completeOperation.cancelled) {
throw new OperationStateError(OperationStateErrorCode.Canceled);
}
}

private async waitUntilReady(options?: WaitUntilReadyOptions) {
try {
await this._status.waitUntilReady(options);
} catch (error) {
if (error instanceof OperationStateError) {
if (error.errorCode === OperationStateErrorCode.Canceled) {
this._completeOperation.cancelled = true;
}
if (error.errorCode === OperationStateErrorCode.Closed) {
this._completeOperation.closed = true;
}
}
throw error;
}
}
}
Loading

0 comments on commit c9e27f4

Please sign in to comment.