Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PECO-964] Initial Staging Ingestion implementation #159

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {

private readonly thrift = thrift;

private stagingAllowedLocalPath: string[] | null

constructor(options?: ClientOptions) {
super();
this.logger = options?.logger || new DBSQLLogger();
this.logger.log(LogLevel.info, 'Created DBSQLClient');
this.stagingAllowedLocalPath = options?.stagingAllowedLocalPath || null
}

private getConnectionOptions(options: ConnectionOptions, headers: HttpHeaders): IConnectionOptions {
Expand Down Expand Up @@ -147,7 +150,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
});

Status.assert(response.status);
return new DBSQLSession(driver, definedOrError(response.sessionHandle), this.logger);
return new DBSQLSession(driver, definedOrError(response.sessionHandle), this.logger, this.stagingAllowedLocalPath);
}

private async getClient() {
Expand Down
111 changes: 110 additions & 1 deletion lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { stringify, NIL, parse } from 'uuid';
import * as fs from 'fs'
import * as path from 'path'
import {
TSessionHandle,
TStatus,
Expand Down Expand Up @@ -27,6 +29,10 @@ import InfoValue from './dto/InfoValue';
import { definedOrError } from './utils';
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
import globalConfig from './globalConfig';
import { assert } from 'console';
import {Axios} from 'axios';

const axios = new Axios()

const defaultMaxRows = 100000;

Expand Down Expand Up @@ -78,12 +84,15 @@ export default class DBSQLSession implements IDBSQLSession {
private readonly sessionHandle: TSessionHandle;

private readonly logger: IDBSQLLogger;

private readonly stagingAllowedLocalPath: string[] | null

constructor(driver: HiveDriver, sessionHandle: TSessionHandle, logger: IDBSQLLogger) {
constructor(driver: HiveDriver, sessionHandle: TSessionHandle, logger: IDBSQLLogger, stagingAllowedLocalPath: string[] | null ) {
this.driver = driver;
this.sessionHandle = sessionHandle;
this.logger = logger;
this.logger.log(LogLevel.debug, `Session created with id: ${this.getId()}`);
this.stagingAllowedLocalPath = stagingAllowedLocalPath
}

public getId() {
Expand Down Expand Up @@ -130,6 +139,106 @@ export default class DBSQLSession implements IDBSQLSession {
return this.createOperation(response);
}

/**
* Executes staging statement
* @public
* @param statement - SQL statement to be executed
* @param options - maxRows field is used to specify Direct Results
* @returns DBSQLOperation
* @example
* const operation = await session.executeStatement(query, { runAsync: true });
*/
public async executeStagingStatement(statement: string, options: ExecuteStatementOptions = {}): Promise<void>{
kravets-levko marked this conversation as resolved.
Show resolved Hide resolved
if(this.stagingAllowedLocalPath == null){
// Add error message.
return
}
const response = await this.driver.executeStatement({
sessionHandle: this.sessionHandle,
statement,
queryTimeout: options.queryTimeout,
runAsync: options.runAsync || false,
...getDirectResultsOptions(options.maxRows),
...getArrowOptions(),
});
type StagingResponse = {
presignedUrl: string
localFile: string
headers: object
operation: string
}


let operation = this.createOperation(response);
let result = await operation.fetchAll()
assert(result.length == 1)
let row = result[0] as StagingResponse

let allowOperation = false
for(let filepath of this.stagingAllowedLocalPath){
let relativePath = path.relative(filepath,row.localFile)

if(!relativePath.startsWith('..') && !path.isAbsolute(relativePath)){
allowOperation = true
}

}
if(!allowOperation) {
return
}

let handler_args = {
"presigned_url": row.presignedUrl,
"local_file": row.localFile,
"headers": row.headers,
}
switch(row.operation) {
case "GET":
await this.handleStagingGet(handler_args.local_file, handler_args.presigned_url, handler_args.headers)
case "PUT":
await this.handleStagingPut(handler_args.local_file, handler_args.presigned_url, handler_args.headers)
case "REMOVE":
await this.handleStagingRemove(handler_args.local_file, handler_args.presigned_url, handler_args.headers)

}



}
public async handleStagingGet(local_file: string, presigned_url: string, headers: object) {
let response = await axios.get(presigned_url,{headers: headers})
let respJson = await response.data
if(respJson['ok']){
fs.writeFileSync(local_file,respJson['content'])
}
}
public async handleStagingRemove(local_file: string, presigned_url: string, headers: object) {
let response = await axios.delete(presigned_url,{headers: headers})
let respJson = await response.data
if(!respJson['ok']){
// Throw
}
}
public async handleStagingPut(local_file: string, presigned_url: string, headers: object) {
let data = fs.readFileSync(local_file)

let response = await axios.put(presigned_url,{body: data, headers: headers})
let respJson = await response.data
if(respJson['ok']){
fs.writeFileSync(local_file,respJson['content'])
}
}

/**
* Executes statement
* @public
* @param statement - SQL statement to be executed
* @param options - maxRows field is used to specify Direct Results
* @returns DBSQLOperation
* @example
* const operation = await session.executeStatement(query, { runAsync: true });
*/

/**
* Information about supported data types
* @public
Expand Down
1 change: 1 addition & 0 deletions lib/contracts/IDBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import OAuthPersistence from '../connection/auth/DatabricksOAuth/OAuthPersistenc

export interface ClientOptions {
logger?: IDBSQLLogger;
stagingAllowedLocalPath?: string[]
}

type AuthOptions =
Expand Down
9 changes: 9 additions & 0 deletions lib/contracts/IDBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ export default interface IDBSQLSession {
*/
executeStatement(statement: string, options?: ExecuteStatementOptions): Promise<IOperation>;

/**
* Executes staging statements
*
* @param statement DDL/DML statement
* @param options
*/
executeStagingStatement(statement: string, options?: ExecuteStatementOptions): Promise<void>;


/**
* Information about supported data types
*
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
},
"dependencies": {
"apache-arrow": "^10.0.1",
"axios": "^1.4.0",
"commander": "^9.3.0",
"node-int64": "^0.4.0",
"open": "^8.4.2",
Expand Down
1 change: 1 addition & 0 deletions tests/e2e/staging/data
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hello World!
30 changes: 30 additions & 0 deletions tests/e2e/staging_ingestion.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
const { expect } = require('chai');
const config = require('./utils/config');
const logger = require('./utils/logger')(config.logger);
const { DBSQLClient } = require('../..');
const fs = require('fs')
const globalConfig = require('../../dist/globalConfig').default;


const client = new DBSQLClient({stagingAllowedLocalPath: ["tests/e2e/staging"]});

client.connect({
host: config.host,
path: config.path,
token: config.token,
}).then(async (client) => {
let temp_path = "tests/e2e/staging/data"
fs.writeFileSync(temp_path,data="Hello World!")
const connection = await client.connect({
host: config.host,
path: config.path,
token: config.token,
});

let session = await client.openSession({
initialCatalog: config.database[0],
initialSchema: config.database[1],
});
let result = await session.executeStagingStatement(`PUT '${temp_path}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`)
}
)
7 changes: 7 additions & 0 deletions tests/e2e/utils/log.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"level":"info","message":"Created DBSQLClient"}
kravets-levko marked this conversation as resolved.
Show resolved Hide resolved
{"level":"info","message":"Created DBSQLClient"}
{"level":"debug","message":"Operation created with id: 01ed9c52-dd9c-1fc5-bae6-34882fc0c082"}
{"level":"debug","message":"Fetched chunk of size: 100000 from operation with id: 01ed9c52-dd9c-1fc5-bae6-34882fc0c082"}
{"level":"debug","message":"Fetched all data from operation with id: 01ed9c52-dd9c-1fc5-bae6-34882fc0c082"}
{"level":"debug","message":"Closing operation with id: 01ed9c52-dd9c-1fc5-bae6-34882fc0c082"}
{"level":"debug","message":"Session closed with id: 01ed9c52-ccf2-1c5d-9bf8-1144872eb5fb"}
Loading