Skip to content

Commit

Permalink
Got test working except delete (on current dbr version)
Browse files Browse the repository at this point in the history
Signed-off-by: nithinkdb <[email protected]>
  • Loading branch information
nithinkdb committed Aug 21, 2023
1 parent 59fcbe7 commit 0d8d22c
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 92 deletions.
7 changes: 2 additions & 5 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from 'events';
import thrift, { HttpHeaders } from 'thrift';

import { EventEmitter } from 'events';
import TCLIService from '../thrift/TCLIService';
import { TProtocolVersion } from '../thrift/TCLIService_types';
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
Expand Down Expand Up @@ -52,13 +52,10 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {

private readonly thrift = thrift;

private stagingAllowedLocalPath: string[] | null

/**
 * Creates a new client.
 *
 * @param options - optional client settings; `options.logger` is used when provided,
 *   otherwise a fresh `DBSQLLogger` is created
 */
constructor(options?: ClientOptions) {
super();
// Fall back to the default logger when the caller does not supply one.
this.logger = options?.logger || new DBSQLLogger();
this.logger.log(LogLevel.info, 'Created DBSQLClient');
// Keep the caller-supplied list of allowed local staging paths, or null when none was given.
this.stagingAllowedLocalPath = options?.stagingAllowedLocalPath || null
}

private getConnectionOptions(options: ConnectionOptions, headers: HttpHeaders): IConnectionOptions {
Expand Down Expand Up @@ -150,7 +147,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
});

Status.assert(response.status);
return new DBSQLSession(driver, definedOrError(response.sessionHandle), this.logger, this.stagingAllowedLocalPath);
return new DBSQLSession(driver, definedOrError(response.sessionHandle), this.logger);
}

