Skip to content

Commit

Permalink
Merge branch 'main' into PECO-618-handle-closed-objects
Browse files Browse the repository at this point in the history
  • Loading branch information
kravets-levko committed Aug 16, 2023
2 parents c48e9bc + f265368 commit 0f763e7
Show file tree
Hide file tree
Showing 56 changed files with 2,919 additions and 639 deletions.
3 changes: 2 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"no-bitwise": "off",
"@typescript-eslint/no-throw-literal": "off",
"no-restricted-syntax": "off",
"no-case-declarations": "off"
"no-case-declarations": "off",
"max-classes-per-file": "off"
}
}
]
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
E2E_TABLE_SUFFIX: ${{github.sha}}
run: |
npm ci
npm run e2e
NODE_OPTIONS="--max-old-space-size=4096" npm run e2e
- name: Coverage
uses: codecov/codecov-action@v3
with:
Expand Down
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Release History

## 1.x (Unreleased)
## 1.3.0

- Implemented automatic retry for some HTTP errors (429, 503) (databricks/databricks-sql-nodejs#127)
- Implemented request timeout + added option to configure it (databricks/databricks-sql-nodejs#148)
- Added OAuth (U2M) support for AWS and Azure (databricks/databricks-sql-nodejs#147 and databricks/databricks-sql-nodejs#154)
- Fixed bug: for Arrow results, `null` values were ignored (@ivan-parada databricks/databricks-sql-nodejs#151)

## 1.2.1

Expand Down
186 changes: 110 additions & 76 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
import thrift from 'thrift';
import thrift, { HttpHeaders } from 'thrift';

import { EventEmitter } from 'events';
import TCLIService from '../thrift/TCLIService';
import { TProtocolVersion } from '../thrift/TCLIService_types';
import IDBSQLClient, { ConnectionOptions, OpenSessionRequest, ClientOptions } from './contracts/IDBSQLClient';
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
import HiveDriver from './hive/HiveDriver';
import { Int64 } from './hive/Types';
import DBSQLSession from './DBSQLSession';
import IDBSQLSession from './contracts/IDBSQLSession';
import IThriftConnection from './connection/contracts/IThriftConnection';
import IConnectionProvider from './connection/contracts/IConnectionProvider';
import IAuthentication from './connection/contracts/IAuthentication';
import NoSaslAuthentication from './connection/auth/NoSaslAuthentication';
import HttpConnection from './connection/connections/HttpConnection';
import IConnectionOptions from './connection/contracts/IConnectionOptions';
import Status from './dto/Status';
import HiveDriverError from './errors/HiveDriverError';
import { buildUserAgentString, definedOrError } from './utils';
import { areHeadersEqual, buildUserAgentString, definedOrError } from './utils';
import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
import DatabricksOAuth from './connection/auth/DatabricksOAuth';
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
import DBSQLLogger from './DBSQLLogger';
import CloseableCollection from './utils/CloseableCollection';
Expand All @@ -43,13 +41,13 @@ function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) {
}

export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
private client: TCLIService.Client | null;
private client: TCLIService.Client | null = null;

private connection: IThriftConnection | null;
private authProvider: IAuthentication | null = null;

private connectionProvider: IConnectionProvider;
private connectionOptions: ConnectionOptions | null = null;

private authProvider: IAuthentication;
private additionalHeaders: HttpHeaders = {};

private readonly logger: IDBSQLLogger;

Expand All @@ -59,78 +57,78 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {

constructor(options?: ClientOptions) {
super();
this.connectionProvider = new HttpConnection();
this.authProvider = new NoSaslAuthentication();
this.logger = options?.logger || new DBSQLLogger();
this.client = null;
this.connection = null;
this.logger.log(LogLevel.info, 'Created DBSQLClient');
}

private getConnectionOptions(options: ConnectionOptions): IConnectionOptions {
const { host, port, path, token, clientId, ...otherOptions } = options;
private getConnectionOptions(options: ConnectionOptions, headers: HttpHeaders): IConnectionOptions {
const {
host,
port,
path,
clientId,
authType,
// @ts-expect-error TS2339: Property 'token' does not exist on type 'ConnectionOptions'
token,
// @ts-expect-error TS2339: Property 'persistence' does not exist on type 'ConnectionOptions'
persistence,
// @ts-expect-error TS2339: Property 'provider' does not exist on type 'ConnectionOptions'
provider,
...otherOptions
} = options;

return {
host,
port: port || 443,
options: {
path: prependSlash(path),
https: true,
...otherOptions,
headers: {
...headers,
'User-Agent': buildUserAgentString(options.clientId),
},
},
};
}

/**
 * Resolves the authentication provider to use for this connection.
 *
 * Precedence: an explicitly passed `authProvider` (the deprecated second argument
 * of `connect()`) wins; otherwise the provider is selected from `options.authType`.
 *
 * @param options - Connection options; `authType` selects the auth strategy.
 * @param authProvider - Optional pre-built provider (deprecated path); returned as-is when given.
 * @returns The authentication provider that will supply auth headers for requests.
 */
private getAuthProvider(options: ConnectionOptions, authProvider?: IAuthentication): IAuthentication {
// Deprecated explicit provider takes precedence over options.authType.
if (authProvider) {
return authProvider;
}

switch (options.authType) {
// Omitted authType falls back to access-token auth for backward compatibility.
case undefined:
case 'access-token':
return new PlainHttpAuthentication({
// Personal access token auth uses the literal username 'token' with the token as password.
username: 'token',
password: options.token,
});
case 'databricks-oauth':
return new DatabricksOAuth({
host: options.host,
logger: this.logger,
persistence: options.persistence,
azureTenantId: options.azureTenantId,
});
case 'custom':
// Caller supplies a fully custom IAuthentication implementation.
return options.provider;
// no default
// NOTE(review): presumably ConnectionOptions is a discriminated union on `authType`,
// making this switch exhaustive so no default is needed — TODO confirm; otherwise an
// unknown authType would fall through and return undefined.
}
}

/**
* Connects DBSQLClient to endpoint
* @public
* @param options - host, path, and token are required
* @param authProvider - Optional custom authentication provider
* @param authProvider - [DEPRECATED - use `authType: 'custom'`] Optional custom authentication provider
* @returns Session object that can be used to execute statements
* @example
* const session = client.connect({host, path, token});
*/
public async connect(options: ConnectionOptions, authProvider?: IAuthentication): Promise<IDBSQLClient> {
this.authProvider =
authProvider ||
new PlainHttpAuthentication({
username: 'token',
password: options.token,
headers: {
'User-Agent': buildUserAgentString(options.clientId),
},
});

this.connection = await this.connectionProvider.connect(this.getConnectionOptions(options), this.authProvider);

this.client = this.thrift.createClient(TCLIService, this.connection.getConnection());

this.connection.getConnection().on('error', (error: Error) => {
// Error.stack already contains error type and message, so log stack if available,
// otherwise fall back to just error type + message
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
try {
this.emit('error', error);
} catch (e) {
// EventEmitter will throw unhandled error when emitting 'error' event.
// Since we already logged it a few lines above, just suppress this behaviour
}
});

this.connection.getConnection().on('reconnecting', (params: { delay: number; attempt: number }) => {
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`);
this.emit('reconnecting', params);
});

this.connection.getConnection().on('close', () => {
this.logger.log(LogLevel.debug, 'Closing connection.');
this.emit('close');
});

this.connection.getConnection().on('timeout', () => {
this.logger.log(LogLevel.debug, 'Connection timed out.');
this.emit('timeout');
});

this.authProvider = this.getAuthProvider(options, authProvider);
this.connectionOptions = options;
return this;
}

Expand All @@ -144,11 +142,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
* const session = await client.openSession();
*/
public async openSession(request: OpenSessionRequest = {}): Promise<IDBSQLSession> {
if (!this.connection?.isConnected()) {
throw new HiveDriverError('DBSQLClient: connection is lost');
}

const driver = new HiveDriver(this.getClient());
const driver = new HiveDriver(() => this.getClient());

const response = await driver.openSession({
client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6),
Expand All @@ -163,26 +157,66 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
return session;
}

public getClient() {
if (!this.client) {
throw new HiveDriverError('DBSQLClient: client is not initialized');
private async getClient() {
if (!this.connectionOptions || !this.authProvider) {
throw new HiveDriverError('DBSQLClient: not connected');
}

const authHeaders = await this.authProvider.authenticate();
// When auth headers change - recreate client. Thrift library does not provide API for updating
// changed options, therefore we have to recreate both connection and client to apply new headers
if (!this.client || !areHeadersEqual(this.additionalHeaders, authHeaders)) {
this.logger.log(LogLevel.info, 'DBSQLClient: initializing thrift client');
this.additionalHeaders = authHeaders;
const connectionOptions = this.getConnectionOptions(this.connectionOptions, this.additionalHeaders);

const connection = await this.createConnection(connectionOptions);
this.client = this.thrift.createClient(TCLIService, connection.getConnection());
}

return this.client;
}

public async close(): Promise<void> {
if (!this.connection) {
return;
}
private async createConnection(options: IConnectionOptions) {
const connectionProvider = new HttpConnection();
const connection = await connectionProvider.connect(options);
const thriftConnection = connection.getConnection();

await this.sessions.closeAll();
thriftConnection.on('error', (error: Error) => {
// Error.stack already contains error type and message, so log stack if available,
// otherwise fall back to just error type + message
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
try {
this.emit('error', error);
} catch (e) {
// EventEmitter will throw unhandled error when emitting 'error' event.
// Since we already logged it a few lines above, just suppress this behaviour
}
});

const thriftConnection = this.connection.getConnection();
thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => {
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`);
this.emit('reconnecting', params);
});

if (typeof thriftConnection.end === 'function') {
this.connection.getConnection().end();
}
this.connection = null;
thriftConnection.on('close', () => {
this.logger.log(LogLevel.debug, 'Closing connection.');
this.emit('close');
});

thriftConnection.on('timeout', () => {
this.logger.log(LogLevel.debug, 'Connection timed out.');
this.emit('timeout');
});

return connection;
}

public async close(): Promise<void> {
await this.sessions.closeAll();

this.client = null;
this.authProvider = null;
this.connectionOptions = null;
}
}
31 changes: 24 additions & 7 deletions lib/DBSQLOperation/SchemaHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import Status from '../dto/Status';
import IOperationResult from '../result/IOperationResult';
import JsonResult from '../result/JsonResult';
import ArrowResult from '../result/ArrowResult';
import CloudFetchResult from '../result/CloudFetchResult';
import HiveDriverError from '../errors/HiveDriverError';
import { definedOrError } from '../utils';

Expand All @@ -14,6 +15,8 @@ export default class SchemaHelper {

private metadata?: TGetResultSetMetadataResp;

private resultHandler?: IOperationResult;

constructor(driver: HiveDriver, operationHandle: TOperationHandle, metadata?: TGetResultSetMetadataResp) {
this.driver = driver;
this.operationHandle = operationHandle;
Expand Down Expand Up @@ -41,13 +44,27 @@ export default class SchemaHelper {
const metadata = await this.fetchMetadata();
const resultFormat = definedOrError(metadata.resultFormat);

switch (resultFormat) {
case TSparkRowSetType.COLUMN_BASED_SET:
return new JsonResult(metadata.schema);
case TSparkRowSetType.ARROW_BASED_SET:
return new ArrowResult(metadata.schema, metadata.arrowSchema);
default:
throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`);
if (!this.resultHandler) {
switch (resultFormat) {
case TSparkRowSetType.COLUMN_BASED_SET:
this.resultHandler = new JsonResult(metadata.schema);
break;
case TSparkRowSetType.ARROW_BASED_SET:
this.resultHandler = new ArrowResult(metadata.schema, metadata.arrowSchema);
break;
case TSparkRowSetType.URL_BASED_SET:
this.resultHandler = new CloudFetchResult(metadata.schema);
break;
default:
this.resultHandler = undefined;
break;
}
}

if (!this.resultHandler) {
throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`);
}

return this.resultHandler;
}
}
13 changes: 11 additions & 2 deletions lib/DBSQLOperation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export default class DBSQLOperation implements IOperation {

await this.failIfClosed();

const result = resultHandler.getValue(data ? [data] : []);
const result = await resultHandler.getValue(data ? [data] : []);
this.logger?.log(
LogLevel.debug,
`Fetched chunk of size: ${options?.maxRows || defaultMaxRows} from operation with id: ${this.getId()}`,
Expand Down Expand Up @@ -179,10 +179,19 @@ export default class DBSQLOperation implements IOperation {
}

public async hasMoreRows(): Promise<boolean> {
// If operation is closed or cancelled - we should not try to get data from it
if (this._completeOperation.closed || this._completeOperation.cancelled) {
return false;
}
return this._data.hasMoreRows;

// Return early if there are still data available for fetching
if (this._data.hasMoreRows) {
return true;
}

// If we fetched all the data from server - check if there's anything buffered in result handler
const resultHandler = await this._schema.getResultHandler();
return resultHandler.hasPendingData();
}

public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {
Expand Down
1 change: 1 addition & 0 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ export default class DBSQLSession implements IDBSQLSession {
runAsync: options.runAsync || false,
...getDirectResultsOptions(options.maxRows),
...getArrowOptions(),
canDownloadResult: options.useCloudFetch ?? globalConfig.useCloudFetch,
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand Down
Loading

0 comments on commit 0f763e7

Please sign in to comment.