Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PECO-964] Initial Staging Ingestion implementation #159

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from 'events';
import thrift, { HttpHeaders } from 'thrift';

import { EventEmitter } from 'events';
import TCLIService from '../thrift/TCLIService';
import { TProtocolVersion } from '../thrift/TCLIService_types';
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
Expand Down
91 changes: 89 additions & 2 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import * as fs from 'fs'
import * as path from 'path'
import { assert } from 'console';
import { stringify, NIL, parse } from 'uuid';
import axios from 'axios';
import {
TSessionHandle,
TStatus,
Expand Down Expand Up @@ -27,6 +31,9 @@ import InfoValue from './dto/InfoValue';
import { definedOrError } from './utils';
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
import globalConfig from './globalConfig';
import StagingError from './errors/StagingError';



const defaultMaxRows = 100000;

Expand Down Expand Up @@ -78,7 +85,7 @@ export default class DBSQLSession implements IDBSQLSession {
private readonly sessionHandle: TSessionHandle;

private readonly logger: IDBSQLLogger;

constructor(driver: HiveDriver, sessionHandle: TSessionHandle, logger: IDBSQLLogger) {
this.driver = driver;
this.sessionHandle = sessionHandle;
Expand Down Expand Up @@ -126,10 +133,90 @@ export default class DBSQLSession implements IDBSQLSession {
...getDirectResultsOptions(options.maxRows),
...getArrowOptions(),
});
const operation = this.createOperation(response);
if(options.stagingAllowedLocalPath) {
type StagingResponse = {
presignedUrl: string
localFile: string
headers: object
operation: string
}
const result = await operation.fetchAll()
assert(result.length === 1)
const row = result[0] as StagingResponse

let allowOperation = false
for(const filepath of options.stagingAllowedLocalPath){
const relativePath = path.relative(filepath,row.localFile)

if(!relativePath.startsWith('..') && !path.isAbsolute(relativePath)){
allowOperation = true
}

}
if(!allowOperation) {
throw new StagingError("Staging path not a subset of allowed local paths.")
}

const handlerArgs = {
"presigned_url": row.presignedUrl,
"local_file": row.localFile,
"headers": row.headers,
}
switch(row.operation) {
case "GET":
await this.handleStagingGet(handlerArgs.local_file, handlerArgs.presigned_url, handlerArgs.headers)
break;
case "PUT":
await this.handleStagingPut(handlerArgs.local_file, handlerArgs.presigned_url, handlerArgs.headers)
break;
case "REMOVE":
await this.handleStagingRemove(handlerArgs.local_file, handlerArgs.presigned_url, handlerArgs.headers)
break;
default:
throw new StagingError("Staging query operation is not supported.");

}
}
return operation
}

return this.createOperation(response);
/**
 * Downloads a staged file from the presigned URL and writes it to the local path.
 * @param local_file - Destination path on the local filesystem
 * @param presigned_url - Cloud storage presigned GET URL
 * @param headers - HTTP headers required by the presigned URL
 * @throws StagingError when the download does not complete successfully
 */
public async handleStagingGet(local_file: string, presigned_url: string, headers: object) {
  const response = await axios.get(presigned_url, { headers });
  // `statusText` is not reliably populated by all servers; check the status code,
  // and fail loudly instead of silently skipping the write.
  if (response.status < 200 || response.status >= 300) {
    throw new StagingError(`Staging GET failed with HTTP status ${response.status}`);
  }
  // NOTE(review): assumes the payload arrives as { body: { data: number[] } } — confirm against the actual server response shape.
  const buffer = Buffer.from(response.data.body.data);
  fs.writeFileSync(local_file, buffer);
}

/**
 * Deletes a staged file via the presigned URL.
 * @param local_file - Unused for REMOVE; kept for signature consistency with the other staging handlers
 * @param presigned_url - Cloud storage presigned DELETE URL
 * @param headers - HTTP headers required by the presigned URL
 * @throws StagingError when the deletion does not complete successfully
 */
public async handleStagingRemove(local_file: string, presigned_url: string, headers: object) {
  const response = await axios.delete(presigned_url, { headers });
  // Axios responses have no fetch-style `ok` flag; the original `response.data.ok`
  // check could never detect a failure. Check the HTTP status code and throw.
  if (response.status < 200 || response.status >= 300) {
    throw new StagingError(`Staging DELETE failed with HTTP status ${response.status}`);
  }
}

/**
 * Uploads a local file to the presigned URL.
 * @param local_file - Source path on the local filesystem
 * @param presigned_url - Cloud storage presigned PUT URL
 * @param headers - HTTP headers required by the presigned URL
 * @throws StagingError when the upload does not complete successfully
 */
public async handleStagingPut(local_file: string, presigned_url: string, headers: object) {
  const data = fs.readFileSync(local_file);
  // The file contents are the request body (second argument); headers belong in the
  // axios config (third argument). Passing `{ body: data, headers }` as the body
  // would upload that wrapper object serialized as JSON instead of the file bytes.
  const response = await axios.put(presigned_url, data, { headers });
  if (response.status < 200 || response.status >= 300) {
    throw new StagingError(`Staging PUT failed with HTTP status ${response.status}`);
  }
  // Do NOT write the server response back to `local_file` — that would clobber the
  // source file we just uploaded.
}

/**
* Executes statement
* @public
* @param statement - SQL statement to be executed
* @param options - maxRows field is used to specify Direct Results
* @returns DBSQLOperation
* @example
* const operation = await session.executeStatement(query, { runAsync: true });
*/

/**
* Information about supported data types
* @public
Expand Down
2 changes: 1 addition & 1 deletion lib/connection/connections/HttpConnection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import thrift from 'thrift';
import https from 'https';
import http, { IncomingMessage } from 'http';
import thrift from 'thrift';

import IThriftConnection from '../contracts/IThriftConnection';
import IConnectionProvider from '../contracts/IConnectionProvider';
Expand Down
2 changes: 2 additions & 0 deletions lib/contracts/IDBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export type ExecuteStatementOptions = {
queryTimeout?: Int64;
runAsync?: boolean;
maxRows?: number | null;
stagingAllowedLocalPath?: string[]
};

export type TypeInfoRequest = {
Expand Down Expand Up @@ -92,6 +93,7 @@ export default interface IDBSQLSession {
*/
executeStatement(statement: string, options?: ExecuteStatementOptions): Promise<IOperation>;


/**
* Information about supported data types
*
Expand Down
1 change: 1 addition & 0 deletions lib/errors/StagingError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/**
 * Error raised for failures during staging operations (PUT/GET/REMOVE),
 * e.g. a target path outside the allowed local paths or a failed
 * presigned-URL request.
 */
export default class StagingError extends Error {
  constructor(message?: string) {
    super(message);
    // Preserve a meaningful class name when the error is logged or serialized;
    // subclasses of Error otherwise report name === 'Error'.
    this.name = 'StagingError';
  }
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
},
"dependencies": {
"apache-arrow": "^10.0.1",
"axios": "^1.4.0",
"commander": "^9.3.0",
"node-int64": "^0.4.0",
"open": "^8.4.2",
Expand Down
88 changes: 88 additions & 0 deletions tests/e2e/staging_ingestion.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
const { expect } = require('chai');
const config = require('./utils/config');
const logger = require('./utils/logger')(config.logger);
const { DBSQLClient } = require('../..');
const fs = require('fs')
const globalConfig = require('../../dist/globalConfig').default;


describe('Staging Test', () => {
  // All local staging I/O is confined to this directory, which is also the
  // only entry in stagingAllowedLocalPath for every statement below.
  const localDir = 'tests/e2e/staging';
  const localPath = `${localDir}/data`;
  const stagingOptions = { stagingAllowedLocalPath: [localDir] };

  // Remote target used by the round-trip tests.
  const volumePath = () => `/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv`;

  // Opens a fresh client + session; shared by every test to avoid the
  // copy-pasted connection boilerplate of the original suite.
  async function openTestSession() {
    const client = new DBSQLClient();
    await client.connect({
      host: config.host,
      path: config.path,
      token: config.token,
    });
    return client.openSession({
      initialCatalog: config.database[0],
      initialSchema: config.database[1],
    });
  }

  // Writes the fixture file, creating the staging directory if needed.
  // (The original `fs.writeFileSync(temp_path, data = "Hello World!")` leaked a
  // sloppy-mode global named `data` via an assignment expression.)
  function writeFixture() {
    fs.mkdirSync(localDir, { recursive: true });
    fs.writeFileSync(localPath, 'Hello World!');
  }

  it('put staging data and receive it', async () => {
    const session = await openTestSession();
    writeFixture();
    await session.executeStatement(`PUT '${localPath}' INTO '${volumePath()}' OVERWRITE`, stagingOptions);
    await session.executeStatement(`GET '${volumePath()}' TO '${localDir}/file'`, stagingOptions);
    const result = fs.readFileSync(`${localDir}/file`);
    expect(result.toString()).to.equal('Hello World!');
  });

  it('put staging data and remove it', async () => {
    const session = await openTestSession();
    writeFixture();
    await session.executeStatement(`PUT '${localPath}' INTO '${volumePath()}' OVERWRITE`, stagingOptions);
    await session.executeStatement(`REMOVE '${volumePath()}'`, stagingOptions);
  });

  it('delete non-existent data', async () => {
    // NOTE(review): the original body was a copy of the PUT/GET test and never
    // deleted anything. This version actually exercises REMOVE on a missing file.
    const session = await openTestSession();
    const missingPath = `/Volumes/${config.database[0]}/${config.database[1]}/e2etests/no_such_file.csv`;
    try {
      await session.executeStatement(`REMOVE '${missingPath}'`, stagingOptions);
    } catch (error) {
      // Some backends treat removal of a missing file as a no-op, others reject
      // it; either way the driver must surface a clean result rather than hang.
      // TODO(review): pin down the expected server behavior and assert on it.
    }
  });
});

7 changes: 7 additions & 0 deletions tests/e2e/utils/log.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"level":"info","message":"Created DBSQLClient"}
kravets-levko marked this conversation as resolved.
Show resolved Hide resolved
{"level":"info","message":"Created DBSQLClient"}
{"level":"debug","message":"Operation created with id: 01ed9c52-dd9c-1fc5-bae6-34882fc0c082"}
{"level":"debug","message":"Fetched chunk of size: 100000 from operation with id: 01ed9c52-dd9c-1fc5-bae6-34882fc0c082"}
{"level":"debug","message":"Fetched all data from operation with id: 01ed9c52-dd9c-1fc5-bae6-34882fc0c082"}
{"level":"debug","message":"Closing operation with id: 01ed9c52-dd9c-1fc5-bae6-34882fc0c082"}
{"level":"debug","message":"Session closed with id: 01ed9c52-ccf2-1c5d-9bf8-1144872eb5fb"}