private async getClient() {
Expand Down
106 changes: 42 additions & 64 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { stringify, NIL, parse } from 'uuid';
import * as fs from 'fs'
import * as path from 'path'
import { assert } from 'console';
import { stringify, NIL, parse } from 'uuid';
import axios from 'axios';
import {
TSessionHandle,
TStatus,
Expand Down Expand Up @@ -29,10 +31,9 @@ import InfoValue from './dto/InfoValue';
import { definedOrError } from './utils';
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
import globalConfig from './globalConfig';
import { assert } from 'console';
import {Axios} from 'axios';
import StagingError from './errors/StagingError';


const axios = new Axios()

const defaultMaxRows = 100000;

Expand Down Expand Up @@ -85,14 +86,11 @@ export default class DBSQLSession implements IDBSQLSession {

private readonly logger: IDBSQLLogger;

private readonly stagingAllowedLocalPath: string[] | null

/**
 * Stores the driver, session handle and logger on the session and logs the
 * session id at debug level.
 *
 * @param driver - driver the session keeps a reference to
 * @param sessionHandle - handle stored on the session
 * @param logger - logger used for session diagnostics
 */
constructor(driver: HiveDriver, sessionHandle: TSessionHandle, logger: IDBSQLLogger, stagingAllowedLocalPath: string[] | null ) {
constructor(driver: HiveDriver, sessionHandle: TSessionHandle, logger: IDBSQLLogger) {
this.driver = driver;
this.sessionHandle = sessionHandle;
this.logger = logger;
this.logger.log(LogLevel.debug, `Session created with id: ${this.getId()}`);
// Keeps the allowed local staging paths passed by the caller (may be null).
this.stagingAllowedLocalPath = stagingAllowedLocalPath
}

public getId() {
Expand Down Expand Up @@ -135,97 +133,77 @@ export default class DBSQLSession implements IDBSQLSession {
...getDirectResultsOptions(options.maxRows),
...getArrowOptions(),
});

return this.createOperation(response);
}

/**
* Executes staging statement
* @public
* @param statement - SQL statement to be executed
* @param options - maxRows field is used to specify Direct Results
* @returns DBSQLOperation
* @example
* const operation = await session.executeStatement(query, { runAsync: true });
*/
public async executeStagingStatement(statement: string, options: ExecuteStatementOptions = {}): Promise<void>{
if(this.stagingAllowedLocalPath == null){
// Add error message.
return
}
const response = await this.driver.executeStatement({
sessionHandle: this.sessionHandle,
statement,
queryTimeout: options.queryTimeout,
runAsync: options.runAsync || false,
...getDirectResultsOptions(options.maxRows),
...getArrowOptions(),
});
const operation = this.createOperation(response);
if(options.stagingAllowedLocalPath) {
type StagingResponse = {
presignedUrl: string
localFile: string
headers: object
operation: string
}


let operation = this.createOperation(response);
let result = await operation.fetchAll()
assert(result.length == 1)
let row = result[0] as StagingResponse
const result = await operation.fetchAll()
assert(result.length === 1)
const row = result[0] as StagingResponse

let allowOperation = false
for(let filepath of this.stagingAllowedLocalPath){
let relativePath = path.relative(filepath,row.localFile)
for(const filepath of options.stagingAllowedLocalPath){
const relativePath = path.relative(filepath,row.localFile)

if(!relativePath.startsWith('..') && !path.isAbsolute(relativePath)){
allowOperation = true
}

}
if(!allowOperation) {
return
throw new StagingError("Staging path not a subset of allowed local paths.")
}

let handler_args = {
const handlerArgs = {
"presigned_url": row.presignedUrl,
"local_file": row.localFile,
"headers": row.headers,
}
switch(row.operation) {
case "GET":
await this.handleStagingGet(handler_args.local_file, handler_args.presigned_url, handler_args.headers)
await this.handleStagingGet(handlerArgs.local_file, handlerArgs.presigned_url, handlerArgs.headers)
break;
case "PUT":
await this.handleStagingPut(handler_args.local_file, handler_args.presigned_url, handler_args.headers)
await this.handleStagingPut(handlerArgs.local_file, handlerArgs.presigned_url, handlerArgs.headers)
break;
case "REMOVE":
await this.handleStagingRemove(handler_args.local_file, handler_args.presigned_url, handler_args.headers)

}


await this.handleStagingRemove(handlerArgs.local_file, handlerArgs.presigned_url, handlerArgs.headers)
break;
default:
throw new StagingError("Staging query operation is not supported.");

}
}
return operation
}

/**
 * Downloads the object behind `presigned_url` and writes it to `local_file`.
 *
 * The body is requested as raw bytes (`responseType: 'arraybuffer'`) so binary
 * content is written unchanged instead of being decoded as JSON/text.
 *
 * @param local_file - path of the local file to create or overwrite
 * @param presigned_url - URL the GET request is sent to
 * @param headers - HTTP headers sent with the request
 * @throws StagingError when the response status is not 2xx
 */
public async handleStagingGet(local_file: string, presigned_url: string, headers: object) {
  const response = await axios.get(presigned_url, { headers, responseType: 'arraybuffer' });
  DBSQLSession.assertStagingResponseOk('GET', response.status, response.statusText);
  await fs.promises.writeFile(local_file, Buffer.from(response.data));
}

/**
 * Deletes the object behind `presigned_url`.
 *
 * @param local_file - not used by this operation; kept so all staging handlers share one signature
 * @param presigned_url - URL the DELETE request is sent to
 * @param headers - HTTP headers sent with the request
 * @throws StagingError when the response status is not 2xx
 */
public async handleStagingRemove(local_file: string, presigned_url: string, headers: object) {
  const response = await axios.delete(presigned_url, { headers });
  DBSQLSession.assertStagingResponseOk('REMOVE', response.status, response.statusText);
}

/**
 * Uploads the contents of `local_file` to `presigned_url`.
 *
 * The file bytes are passed as the request body (second argument of `axios.put`);
 * the headers travel in the request config (third argument). The local file is
 * only read, never modified.
 *
 * @param local_file - path of the local file to upload
 * @param presigned_url - URL the PUT request is sent to
 * @param headers - HTTP headers sent with the request
 * @throws StagingError when the response status is not 2xx
 */
public async handleStagingPut(local_file: string, presigned_url: string, headers: object) {
  const data = await fs.promises.readFile(local_file);
  const response = await axios.put(presigned_url, data, { headers });
  DBSQLSession.assertStagingResponseOk('PUT', response.status, response.statusText);
}

/**
 * Throws a StagingError unless `status` is in the 2xx range.
 *
 * The numeric status is checked rather than `statusText`, because the reason
 * phrase is optional and may be empty. Depending on how the axios instance is
 * configured it may already reject non-2xx responses; this check covers the
 * instances that resolve them.
 */
private static assertStagingResponseOk(operation: string, status: number, statusText: string): void {
  if (status < 200 || status >= 300) {
    throw new StagingError(`Staging ${operation} request failed: HTTP ${status} ${statusText}`);
  }
}

Expand Down
2 changes: 1 addition & 1 deletion lib/connection/connections/HttpConnection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import thrift from 'thrift';
import https from 'https';
import http, { IncomingMessage } from 'http';
import thrift from 'thrift';

import IThriftConnection from '../contracts/IThriftConnection';
import IConnectionProvider from '../contracts/IConnectionProvider';
Expand Down
1 change: 0 additions & 1 deletion lib/contracts/IDBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import OAuthPersistence from '../connection/auth/DatabricksOAuth/OAuthPersistenc

/**
 * Options accepted when constructing a client.
 */
export interface ClientOptions {
/** Logger to use; optional. */
logger?: IDBSQLLogger;
/** Local paths permitted for staging operations; optional. */
stagingAllowedLocalPath?: string[]
}

type AuthOptions =
Expand Down
9 changes: 1 addition & 8 deletions lib/contracts/IDBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export type ExecuteStatementOptions = {
queryTimeout?: Int64;
runAsync?: boolean;
maxRows?: number | null;
stagingAllowedLocalPath?: string[]
};

export type TypeInfoRequest = {
Expand Down Expand Up @@ -92,14 +93,6 @@ export default interface IDBSQLSession {
*/
executeStatement(statement: string, options?: ExecuteStatementOptions): Promise<IOperation>;

/**
* Executes staging statements
*
* @param statement DDL/DML statement
* @param options
*/
executeStagingStatement(statement: string, options?: ExecuteStatementOptions): Promise<void>;


/**
* Information about supported data types
Expand Down
1 change: 1 addition & 0 deletions lib/errors/StagingError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/**
 * Error type for failures of staging operations, e.g. a local path outside the
 * allowed staging paths or an unsupported staging operation.
 *
 * Accepts the same constructor arguments as the built-in `Error`.
 */
export default class StagingError extends Error {
  constructor(...args: ConstructorParameters<typeof Error>) {
    super(...args);
    // Restore the prototype chain so `instanceof StagingError` also works when
    // the class is compiled down to ES5, where subclassing Error loses it.
    Object.setPrototypeOf(this, new.target.prototype);
    // Report the specific class name instead of the generic "Error" in logs and stack traces.
    this.name = 'StagingError';
  }
}
1 change: 0 additions & 1 deletion tests/e2e/staging/data

This file was deleted.

82 changes: 70 additions & 12 deletions tests/e2e/staging_ingestion.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,83 @@ const fs = require('fs')
const globalConfig = require('../../dist/globalConfig').default;


// End-to-end tests for staging statements (PUT / GET / REMOVE) issued through
// session.executeStatement with the `stagingAllowedLocalPath` option.
describe('Staging Test', () => {
  // Local directory the staging statements are allowed to read from / write to.
  const stagingDir = 'tests/e2e/staging';
  const uploadPath = `${stagingDir}/data`;
  const downloadPath = `${stagingDir}/file`;
  const payload = 'Hello World!';
  const stagingOptions = { stagingAllowedLocalPath: [stagingDir] };

  let client;
  let session;
  let volumeFile;

  beforeEach(async () => {
    // The directory holds no tracked files, so it may be missing on a fresh checkout.
    fs.mkdirSync(stagingDir, { recursive: true });
    fs.writeFileSync(uploadPath, payload);

    client = new DBSQLClient();
    await client.connect({
      host: config.host,
      path: config.path,
      token: config.token,
    });
    session = await client.openSession({
      initialCatalog: config.database[0],
      initialSchema: config.database[1],
    });
    volumeFile = `/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv`;
  });

  afterEach(async () => {
    // Release server-side resources even when a test fails.
    if (session) {
      await session.close();
    }
    if (client) {
      await client.close();
    }
    session = undefined;
    client = undefined;
  });

  it('put staging data and receive it', async () => {
    await session.executeStatement(`PUT '${uploadPath}' INTO '${volumeFile}' OVERWRITE`, stagingOptions);
    await session.executeStatement(`GET '${volumeFile}' TO '${downloadPath}'`, stagingOptions);
    // Compare the values directly so a failure shows what was actually downloaded.
    expect(fs.readFileSync(downloadPath).toString()).to.equal(payload);
  });

  it('put staging data and remove it', async () => {
    await session.executeStatement(`PUT '${uploadPath}' INTO '${volumeFile}' OVERWRITE`, stagingOptions);
    await session.executeStatement(`REMOVE '${volumeFile}'`, stagingOptions);
  });

  // NOTE(review): despite its title this test performs the same PUT + GET round
  // trip as the first one and never issues REMOVE for a missing file. The steps
  // are kept as they were; confirm the intended scenario and expected outcome.
  it('delete non-existent data', async () => {
    await session.executeStatement(`PUT '${uploadPath}' INTO '${volumeFile}' OVERWRITE`, stagingOptions);
    await session.executeStatement(`GET '${volumeFile}' TO '${downloadPath}'`, stagingOptions);
    expect(fs.readFileSync(downloadPath).toString()).to.equal(payload);
  });
});

0 comments on commit 0d8d22c

Please sign in to comment.