diff --git a/lib/DBSQLClient.ts b/lib/DBSQLClient.ts
index ea25f676..26ba3a24 100644
--- a/lib/DBSQLClient.ts
+++ b/lib/DBSQLClient.ts
@@ -1,6 +1,6 @@
+import { EventEmitter } from 'events';
 import thrift, { HttpHeaders } from 'thrift';
-import { EventEmitter } from 'events';
 import TCLIService from '../thrift/TCLIService';
 import { TProtocolVersion } from '../thrift/TCLIService_types';
 import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
@@ -52,13 +52,10 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
 
   private readonly thrift = thrift;
 
-  private stagingAllowedLocalPath: string[] | null
-
   constructor(options?: ClientOptions) {
     super();
     this.logger = options?.logger || new DBSQLLogger();
     this.logger.log(LogLevel.info, 'Created DBSQLClient');
-    this.stagingAllowedLocalPath = options?.stagingAllowedLocalPath || null
   }
 
   private getConnectionOptions(options: ConnectionOptions, headers: HttpHeaders): IConnectionOptions {
@@ -150,7 +147,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
     });
 
     Status.assert(response.status);
-    return new DBSQLSession(driver, definedOrError(response.sessionHandle), this.logger, this.stagingAllowedLocalPath);
+    return new DBSQLSession(driver, definedOrError(response.sessionHandle), this.logger);
   }
 
   private async getClient() {
diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts
index 0520e09e..df4673ac 100644
--- a/lib/DBSQLSession.ts
+++ b/lib/DBSQLSession.ts
@@ -1,6 +1,8 @@
-import { stringify, NIL, parse } from 'uuid';
 import * as fs from 'fs'
 import * as path from 'path'
+import { assert } from 'console';
+import { stringify, NIL, parse } from 'uuid';
+import axios from 'axios';
 import {
   TSessionHandle,
   TStatus,
@@ -29,10 +31,9 @@ import InfoValue from './dto/InfoValue';
 import { definedOrError } from './utils';
 import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
 import globalConfig from './globalConfig';
-import { assert } from 'console';
-import {Axios} from 'axios';
+import StagingError from './errors/StagingError';
+
 
-const axios = new Axios()
 const defaultMaxRows = 100000;
@@ -85,14 +86,11 @@ export default class DBSQLSession implements IDBSQLSession {
 
   private readonly logger: IDBSQLLogger;
 
-  private readonly stagingAllowedLocalPath: string[] | null
-
-  constructor(driver: HiveDriver, sessionHandle: TSessionHandle, logger: IDBSQLLogger, stagingAllowedLocalPath: string[] | null ) {
+  constructor(driver: HiveDriver, sessionHandle: TSessionHandle, logger: IDBSQLLogger) {
     this.driver = driver;
     this.sessionHandle = sessionHandle;
     this.logger = logger;
     this.logger.log(LogLevel.debug, `Session created with id: ${this.getId()}`);
-    this.stagingAllowedLocalPath = stagingAllowedLocalPath
   }
 
   public getId() {
@@ -135,48 +133,21 @@ export default class DBSQLSession implements IDBSQLSession {
       ...getDirectResultsOptions(options.maxRows),
       ...getArrowOptions(),
     });
-
-    return this.createOperation(response);
-  }
-
-  /**
-   * Executes staging statement
-   * @public
-   * @param statement - SQL statement to be executed
-   * @param options - maxRows field is used to specify Direct Results
-   * @returns DBSQLOperation
-   * @example
-   * const operation = await session.executeStatement(query, { runAsync: true });
-   */
-  public async executeStagingStatement(statement: string, options: ExecuteStatementOptions = {}): Promise<IOperation> {
-    if(this.stagingAllowedLocalPath == null){
-      // Add error message.
-      return
-    }
-    const response = await this.driver.executeStatement({
-      sessionHandle: this.sessionHandle,
-      statement,
-      queryTimeout: options.queryTimeout,
-      runAsync: options.runAsync || false,
-      ...getDirectResultsOptions(options.maxRows),
-      ...getArrowOptions(),
-    });
+    const operation = this.createOperation(response);
+    if(options.stagingAllowedLocalPath) {
       type StagingResponse = {
         presignedUrl: string
         localFile: string
         headers: object
         operation: string
       }
-
-
-      let operation = this.createOperation(response);
-      let result = await operation.fetchAll()
-      assert(result.length == 1)
-      let row = result[0] as StagingResponse
+      const result = await operation.fetchAll()
+      assert(result.length === 1)
+      const row = result[0] as StagingResponse
       let allowOperation = false
-      for(let filepath of this.stagingAllowedLocalPath){
-        let relativePath = path.relative(filepath,row.localFile)
+      for(const filepath of options.stagingAllowedLocalPath){
+        const relativePath = path.relative(filepath,row.localFile)
         if(!relativePath.startsWith('..') && !path.isAbsolute(relativePath)){
           allowOperation = true
@@ -184,48 +155,55 @@ export default class DBSQLSession implements IDBSQLSession {
         }
       }
       if(!allowOperation) {
-        return
+        throw new StagingError("Staging path not a subset of allowed local paths.")
       }
-
-      let handler_args = {
+
+      const handlerArgs = {
         "presigned_url": row.presignedUrl,
         "local_file": row.localFile,
         "headers": row.headers,
       }
       switch(row.operation) {
         case "GET":
-          await this.handleStagingGet(handler_args.local_file, handler_args.presigned_url, handler_args.headers)
+          await this.handleStagingGet(handlerArgs.local_file, handlerArgs.presigned_url, handlerArgs.headers)
+          break;
         case "PUT":
-          await this.handleStagingPut(handler_args.local_file, handler_args.presigned_url, handler_args.headers)
+          await this.handleStagingPut(handlerArgs.local_file, handlerArgs.presigned_url, handlerArgs.headers)
+          break;
         case "REMOVE":
-          await this.handleStagingRemove(handler_args.local_file, handler_args.presigned_url, handler_args.headers)
-
-      }
-
-
+          await this.handleStagingRemove(handlerArgs.local_file, handlerArgs.presigned_url, handlerArgs.headers)
+          break;
+        default:
+          throw new StagingError("Staging query operation is not supported.");
+      }
     }
+    return operation
+  }
+
   public async handleStagingGet(local_file: string, presigned_url: string, headers: object) {
-    let response = await axios.get(presigned_url,{headers: headers})
-    let respJson = await response.data
-    if(respJson['ok']){
-      fs.writeFileSync(local_file,respJson['content'])
+    const response = await axios.get(presigned_url,{headers})
+    const buffer = Buffer.from(response.data.body.data)
+    if(response.statusText === 'OK'){
+      fs.writeFileSync(local_file,buffer)
     }
   }
+
   public async handleStagingRemove(local_file: string, presigned_url: string, headers: object) {
-    let response = await axios.delete(presigned_url,{headers: headers})
-    let respJson = await response.data
-    if(!respJson['ok']){
-      // Throw
+    const response = await axios.delete(presigned_url,{headers})
+    if(response.statusText !== 'OK'){
+      throw new StagingError("Staging operation REMOVE failed.")
     }
   }
+
   public async handleStagingPut(local_file: string, presigned_url: string, headers: object) {
-    let data = fs.readFileSync(local_file)
+    const data = fs.readFileSync(local_file)
+
+    const response = await axios.put(presigned_url,{body: data, headers})
-    let response = await axios.put(presigned_url,{body: data, headers: headers})
-    let respJson = await response.data
-    if(respJson['ok']){
-      fs.writeFileSync(local_file,respJson['content'])
+    if(response.statusText === 'OK'){
+      fs.writeFileSync(local_file,response.data)
     }
   }
diff --git a/lib/connection/connections/HttpConnection.ts b/lib/connection/connections/HttpConnection.ts
index ec665ed5..0cc19155 100644
--- a/lib/connection/connections/HttpConnection.ts
+++ b/lib/connection/connections/HttpConnection.ts
@@ -1,6 +1,6 @@
-import thrift from 'thrift';
 import https from 'https';
 import http, { IncomingMessage } from 'http';
+import thrift from 'thrift';
 
 import IThriftConnection from '../contracts/IThriftConnection';
 import IConnectionProvider from '../contracts/IConnectionProvider';
diff --git a/lib/contracts/IDBSQLClient.ts b/lib/contracts/IDBSQLClient.ts
index 4ae9d1c3..a5367299 100644
--- a/lib/contracts/IDBSQLClient.ts
+++ b/lib/contracts/IDBSQLClient.ts
@@ -5,7 +5,6 @@ import OAuthPersistence from '../connection/auth/DatabricksOAuth/OAuthPersistenc
 
 export interface ClientOptions {
   logger?: IDBSQLLogger;
-  stagingAllowedLocalPath?: string[]
 }
 
 type AuthOptions =
diff --git a/lib/contracts/IDBSQLSession.ts b/lib/contracts/IDBSQLSession.ts
index f1f500c9..6af1048d 100644
--- a/lib/contracts/IDBSQLSession.ts
+++ b/lib/contracts/IDBSQLSession.ts
@@ -7,6 +7,7 @@ export type ExecuteStatementOptions = {
   queryTimeout?: Int64;
   runAsync?: boolean;
   maxRows?: number | null;
+  stagingAllowedLocalPath?: string[]
 };
 
 export type TypeInfoRequest = {
@@ -92,14 +93,6 @@ export default interface IDBSQLSession {
    */
  executeStatement(statement: string, options?: ExecuteStatementOptions): Promise<IOperation>;
 
-  /**
-   * Executes staging statements
-   *
-   * @param statement DDL/DML statement
-   * @param options
-   */
-  executeStagingStatement(statement: string, options?: ExecuteStatementOptions): Promise<IOperation>;
-
   /**
    * Information about supported data types
diff --git a/lib/errors/StagingError.ts b/lib/errors/StagingError.ts
new file mode 100644
index 00000000..3f08f3a3
--- /dev/null
+++ b/lib/errors/StagingError.ts
@@ -0,0 +1 @@
+export default class StagingError extends Error {}
diff --git a/tests/e2e/staging/data b/tests/e2e/staging/data
deleted file mode 100644
index c57eff55..00000000
--- a/tests/e2e/staging/data
+++ /dev/null
@@ -1 +0,0 @@
-Hello World!
\ No newline at end of file
diff --git a/tests/e2e/staging_ingestion.test.js b/tests/e2e/staging_ingestion.test.js
index e85ac546..3259ff47 100644
--- a/tests/e2e/staging_ingestion.test.js
+++ b/tests/e2e/staging_ingestion.test.js
@@ -6,25 +6,83 @@ const fs = require('fs')
 
 const globalConfig = require('../../dist/globalConfig').default;
 
-const client = new DBSQLClient({stagingAllowedLocalPath: ["tests/e2e/staging"]});
-
-client.connect({
+describe('Staging Test', () => {
+  it("put staging data and receive it", async () => {
+    const client = new DBSQLClient();
+    await client.connect({
       host: config.host,
       path: config.path,
       token: config.token,
-  }).then(async (client) => {
+    })
     let temp_path = "tests/e2e/staging/data"
     fs.writeFileSync(temp_path,data="Hello World!")
-    const connection = await client.connect({
-      host: config.host,
-      path: config.path,
-      token: config.token,
-    });
     let session = await client.openSession({
       initialCatalog: config.database[0],
       initialSchema: config.database[1],
     });
-    let result = await session.executeStagingStatement(`PUT '${temp_path}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`)
-  }
-  )
+    await session.executeStatement(`PUT '${temp_path}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,{stagingAllowedLocalPath: ["tests/e2e/staging"]})
+    await session.executeStatement(`GET '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' TO 'tests/e2e/staging/file'`,{stagingAllowedLocalPath: ["tests/e2e/staging"]})
+    let result = fs.readFileSync('tests/e2e/staging/file')
+    expect(result.toString() === "Hello World!").to.be.true
+  })
+
+  it("put staging data and receive it", async () => {
+    const client = new DBSQLClient();
+    await client.connect({
+      host: config.host,
+      path: config.path,
+      token: config.token,
+    })
+    let temp_path = "tests/e2e/staging/data"
+    fs.writeFileSync(temp_path,data="Hello World!")
+
+    let session = await client.openSession({
+      initialCatalog: config.database[0],
+      initialSchema: config.database[1],
+    });
+    await session.executeStatement(`PUT '${temp_path}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,{stagingAllowedLocalPath: ["tests/e2e/staging"]})
+    await session.executeStatement(`GET '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' TO 'tests/e2e/staging/file'`,{stagingAllowedLocalPath: ["tests/e2e/staging"]})
+    let result = fs.readFileSync('tests/e2e/staging/file')
+    expect(result.toString() === "Hello World!").to.be.true
+  })
+
+  it("put staging data and remove it", async () => {
+    const client = new DBSQLClient();
+    await client.connect({
+      host: config.host,
+      path: config.path,
+      token: config.token,
+    })
+    let temp_path = "tests/e2e/staging/data"
+    fs.writeFileSync(temp_path,data="Hello World!")
+
+    let session = await client.openSession({
+      initialCatalog: config.database[0],
+      initialSchema: config.database[1],
+    });
+    await session.executeStatement(`PUT '${temp_path}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,{stagingAllowedLocalPath: ["tests/e2e/staging"]})
+    await session.executeStatement(`REMOVE '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv'`,{stagingAllowedLocalPath: ["tests/e2e/staging"]})
+  })
+
+  it("delete non-existent data", async () => {
+    const client = new DBSQLClient();
+    await client.connect({
+      host: config.host,
+      path: config.path,
+      token: config.token,
+    })
+    let temp_path = "tests/e2e/staging/data"
+    fs.writeFileSync(temp_path,data="Hello World!")
+
+    let session = await client.openSession({
+      initialCatalog: config.database[0],
+      initialSchema: config.database[1],
+    });
+    await session.executeStatement(`PUT '${temp_path}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,{stagingAllowedLocalPath: ["tests/e2e/staging"]})
+    await session.executeStatement(`GET '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' TO 'tests/e2e/staging/file'`,{stagingAllowedLocalPath: ["tests/e2e/staging"]})
+    let result = fs.readFileSync('tests/e2e/staging/file')
+    expect(result.toString() === "Hello World!").to.be.true
+  })
+})