diff --git a/lib/DBSQLClient.ts b/lib/DBSQLClient.ts index f61097d8..26ba3a24 100644 --- a/lib/DBSQLClient.ts +++ b/lib/DBSQLClient.ts @@ -1,6 +1,6 @@ +import { EventEmitter } from 'events'; import thrift, { HttpHeaders } from 'thrift'; -import { EventEmitter } from 'events'; import TCLIService from '../thrift/TCLIService'; import { TProtocolVersion } from '../thrift/TCLIService_types'; import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient'; diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts index 82b8cdf1..df4673ac 100644 --- a/lib/DBSQLSession.ts +++ b/lib/DBSQLSession.ts @@ -1,4 +1,8 @@ +import * as fs from 'fs' +import * as path from 'path' +import { assert } from 'console'; import { stringify, NIL, parse } from 'uuid'; +import axios from 'axios'; import { TSessionHandle, TStatus, @@ -27,6 +31,9 @@ import InfoValue from './dto/InfoValue'; import { definedOrError } from './utils'; import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger'; import globalConfig from './globalConfig'; +import StagingError from './errors/StagingError'; + + const defaultMaxRows = 100000; @@ -78,7 +85,7 @@ export default class DBSQLSession implements IDBSQLSession { private readonly sessionHandle: TSessionHandle; private readonly logger: IDBSQLLogger; - + constructor(driver: HiveDriver, sessionHandle: TSessionHandle, logger: IDBSQLLogger) { this.driver = driver; this.sessionHandle = sessionHandle; @@ -126,10 +133,90 @@ export default class DBSQLSession implements IDBSQLSession { ...getDirectResultsOptions(options.maxRows), ...getArrowOptions(), }); + const operation = this.createOperation(response); + if(options.stagingAllowedLocalPath) { + type StagingResponse = { + presignedUrl: string + localFile: string + headers: object + operation: string + } + const result = await operation.fetchAll() + assert(result.length === 1) + const row = result[0] as StagingResponse + + let allowOperation = false + for(const filepath of 
options.stagingAllowedLocalPath) {
+          const relativePath = path.relative(filepath, row.localFile);
+
+          if (!relativePath.startsWith('..') && !path.isAbsolute(relativePath)) {
+            allowOperation = true;
+          }
+        }
+        if (!allowOperation) {
+          throw new StagingError('Staging path not a subset of allowed local paths.');
+        }
+
+        const handlerArgs = {
+          presigned_url: row.presignedUrl,
+          local_file: row.localFile,
+          headers: row.headers,
+        };
+        switch (row.operation) {
+          case 'GET':
+            await this.handleStagingGet(handlerArgs.local_file, handlerArgs.presigned_url, handlerArgs.headers);
+            break;
+          case 'PUT':
+            await this.handleStagingPut(handlerArgs.local_file, handlerArgs.presigned_url, handlerArgs.headers);
+            break;
+          case 'REMOVE':
+            await this.handleStagingRemove(handlerArgs.local_file, handlerArgs.presigned_url, handlerArgs.headers);
+            break;
+          default:
+            throw new StagingError('Staging query operation is not supported.');
+        }
+      }
+      return operation;
+    }
-    return this.createOperation(response);
+
+  // Downloads the presigned URL contents to localFile.
+  public async handleStagingGet(localFile: string, presignedUrl: string, headers: object) {
+    // responseType is required: by default axios JSON-parses the body, corrupting binary payloads.
+    const response = await axios.get(presignedUrl, { headers, responseType: 'arraybuffer' });
+    // axios rejects non-2xx by default; this guard also covers custom validateStatus configs.
+    if (response.status !== 200) {
+      throw new StagingError(`Staging GET failed: HTTP ${response.status} ${response.statusText}`);
+    }
+    // response.data already is the body (a Buffer/ArrayBuffer) — not response.data.body.data.
+    fs.writeFileSync(localFile, Buffer.from(response.data));
+  }
+
+  // Deletes the remote object behind the presigned URL; throws instead of silently swallowing failures.
+  public async handleStagingRemove(localFile: string, presignedUrl: string, headers: object) {
+    const response = await axios.delete(presignedUrl, { headers });
+    if (response.status < 200 || response.status >= 300) {
+      throw new StagingError(`Staging REMOVE failed: HTTP ${response.status} ${response.statusText}`);
+    }
+  }
+
+  // Uploads localFile to the presigned URL.
+  public async handleStagingPut(localFile: string, presignedUrl: string, headers: object) {
+    const data = fs.readFileSync(localFile);
+    // The file contents are the request body itself; wrapping them as {body: data} would upload JSON.
+    const response = await axios.put(presignedUrl, data, { headers });
+    if (response.status < 200 || response.status >= 300) {
+      throw new StagingError(`Staging PUT failed: HTTP ${response.status} ${response.statusText}`);
+    }
+  }
+
+  /**
+   * Executes statement
+   * @public
+   * @param statement - SQL statement to be executed
+   * @param options - maxRows field is used to specify Direct Results
+   * 
@returns DBSQLOperation + * @example + * const operation = await session.executeStatement(query, { runAsync: true }); + */ + /** * Information about supported data types * @public diff --git a/lib/connection/connections/HttpConnection.ts b/lib/connection/connections/HttpConnection.ts index ec665ed5..0cc19155 100644 --- a/lib/connection/connections/HttpConnection.ts +++ b/lib/connection/connections/HttpConnection.ts @@ -1,6 +1,6 @@ -import thrift from 'thrift'; import https from 'https'; import http, { IncomingMessage } from 'http'; +import thrift from 'thrift'; import IThriftConnection from '../contracts/IThriftConnection'; import IConnectionProvider from '../contracts/IConnectionProvider'; diff --git a/lib/contracts/IDBSQLSession.ts b/lib/contracts/IDBSQLSession.ts index 4dcdad1c..6af1048d 100644 --- a/lib/contracts/IDBSQLSession.ts +++ b/lib/contracts/IDBSQLSession.ts @@ -7,6 +7,7 @@ export type ExecuteStatementOptions = { queryTimeout?: Int64; runAsync?: boolean; maxRows?: number | null; + stagingAllowedLocalPath?: string[] }; export type TypeInfoRequest = { @@ -92,6 +93,7 @@ export default interface IDBSQLSession { */ executeStatement(statement: string, options?: ExecuteStatementOptions): Promise; + /** * Information about supported data types * diff --git a/lib/errors/StagingError.ts b/lib/errors/StagingError.ts new file mode 100644 index 00000000..3f08f3a3 --- /dev/null +++ b/lib/errors/StagingError.ts @@ -0,0 +1 @@ +export default class StagingError extends Error {} diff --git a/package.json b/package.json index 0c7949b1..20695db5 100644 --- a/package.json +++ b/package.json @@ -72,6 +72,7 @@ }, "dependencies": { "apache-arrow": "^10.0.1", + "axios": "^1.4.0", "commander": "^9.3.0", "node-int64": "^0.4.0", "open": "^8.4.2", diff --git a/tests/e2e/staging_ingestion.test.js b/tests/e2e/staging_ingestion.test.js new file mode 100644 index 00000000..3259ff47 --- /dev/null +++ b/tests/e2e/staging_ingestion.test.js @@ -0,0 +1,88 @@ +const { expect } = 
require('chai'); +const config = require('./utils/config'); +const logger = require('./utils/logger')(config.logger); +const { DBSQLClient } = require('../..'); +const fs = require('fs') +const globalConfig = require('../../dist/globalConfig').default; + + +describe('Staging Test', () => { +it("put staging data and receive it", async () => { + const client = new DBSQLClient(); + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + }) + let temp_path = "tests/e2e/staging/data" + fs.writeFileSync(temp_path,data="Hello World!") + + let session = await client.openSession({ + initialCatalog: config.database[0], + initialSchema: config.database[1], + }); + await session.executeStatement(`PUT '${temp_path}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,{stagingAllowedLocalPath: ["tests/e2e/staging"]}) + await session.executeStatement(`GET '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' TO 'tests/e2e/staging/file'`,{stagingAllowedLocalPath: ["tests/e2e/staging"]}) + let result = fs.readFileSync('tests/e2e/staging/file') + expect(result.toString() === "Hello World!").to.be.true + }) + + it("put staging data and receive it", async () => { + const client = new DBSQLClient(); + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + }) + let temp_path = "tests/e2e/staging/data" + fs.writeFileSync(temp_path,data="Hello World!") + + let session = await client.openSession({ + initialCatalog: config.database[0], + initialSchema: config.database[1], + }); + await session.executeStatement(`PUT '${temp_path}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,{stagingAllowedLocalPath: ["tests/e2e/staging"]}) + await session.executeStatement(`GET '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' TO 'tests/e2e/staging/file'`,{stagingAllowedLocalPath: ["tests/e2e/staging"]}) + let 
result = fs.readFileSync('tests/e2e/staging/file') + expect(result.toString() === "Hello World!").to.be.true + }) + + it("put staging data and remove it", async () => { + const client = new DBSQLClient(); + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + }) + let temp_path = "tests/e2e/staging/data" + fs.writeFileSync(temp_path,data="Hello World!") + + let session = await client.openSession({ + initialCatalog: config.database[0], + initialSchema: config.database[1], + }); + await session.executeStatement(`PUT '${temp_path}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,{stagingAllowedLocalPath: ["tests/e2e/staging"]}) + await session.executeStatement(`REMOVE '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv'`,{stagingAllowedLocalPath: ["tests/e2e/staging"]}) + }) + + it("delete non-existent data", async () => { + const client = new DBSQLClient(); + await client.connect({ + host: config.host, + path: config.path, + token: config.token, + }) + let temp_path = "tests/e2e/staging/data" + fs.writeFileSync(temp_path,data="Hello World!") + + let session = await client.openSession({ + initialCatalog: config.database[0], + initialSchema: config.database[1], + }); + await session.executeStatement(`PUT '${temp_path}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,{stagingAllowedLocalPath: ["tests/e2e/staging"]}) + await session.executeStatement(`GET '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' TO 'tests/e2e/staging/file'`,{stagingAllowedLocalPath: ["tests/e2e/staging"]}) + let result = fs.readFileSync('tests/e2e/staging/file') + expect(result.toString() === "Hello World!").to.be.true + }) +}) + diff --git a/tests/e2e/utils/log.txt b/tests/e2e/utils/log.txt new file mode 100644 index 00000000..88c30cf6 --- /dev/null +++ b/tests/e2e/utils/log.txt @@ -0,0 +1,7 @@ +{"level":"info","message":"Created 
DBSQLClient"} +{"level":"info","message":"Created DBSQLClient"} +{"level":"debug","message":"Operation created with id: 01ed9c52-dd9c-1fc5-bae6-34882fc0c082"} +{"level":"debug","message":"Fetched chunk of size: 100000 from operation with id: 01ed9c52-dd9c-1fc5-bae6-34882fc0c082"} +{"level":"debug","message":"Fetched all data from operation with id: 01ed9c52-dd9c-1fc5-bae6-34882fc0c082"} +{"level":"debug","message":"Closing operation with id: 01ed9c52-dd9c-1fc5-bae6-34882fc0c082"} +{"level":"debug","message":"Session closed with id: 01ed9c52-ccf2-1c5d-9bf8-1144872eb5fb"}