Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PECO-930] Add parameterized query support. #162

Merged
merged 11 commits into from
Sep 6, 2023
2 changes: 1 addition & 1 deletion lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from 'events';
import thrift, { HttpHeaders } from 'thrift';

import { EventEmitter } from 'events';
nithinkdb marked this conversation as resolved.
Show resolved Hide resolved
import TCLIService from '../thrift/TCLIService';
import { TProtocolVersion } from '../thrift/TCLIService_types';
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
Expand Down
19 changes: 19 additions & 0 deletions lib/DBSQLParameter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
interface ParameterInput {
name?: string;
type?: string;
value?: any;
}

export default class DBSQLParameter {
name?: string;

type?: string;

value?: any;

public constructor({ name, type, value }: ParameterInput) {
this.name = name;
this.type = type;
this.value = value;
}
}
2 changes: 2 additions & 0 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import CloseableCollection from './utils/CloseableCollection';
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
import HiveDriverError from './errors/HiveDriverError';
import globalConfig from './globalConfig';
import convertToSparkParameters from './utils/convertToSparkParameters';

const defaultMaxRows = 100000;

Expand Down Expand Up @@ -140,6 +141,7 @@ export default class DBSQLSession implements IDBSQLSession {
...getDirectResultsOptions(options.maxRows),
...getArrowOptions(),
canDownloadResult: options.useCloudFetch ?? globalConfig.useCloudFetch,
parameters: options.parameters ? convertToSparkParameters(options.parameters) : undefined,
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand Down
2 changes: 1 addition & 1 deletion lib/connection/connections/HttpConnection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import thrift from 'thrift';
import https from 'https';
import http, { IncomingMessage } from 'http';
import thrift from 'thrift';

kravets-levko marked this conversation as resolved.
Show resolved Hide resolved
import IThriftConnection from '../contracts/IThriftConnection';
import IConnectionProvider from '../contracts/IConnectionProvider';
Expand Down
2 changes: 2 additions & 0 deletions lib/contracts/IDBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ import IOperation from './IOperation';
import Status from '../dto/Status';
import InfoValue from '../dto/InfoValue';
import { Int64 } from '../hive/Types';
import DBSQLParameter from '../DBSQLParameter';

export type ExecuteStatementOptions = {
queryTimeout?: Int64;
runAsync?: boolean;
maxRows?: number | null;
useCloudFetch?: boolean;
parameters?: DBSQLParameter[];
};

export type TypeInfoRequest = {
Expand Down
85 changes: 85 additions & 0 deletions lib/utils/convertToSparkParameters.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { TSparkParameter, TSparkParameterValue } from '../../thrift/TCLIService_types';
import DBSQLParameter from '../DBSQLParameter';
import HiveDriverError from '../errors/HiveDriverError';

function convertToDBSQLParameters(values: any[]): DBSQLParameter[] {
const params: DBSQLParameter[] = [];
for (const value of values) {
switch (typeof value) {
case 'object':
if (value === null) {
params.push(new DBSQLParameter({}));
break;
}
if (value instanceof DBSQLParameter) {
params.push(value);
break;
}
throw new HiveDriverError('Unsupported object type used for parameterized query.');
default:
params.push(new DBSQLParameter({ value }));
}
}
return params;
}
// Possible inputs to the params array:
// Naked args (will be converted to DBParameter)
function inferTypes(params: DBSQLParameter[]): void {
for (const param of params) {
if (!param.type) {
switch (typeof param.value) {
case 'undefined':
param.type = 'VOID';
break;
case 'boolean':
param.type = 'BOOLEAN';
param.value = param.value.toString();
break;
case 'number':
if (Number.isInteger(param.value)) {
param.type = 'INTEGER';
} else {
param.type = 'DOUBLE';
}
param.value = param.value.toString();
break;
case 'string':
param.type = 'STRING';
break;
default:
throw new HiveDriverError('Unsupported object type used for parameterized query.');
}
}
}
}
export default function convertToSparkParameters(values: any[]): TSparkParameter[] {
const params = convertToDBSQLParameters(values);
const retVal: TSparkParameter[] = [];
inferTypes(params);
for (const param of params) {
switch (typeof param.value) {
case 'string':
retVal.push(
new TSparkParameter({
name: param.name,
value: new TSparkParameterValue({ stringValue: param.value }),
type: param.type,
}),
);
break;
case 'undefined':
retVal.push(new TSparkParameter({ name: param.name, value: new TSparkParameterValue({}), type: param.type }));
break;
default:
// Cast to a string and then return param
retVal.push(
new TSparkParameter({
name: param.name,
value: new TSparkParameterValue({ stringValue: param.value.toString() }),
type: param.type,
}),
);
}
}
return retVal;
}
37 changes: 37 additions & 0 deletions tests/unit/parameterized_query.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const { expect } = require('chai');

const DBSQLClient = require('../../dist/DBSQLClient').default;
const convertToSparkParameters = require('../../dist/utils/convertToSparkParameters').default;
const { TSparkParameterValue, TSparkParameter } = require('../../thrift/TCLIService_types');
const { default: DBSQLParameter } = require('../../dist/DBSQLParameter');

describe('Test Inference', () => {
it('should infer types correctly', () => {
let params = convertToSparkParameters([null, 'value', 1, 1.1, true]);
expect(params[0]).to.deep.eq(new TSparkParameter({ type: 'VOID', value: new TSparkParameterValue() }));
expect(params[1]).to.deep.eq(
new TSparkParameter({ type: 'STRING', value: new TSparkParameterValue({ stringValue: 'value' }) }),
);
expect(params[2]).to.deep.eq(
new TSparkParameter({ type: 'INTEGER', value: new TSparkParameterValue({ stringValue: '1' }) }),
);
expect(params[3]).to.deep.eq(
new TSparkParameter({ type: 'DOUBLE', value: new TSparkParameterValue({ stringValue: '1.1' }) }),
);
expect(params[4]).to.deep.eq(
new TSparkParameter({ type: 'BOOLEAN', value: new TSparkParameterValue({ stringValue: 'true' }) }),
);
});
it('should preserve name info', () => {
let params = convertToSparkParameters([
new DBSQLParameter({ name: '1', value: 26 }),
new DBSQLParameter({ name: '2', value: 6.2, type: 'DECIMAL' }),
]);
expect(params[0]).to.deep.eq(
new TSparkParameter({ name: '1', type: 'INTEGER', value: new TSparkParameterValue({ stringValue: '26' }) }),
);
expect(params[1]).to.deep.eq(
new TSparkParameter({ name: '2', type: 'DECIMAL', value: new TSparkParameterValue({ stringValue: '6.2' }) }),
);
});
});
Loading