Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PECO-930] Add parameterized query support. #162

Merged
merged 11 commits into from
Sep 6, 2023
2 changes: 1 addition & 1 deletion lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from 'events';
import thrift, { HttpHeaders } from 'thrift';

import { EventEmitter } from 'events';
nithinkdb marked this conversation as resolved.
Show resolved Hide resolved
import TCLIService from '../thrift/TCLIService';
import { TProtocolVersion } from '../thrift/TCLIService_types';
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
Expand Down
2 changes: 2 additions & 0 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import CloseableCollection from './utils/CloseableCollection';
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
import HiveDriverError from './errors/HiveDriverError';
import globalConfig from './globalConfig';
import convertToSparkParameters from './utils/ParameterConverter';

const defaultMaxRows = 100000;

Expand Down Expand Up @@ -140,6 +141,7 @@ export default class DBSQLSession implements IDBSQLSession {
...getDirectResultsOptions(options.maxRows),
...getArrowOptions(),
canDownloadResult: options.useCloudFetch ?? globalConfig.useCloudFetch,
parameters: options.parameters ? convertToSparkParameters(options.parameters) : undefined,
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand Down
2 changes: 1 addition & 1 deletion lib/connection/connections/HttpConnection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import thrift from 'thrift';
import https from 'https';
import http, { IncomingMessage } from 'http';
import thrift from 'thrift';

kravets-levko marked this conversation as resolved.
Show resolved Hide resolved
import IThriftConnection from '../contracts/IThriftConnection';
import IConnectionProvider from '../contracts/IConnectionProvider';
Expand Down
1 change: 1 addition & 0 deletions lib/contracts/IDBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export type ExecuteStatementOptions = {
runAsync?: boolean;
maxRows?: number | null;
useCloudFetch?: boolean;
parameters?: object;
nithinkdb marked this conversation as resolved.
Show resolved Hide resolved
};

export type TypeInfoRequest = {
Expand Down
38 changes: 38 additions & 0 deletions lib/utils/ParameterConverter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { TSparkParameter, TSparkParameterValue } from '../../thrift/TCLIService_types';
import HiveDriverError from '../errors/HiveDriverError';

/**
 * Maps a JavaScript primitive to its Spark SQL type name and the Thrift
 * value wrapper used for parameterized statement execution.
 *
 * @param value - raw parameter value supplied by the caller
 * @returns a [sparkTypeName, TSparkParameterValue] tuple
 * @throws HiveDriverError for values with no Spark parameter mapping
 *         (non-null objects, undefined, function, symbol, bigint)
 */
function getTypeAndValue(value: any): [string, TSparkParameterValue] {
  switch (typeof value) {
    case 'object':
      if (value === null) {
        // SQL NULL travels as the VOID type with an empty value struct
        return ['VOID', new TSparkParameterValue()];
      }

      // Arrays, Dates, plain objects etc. are not supported as parameters
      throw new HiveDriverError('Unsupported object type used for parameterized query.');

    case 'boolean':
      return ['BOOLEAN', new TSparkParameterValue({ booleanValue: value })];
    case 'number':
      if (Number.isInteger(value)) {
        // TSparkParameterValue has no dedicated integer field; integral
        // numbers are carried in doubleValue with the INT type tag
        return ['INT', new TSparkParameterValue({ doubleValue: value })];
      }

      return ['DOUBLE', new TSparkParameterValue({ doubleValue: value })];

    case 'string':
      return ['STRING', new TSparkParameterValue({ stringValue: value })];
    default:
      // Original message claimed an "object type" even for bigint/symbol/
      // function/undefined; report the actual typeof for easier debugging
      throw new HiveDriverError(`Unsupported type "${typeof value}" used for parameterized query.`);
  }
}

export default function convertToSparkParameters(params: object): TSparkParameter[] {
nithinkdb marked this conversation as resolved.
Show resolved Hide resolved
const sparkValueParams = [];
for (const e of Object.entries(params)) {
const key = e[0];
const value = e[1];
const typeValueTuple = getTypeAndValue(value);
sparkValueParams.push(new TSparkParameter({ name: key, type: typeValueTuple[0], value: typeValueTuple[1] }));
}
return sparkValueParams;
}
53 changes: 53 additions & 0 deletions tests/e2e/parameterized_query.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
const { expect } = require('chai');
const config = require('./utils/config');
const logger = require('./utils/logger')(config.logger);

const DBSQLClient = require('../../dist/DBSQLClient').default;
const convertToSparkParameters = require('../../dist/utils/ParameterConverter').default;
const { TSparkParameterValue, TSparkParameter } = require('../../thrift/TCLIService_types');
const globalConfig = require('../../dist/globalConfig').default;

describe('Parameterized query converter unit test', () => {
  // Assertions must live inside it() callbacks. In the original they ran
  // directly in the describe body, so they executed during test collection:
  // failures aborted the whole file instead of being reported per-case, and
  // nothing appeared as a passing test when they succeeded.
  it('should convert null to a VOID parameter', () => {
    expect(convertToSparkParameters({ key: null })[0]).to.deep.eq(
      new TSparkParameter({ name: 'key', type: 'VOID', value: new TSparkParameterValue() }),
    );
  });

  it('should convert a string to a STRING parameter', () => {
    expect(convertToSparkParameters({ key: 'value' })[0]).to.deep.eq(
      new TSparkParameter({ name: 'key', type: 'STRING', value: new TSparkParameterValue({ stringValue: 'value' }) }),
    );
  });

  it('should convert an integral number to an INT parameter', () => {
    expect(convertToSparkParameters({ key: 1 })[0]).to.deep.eq(
      new TSparkParameter({ name: 'key', type: 'INT', value: new TSparkParameterValue({ doubleValue: 1 }) }),
    );
  });

  it('should convert a fractional number to a DOUBLE parameter', () => {
    expect(convertToSparkParameters({ key: 1.1 })[0]).to.deep.eq(
      new TSparkParameter({ name: 'key', type: 'DOUBLE', value: new TSparkParameterValue({ doubleValue: 1.1 }) }),
    );
  });

  it('should convert a boolean to a BOOLEAN parameter', () => {
    expect(convertToSparkParameters({ key: true })[0]).to.deep.eq(
      new TSparkParameter({ name: 'key', type: 'BOOLEAN', value: new TSparkParameterValue({ booleanValue: true }) }),
    );
  });
});
nithinkdb marked this conversation as resolved.
Show resolved Hide resolved

// Connects a DBSQLClient using the shared e2e config and opens a session
// pinned to the configured initial catalog and schema.
const openSession = async () => {
  const connectionOptions = {
    host: config.host,
    path: config.path,
    token: config.token,
  };
  const connection = await new DBSQLClient().connect(connectionOptions);

  const sessionOptions = {
    initialCatalog: config.database[0],
    initialSchema: config.database[1],
  };
  return connection.openSession(sessionOptions);
};

describe('Parameterized Query', () => {
  // Fixes vs. original: `async` on a describe callback is meaningless to
  // mocha (removed); the it() title was copy-pasted from a socket-timeout
  // test; `parameters` must be an object keyed by the {{parameter}}
  // placeholder name — a bare number produces no parameters at all
  // (Object.entries(2) is empty); and the session is now closed after use.
  it('should substitute named parameters into the query', async () => {
    const query = `
      select * from default.stock_data where open > {{parameter}}
    `;

    const session = await openSession();
    try {
      // NOTE(review): presumably the result should also be asserted against
      // known fixture data — confirm what default.stock_data contains.
      await session.executeStatement(query, { parameters: { parameter: 2 } });
    } finally {
      await session.close();
    }
  });
});
Loading