Skip to content

Commit

Permalink
[PECO-930] Add parameterized query support. (#162)
Browse files Browse the repository at this point in the history
* Initial commit

* Fixed comments

* Reverted change

* Updated to new standard

* Revert irrelevant changes

Signed-off-by: Levko Kravets <[email protected]>

* Refactoring + improve tests

Signed-off-by: Levko Kravets <[email protected]>

* Removed void pointers

* Remove leftovers from removing VOID

Signed-off-by: Levko Kravets <[email protected]>

* Add Date/Time parameters support

Signed-off-by: Levko Kravets <[email protected]>

* Support primitive values as named parameters

Signed-off-by: Levko Kravets <[email protected]>

---------

Signed-off-by: Levko Kravets <[email protected]>
Co-authored-by: Levko Kravets <[email protected]>
  • Loading branch information
nithinkdb and kravets-levko authored Sep 6, 2023
1 parent 67bea27 commit ad10743
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 1 deletion.
65 changes: 65 additions & 0 deletions lib/DBSQLParameter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import Int64 from 'node-int64';
import { TSparkParameter, TSparkParameterValue } from '../thrift/TCLIService_types';

export type DBSQLParameterValue = boolean | number | bigint | Int64 | Date | string;

interface DBSQLParameterOptions {
type?: string;
value: DBSQLParameterValue;
}

export default class DBSQLParameter {
public readonly type?: string;

public readonly value: DBSQLParameterValue;

constructor({ type, value }: DBSQLParameterOptions) {
this.type = type;
this.value = value;
}

public toSparkParameter(): TSparkParameter {
if (typeof this.value === 'boolean') {
return new TSparkParameter({
type: this.type ?? 'BOOLEAN',
value: new TSparkParameterValue({
stringValue: this.value ? 'TRUE' : 'FALSE',
}),
});
}

if (typeof this.value === 'number') {
return new TSparkParameter({
type: this.type ?? (Number.isInteger(this.value) ? 'INTEGER' : 'DOUBLE'),
value: new TSparkParameterValue({
stringValue: Number(this.value).toString(),
}),
});
}

if (this.value instanceof Int64 || typeof this.value === 'bigint') {
return new TSparkParameter({
type: this.type ?? 'BIGINT',
value: new TSparkParameterValue({
stringValue: this.value.toString(),
}),
});
}

if (this.value instanceof Date) {
return new TSparkParameter({
type: this.type ?? 'TIMESTAMP',
value: new TSparkParameterValue({
stringValue: this.value.toISOString(),
}),
});
}

return new TSparkParameter({
type: this.type ?? 'STRING',
value: new TSparkParameterValue({
stringValue: this.value,
}),
});
}
}
21 changes: 21 additions & 0 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
TOperationHandle,
TSparkDirectResults,
TSparkArrowTypes,
TSparkParameter,
} from '../thrift/TCLIService_types';
import HiveDriver from './hive/HiveDriver';
import { Int64 } from './hive/Types';
Expand All @@ -29,6 +30,7 @@ import CloseableCollection from './utils/CloseableCollection';
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
import HiveDriverError from './errors/HiveDriverError';
import globalConfig from './globalConfig';
import DBSQLParameter, { DBSQLParameterValue } from './DBSQLParameter';

const defaultMaxRows = 100000;

Expand Down Expand Up @@ -74,6 +76,24 @@ function getArrowOptions(): {
};
}

function getQueryParameters(
namedParameters?: Record<string, DBSQLParameter | DBSQLParameterValue>,
): Array<TSparkParameter> {
const result: Array<TSparkParameter> = [];

if (namedParameters !== undefined) {
for (const name of Object.keys(namedParameters)) {
const value = namedParameters[name];
const param = value instanceof DBSQLParameter ? value : new DBSQLParameter({ value });
const sparkParam = param.toSparkParameter();
sparkParam.name = name;
result.push(sparkParam);
}
}

return result;
}

