Skip to content

Commit

Permalink
[PECO-348] Enable direct results by default (#70)
Browse files Browse the repository at this point in the history
* [PECO-348] Enable direct results by default

Signed-off-by: Levko Kravets <[email protected]>

* Add tests

Signed-off-by: Levko Kravets <[email protected]>

Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko authored Oct 25, 2022
1 parent 36d1e8f commit 4186df2
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 26 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
- `DBSQLClient.openSession` now uses the latest protocol version by default
- Direct results feature is now available for all IOperation methods which support it. To enable direct results feature,
`maxRows` option should be used
- Direct results became enabled by default. If `maxRows` is omitted - it will default to `100000`. To disable direct
results, set `maxRows` to `null`
- `FunctionNameRequest` type renamed to `FunctionsRequest`
- `IDBSQLConnectionOptions` type renamed to `ConnectionOptions`
- `IFetchOptions` renamed to `FetchOptions`
Expand Down
4 changes: 3 additions & 1 deletion lib/DBSQLOperation/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { stringify, NIL, parse } from 'uuid';
import IOperation, { FetchOptions, GetSchemaOptions, FinishedOptions, defaultMaxRows } from '../contracts/IOperation';
import IOperation, { FetchOptions, GetSchemaOptions, FinishedOptions } from '../contracts/IOperation';
import HiveDriver from '../hive/HiveDriver';
import {
TGetOperationStatusResp,
Expand All @@ -16,6 +16,8 @@ import FetchResultsHelper from './FetchResultsHelper';
import CompleteOperationHelper from './CompleteOperationHelper';
import IDBSQLLogger, { LogLevel } from '../contracts/IDBSQLLogger';

const defaultMaxRows = 100000;

export default class DBSQLOperation implements IOperation {
private driver: HiveDriver;

Expand Down
6 changes: 4 additions & 2 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ import InfoValue from './dto/InfoValue';
import { definedOrError } from './utils';
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';

const defaultMaxRows = 100000;

interface OperationResponseShape {
status: TStatus;
operationHandle?: TOperationHandle;
directResults?: TSparkDirectResults;
}

function getDirectResultsOptions(maxRows?: number) {
if (!maxRows) {
function getDirectResultsOptions(maxRows: number | null = defaultMaxRows) {
if (maxRows === null) {
return {};
}

Expand Down
20 changes: 10 additions & 10 deletions lib/contracts/IDBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,24 @@ import { Int64 } from '../hive/Types';
export type ExecuteStatementOptions = {
queryTimeout?: Int64;
runAsync?: boolean;
maxRows?: number;
maxRows?: number | null;
};

export type TypeInfoRequest = {
runAsync?: boolean;
maxRows?: number;
maxRows?: number | null;
};

export type CatalogsRequest = {
runAsync?: boolean;
maxRows?: number;
maxRows?: number | null;
};

export type SchemasRequest = {
catalogName?: string;
schemaName?: string;
runAsync?: boolean;
maxRows?: number;
maxRows?: number | null;
};

export type TablesRequest = {
Expand All @@ -32,12 +32,12 @@ export type TablesRequest = {
tableName?: string;
tableTypes?: Array<string>;
runAsync?: boolean;
maxRows?: number;
maxRows?: number | null;
};

export type TableTypesRequest = {
runAsync?: boolean;
maxRows?: number;
maxRows?: number | null;
};

export type ColumnsRequest = {
Expand All @@ -46,23 +46,23 @@ export type ColumnsRequest = {
tableName?: string;
columnName?: string;
runAsync?: boolean;
maxRows?: number;
maxRows?: number | null;
};

export type FunctionsRequest = {
catalogName?: string;
schemaName?: string;
functionName: string;
runAsync?: boolean;
maxRows?: number;
maxRows?: number | null;
};

export type PrimaryKeysRequest = {
catalogName?: string;
schemaName: string;
tableName: string;
runAsync?: boolean;
maxRows?: number;
maxRows?: number | null;
};

export type CrossReferenceRequest = {
Expand All @@ -73,7 +73,7 @@ export type CrossReferenceRequest = {
foreignSchemaName: string;
foreignTableName: string;
runAsync?: boolean;
maxRows?: number;
maxRows?: number | null;
};

export default interface IDBSQLSession {
Expand Down
2 changes: 0 additions & 2 deletions lib/contracts/IOperation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ export interface GetSchemaOptions extends WaitUntilReadyOptions {
// no other options
}

export const defaultMaxRows = 100000;

export default interface IOperation {
/**
* Fetch a portion of data
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/batched_fetch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ describe('Data fetching', () => {

it('fetch chunks should return a max row set of chunkSize', async () => {
const session = await openSession();
const operation = await session.executeStatement(query, { runAsync: true });
const operation = await session.executeStatement(query, { runAsync: true, maxRows: null });
let chunkedOp = await operation.fetchChunk({ maxRows: 10 }).catch((error) => logger(error));
expect(chunkedOp.length).to.be.equal(10);
});

it('fetch all should fetch all records', async () => {
const session = await openSession();
const operation = await session.executeStatement(query, { runAsync: true });
const operation = await session.executeStatement(query, { runAsync: true, maxRows: null });
let all = await operation.fetchAll();
expect(all.length).to.be.equal(1000);
});
Expand Down
90 changes: 81 additions & 9 deletions tests/unit/DBSQLSession.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ describe('DBSQLSession', () => {
const result = await session.executeStatement('SELECT * FROM table', { maxRows: 10 });
expect(result).instanceOf(DBSQLOperation);
});

it('should disable direct results', async () => {
const session = createSession();
const result = await session.executeStatement('SELECT * FROM table', { maxRows: null });
expect(result).instanceOf(DBSQLOperation);
});
});

describe('getTypeInfo', () => {
Expand All @@ -77,6 +83,12 @@ describe('DBSQLSession', () => {
const result = await session.getTypeInfo({ maxRows: 10 });
expect(result).instanceOf(DBSQLOperation);
});

it('should disable direct results', async () => {
const session = createSession();
const result = await session.getTypeInfo({ maxRows: null });
expect(result).instanceOf(DBSQLOperation);
});
});

describe('getCatalogs', () => {
Expand All @@ -91,6 +103,12 @@ describe('DBSQLSession', () => {
const result = await session.getCatalogs({ maxRows: 10 });
expect(result).instanceOf(DBSQLOperation);
});

it('should disable direct results', async () => {
const session = createSession();
const result = await session.getCatalogs({ maxRows: null });
expect(result).instanceOf(DBSQLOperation);
});
});

describe('getSchemas', () => {
Expand All @@ -111,9 +129,13 @@ describe('DBSQLSession', () => {

it('should use direct results', async () => {
const session = createSession();
const result = await session.getSchemas({
maxRows: 10,
});
const result = await session.getSchemas({ maxRows: 10 });
expect(result).instanceOf(DBSQLOperation);
});

it('should disable direct results', async () => {
const session = createSession();
const result = await session.getSchemas({ maxRows: null });
expect(result).instanceOf(DBSQLOperation);
});
});
Expand All @@ -138,9 +160,13 @@ describe('DBSQLSession', () => {

it('should use direct results', async () => {
const session = createSession();
const result = await session.getTables({
maxRows: 10,
});
const result = await session.getTables({ maxRows: 10 });
expect(result).instanceOf(DBSQLOperation);
});

it('should disable direct results', async () => {
const session = createSession();
const result = await session.getTables({ maxRows: null });
expect(result).instanceOf(DBSQLOperation);
});
});
Expand All @@ -157,6 +183,12 @@ describe('DBSQLSession', () => {
const result = await session.getTableTypes({ maxRows: 10 });
expect(result).instanceOf(DBSQLOperation);
});

it('should disable direct results', async () => {
const session = createSession();
const result = await session.getTableTypes({ maxRows: null });
expect(result).instanceOf(DBSQLOperation);
});
});

describe('getColumns', () => {
Expand All @@ -179,9 +211,13 @@ describe('DBSQLSession', () => {

it('should use direct results', async () => {
const session = createSession();
const result = await session.getColumns({
maxRows: 10,
});
const result = await session.getColumns({ maxRows: 10 });
expect(result).instanceOf(DBSQLOperation);
});

it('should disable direct results', async () => {
const session = createSession();
const result = await session.getColumns({ maxRows: null });
expect(result).instanceOf(DBSQLOperation);
});
});
Expand All @@ -207,6 +243,17 @@ describe('DBSQLSession', () => {
});
expect(result).instanceOf(DBSQLOperation);
});

it('should disable direct results', async () => {
const session = createSession();
const result = await session.getFunctions({
catalogName: 'catalog',
schemaName: 'schema',
functionName: 'avg',
maxRows: null,
});
expect(result).instanceOf(DBSQLOperation);
});
});

describe('getPrimaryKeys', () => {
Expand All @@ -230,6 +277,17 @@ describe('DBSQLSession', () => {
});
expect(result).instanceOf(DBSQLOperation);
});

it('should disable direct results', async () => {
const session = createSession();
const result = await session.getPrimaryKeys({
catalogName: 'catalog',
schemaName: 'schema',
tableName: 't1',
maxRows: null,
});
expect(result).instanceOf(DBSQLOperation);
});
});

describe('getCrossReference', () => {
Expand Down Expand Up @@ -259,6 +317,20 @@ describe('DBSQLSession', () => {
});
expect(result).instanceOf(DBSQLOperation);
});

it('should disable direct results', async () => {
const session = createSession();
const result = await session.getCrossReference({
parentCatalogName: 'parentCatalogName',
parentSchemaName: 'parentSchemaName',
parentTableName: 'parentTableName',
foreignCatalogName: 'foreignCatalogName',
foreignSchemaName: 'foreignSchemaName',
foreignTableName: 'foreignTableName',
maxRows: null,
});
expect(result).instanceOf(DBSQLOperation);
});
});

describe('getDelegationToken', () => {
Expand Down

0 comments on commit 4186df2

Please sign in to comment.