interface DBSQLSessionConstructorOptions {
logger: IDBSQLLogger;
}
Expand Down Expand Up @@ -140,6 +160,7 @@ export default class DBSQLSession implements IDBSQLSession {
...getDirectResultsOptions(options.maxRows),
...getArrowOptions(),
canDownloadResult: options.useCloudFetch ?? globalConfig.useCloudFetch,
parameters: getQueryParameters(options.namedParameters),
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand Down
2 changes: 2 additions & 0 deletions lib/contracts/IDBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ import IOperation from './IOperation';
import Status from '../dto/Status';
import InfoValue from '../dto/InfoValue';
import { Int64 } from '../hive/Types';
import DBSQLParameter, { DBSQLParameterValue } from '../DBSQLParameter';

export type ExecuteStatementOptions = {
queryTimeout?: Int64;
runAsync?: boolean;
maxRows?: number | null;
useCloudFetch?: boolean;
namedParameters?: Record<string, DBSQLParameter | DBSQLParameterValue>;
};

export type TypeInfoRequest = {
Expand Down
3 changes: 2 additions & 1 deletion lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import TCLIService from '../thrift/TCLIService';
import TCLIService_types from '../thrift/TCLIService_types';
import DBSQLClient from './DBSQLClient';
import DBSQLSession from './DBSQLSession';
import DBSQLParameter from './DBSQLParameter';
import DBSQLLogger from './DBSQLLogger';
import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
import HttpConnection from './connection/connections/HttpConnection';
Expand Down Expand Up @@ -31,4 +32,4 @@ export const utils = {
formatProgress,
};

export { DBSQLClient, DBSQLSession, DBSQLLogger, LogLevel };
export { DBSQLClient, DBSQLSession, DBSQLParameter, DBSQLLogger, LogLevel };
104 changes: 104 additions & 0 deletions tests/e2e/query_parameters.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
const { expect } = require('chai');
const Int64 = require('node-int64');
const config = require('./utils/config');
const { DBSQLClient, DBSQLParameter } = require('../..');

const openSession = async () => {
const client = new DBSQLClient();

const connection = await client.connect({
host: config.host,
path: config.path,
token: config.token,
});

return connection.openSession({
initialCatalog: config.database[0],
initialSchema: config.database[1],
});
};

describe('Query parameters', () => {
it('should use named parameters', async () => {
const session = await openSession();
const operation = await session.executeStatement(
`
SELECT
:p_bool AS col_bool,
:p_int AS col_int,
:p_double AS col_double,
:p_bigint_1 AS col_bigint_1,
:p_bigint_2 AS col_bigint_2,
:p_date as col_date,
:p_timestamp as col_timestamp,
:p_str AS col_str
`,
{
runAsync: true,
namedParameters: {
p_bool: new DBSQLParameter({ value: true }),
p_int: new DBSQLParameter({ value: 1234 }),
p_double: new DBSQLParameter({ value: 3.14 }),
p_bigint_1: new DBSQLParameter({ value: BigInt(1234) }),
p_bigint_2: new DBSQLParameter({ value: new Int64(1234) }),
p_date: new DBSQLParameter({ value: new Date('2023-09-06T03:14:27.843Z'), type: 'DATE' }),
p_timestamp: new DBSQLParameter({ value: new Date('2023-09-06T03:14:27.843Z') }),
p_str: new DBSQLParameter({ value: 'Hello' }),
},
},
);
const result = await operation.fetchAll();
expect(result).to.deep.equal([
{
col_bool: true,
col_int: 1234,
col_double: 3.14,
col_bigint_1: 1234,
col_bigint_2: 1234,
col_date: new Date('2023-09-06T00:00:00.000Z'),
col_timestamp: new Date('2023-09-06T03:14:27.843Z'),
col_str: 'Hello',
},
]);
});

it('should accept primitives as values for named parameters', async () => {
const session = await openSession();
const operation = await session.executeStatement(
`
SELECT
:p_bool AS col_bool,
:p_int AS col_int,
:p_double AS col_double,
:p_bigint_1 AS col_bigint_1,
:p_bigint_2 AS col_bigint_2,
:p_timestamp as col_timestamp,
:p_str AS col_str
`,
{
runAsync: true,
namedParameters: {
p_bool: true,
p_int: 1234,
p_double: 3.14,
p_bigint_1: BigInt(1234),
p_bigint_2: new Int64(1234),
p_timestamp: new Date('2023-09-06T03:14:27.843Z'),
p_str: 'Hello',
},
},
);
const result = await operation.fetchAll();
expect(result).to.deep.equal([
{
col_bool: true,
col_int: 1234,
col_double: 3.14,
col_bigint_1: 1234,
col_bigint_2: 1234,
col_timestamp: new Date('2023-09-06T03:14:27.843Z'),
col_str: 'Hello',
},
]);
});
});
66 changes: 66 additions & 0 deletions tests/unit/DBSQLParameter.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
const { expect } = require('chai');

const Int64 = require('node-int64');
const { TSparkParameterValue, TSparkParameter } = require('../../thrift/TCLIService_types');
const { default: DBSQLParameter } = require('../../dist/DBSQLParameter');

describe('DBSQLParameter', () => {
it('should infer types correctly', () => {
const cases = [
[false, new TSparkParameter({ type: 'BOOLEAN', value: new TSparkParameterValue({ stringValue: 'FALSE' }) })],
[true, new TSparkParameter({ type: 'BOOLEAN', value: new TSparkParameterValue({ stringValue: 'TRUE' }) })],
[123, new TSparkParameter({ type: 'INTEGER', value: new TSparkParameterValue({ stringValue: '123' }) })],
[3.14, new TSparkParameter({ type: 'DOUBLE', value: new TSparkParameterValue({ stringValue: '3.14' }) })],
[BigInt(1234), new TSparkParameter({ type: 'BIGINT', value: new TSparkParameterValue({ stringValue: '1234' }) })],
[
new Int64(1234),
new TSparkParameter({ type: 'BIGINT', value: new TSparkParameterValue({ stringValue: '1234' }) }),
],
[
new Date('2023-09-06T03:14:27.843Z'),
new TSparkParameter({
type: 'TIMESTAMP',
value: new TSparkParameterValue({ stringValue: '2023-09-06T03:14:27.843Z' }),
}),
],
['Hello', new TSparkParameter({ type: 'STRING', value: new TSparkParameterValue({ stringValue: 'Hello' }) })],
];

for (const [value, expectedParam] of cases) {
const dbsqlParam = new DBSQLParameter({ value });
expect(dbsqlParam.toSparkParameter()).to.deep.equal(expectedParam);
}
});

it('should use provided type', () => {
const expectedType = '_CUSTOM_TYPE_'; // it doesn't have to be valid type name, just any string

const cases = [
[false, new TSparkParameter({ type: expectedType, value: new TSparkParameterValue({ stringValue: 'FALSE' }) })],
[true, new TSparkParameter({ type: expectedType, value: new TSparkParameterValue({ stringValue: 'TRUE' }) })],
[123, new TSparkParameter({ type: expectedType, value: new TSparkParameterValue({ stringValue: '123' }) })],
[3.14, new TSparkParameter({ type: expectedType, value: new TSparkParameterValue({ stringValue: '3.14' }) })],
[
BigInt(1234),
new TSparkParameter({ type: expectedType, value: new TSparkParameterValue({ stringValue: '1234' }) }),
],
[
new Int64(1234),
new TSparkParameter({ type: expectedType, value: new TSparkParameterValue({ stringValue: '1234' }) }),
],
[
new Date('2023-09-06T03:14:27.843Z'),
new TSparkParameter({
type: expectedType,
value: new TSparkParameterValue({ stringValue: '2023-09-06T03:14:27.843Z' }),
}),
],
['Hello', new TSparkParameter({ type: expectedType, value: new TSparkParameterValue({ stringValue: 'Hello' }) })],
];

for (const [value, expectedParam] of cases) {
const dbsqlParam = new DBSQLParameter({ type: expectedType, value });
expect(dbsqlParam.toSparkParameter()).to.deep.equal(expectedParam);
}
});
});

0 comments on commit ad10743

Please sign in to comment.