Merge branch 'main' into infer-missing-arrow-schema
kravets-levko authored Mar 5, 2024
2 parents f26d3f2 + ff9fc0d commit 947a2b0
Showing 5 changed files with 102 additions and 46 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/main.yml
@@ -74,6 +74,9 @@ jobs:
E2E_PATH: ${{ secrets.TEST_PECO_WAREHOUSE_HTTP_PATH }}
E2E_ACCESS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
E2E_TABLE_SUFFIX: ${{github.sha}}
E2E_CATALOG: peco
E2E_SCHEMA: default
E2E_VOLUME: e2etests
cache-name: cache-node-modules
NYC_REPORT_DIR: coverage_e2e

4 changes: 4 additions & 0 deletions lib/DBSQLSession.ts
@@ -280,6 +280,10 @@ export default class DBSQLSession implements IDBSQLSession {
const agent = await connectionProvider.getAgent();

const response = await fetch(presignedUrl, { method: 'DELETE', headers, agent });
// It looks like AWS and Azure handle HTTP `DELETE` for non-existing files differently:
// AWS assumes that, since the file already doesn't exist, the goal is achieved and returns HTTP 200,
// while Azure is somewhat stricter and checks whether the file exists before deleting it; if the
// file doesn't exist, Azure returns HTTP 404
if (!response.ok) {
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
}
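A practical consequence of this backend difference for callers of the driver: a `REMOVE` of a staging file that is already gone may either succeed silently (AWS-style) or surface as a `StagingError` carrying a 404 (Azure-style). Below is a minimal sketch of tolerant handling on the caller side, mirroring the pattern the updated e2e test uses further down; the package import path is assumed, and `session` is assumed to be an already-open session rather than anything defined in this commit.

// Sketch only: remove a staging file and treat "already gone" as success.
// Assumes `session` is an open IDBSQLSession and that StagingError is importable
// from the published package the same way the tests import it from `dist/`.
const StagingError = require('@databricks/sql/dist/errors/StagingError').default;

async function removeIfExists(session, stagingFileName, localPath) {
  try {
    await session.executeStatement(`REMOVE '${stagingFileName}'`, { stagingAllowedLocalPath: [localPath] });
  } catch (error) {
    // Azure-backed workspaces may report HTTP 404 for a missing file; AWS-backed ones
    // typically return HTTP 200, so no error is thrown at all.
    if (error instanceof StagingError && error.message.includes('404')) {
      return; // file did not exist, nothing to do
    }
    throw error;
  }
}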
1 change: 0 additions & 1 deletion tests/e2e/staging/.gitignore

This file was deleted.

134 changes: 91 additions & 43 deletions tests/e2e/staging_ingestion.test.js
@@ -1,34 +1,61 @@
const { expect } = require('chai');
const fs = require('fs');
const path = require('path');
const os = require('os');
const uuid = require('uuid');
const config = require('./utils/config');
const { DBSQLClient } = require('../..');
const fs = require('fs');
const StagingError = require('../../dist/errors/StagingError').default;

describe('Staging Test', () => {
const catalog = config.database[0];
const schema = config.database[1];
const volume = config.volume;

const localPath = fs.mkdtempSync(path.join(os.tmpdir(), 'databricks-sql-tests-'));

before(() => {
expect(catalog).to.not.be.undefined;
expect(schema).to.not.be.undefined;
expect(volume).to.not.be.undefined;
});

after(() => {
fs.rmSync(localPath, {
recursive: true,
force: true,
});
});

// TODO: Temporarily disable those tests until we figure out issues with E2E test env
describe.skip('Staging Test', () => {
it('put staging data and receive it', async () => {
const client = new DBSQLClient();
await client.connect({
host: config.host,
path: config.path,
token: config.token,
});
let tempPath = 'tests/e2e/staging/data';
fs.writeFileSync(tempPath, 'Hello World!');

const session = await client.openSession({
initialCatalog: config.database[0],
initialSchema: config.database[1],
initialCatalog: catalog,
initialSchema: schema,
});

const expectedData = 'Hello World!';
const stagingFileName = `/Volumes/${catalog}/${schema}/${volume}/${uuid.v4()}.csv`;
const localFile = path.join(localPath, `${uuid.v4()}.csv`);

fs.writeFileSync(localFile, expectedData);
await session.executeStatement(`PUT '${localFile}' INTO '${stagingFileName}' OVERWRITE`, {
stagingAllowedLocalPath: [localPath],
});
fs.rmSync(localFile);

await session.executeStatement(`GET '${stagingFileName}' TO '${localFile}'`, {
stagingAllowedLocalPath: [localPath],
});
await session.executeStatement(
`PUT '${tempPath}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
);
await session.executeStatement(
`GET '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' TO 'tests/e2e/staging/file'`,
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
);
let result = fs.readFileSync('tests/e2e/staging/file');
expect(result.toString() === 'Hello World!').to.be.true;
const result = fs.readFileSync(localFile);
fs.rmSync(localFile);
expect(result.toString() === expectedData).to.be.true;
});

it('put staging data and remove it', async () => {
@@ -38,20 +65,39 @@ describe.skip('Staging Test', () => {
path: config.path,
token: config.token,
});
let tempPath = 'tests/e2e/staging/data';
fs.writeFileSync(tempPath, (data = 'Hello World!'));

let session = await client.openSession({
initialCatalog: config.database[0],
initialSchema: config.database[1],
const session = await client.openSession({
initialCatalog: catalog,
initialSchema: schema,
});
await session.executeStatement(
`PUT '${tempPath}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
);
await session.executeStatement(`REMOVE '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv'`, {
stagingAllowedLocalPath: ['tests/e2e/staging'],

const expectedData = 'Hello World!';
const stagingFileName = `/Volumes/${catalog}/${schema}/${volume}/${uuid.v4()}.csv`;
const localFile = path.join(localPath, `${uuid.v4()}.csv`);

fs.writeFileSync(localFile, expectedData);
await session.executeStatement(`PUT '${localFile}' INTO '${stagingFileName}' OVERWRITE`, {
stagingAllowedLocalPath: [localPath],
});
fs.rmSync(localFile);

await session.executeStatement(`REMOVE '${stagingFileName}'`, { stagingAllowedLocalPath: [localPath] });

try {
await session.executeStatement(`GET '${stagingFileName}' TO '${localFile}'`, {
stagingAllowedLocalPath: [localPath],
});
expect.fail('It should throw HTTP 404 error');
} catch (error) {
if (error instanceof StagingError) {
// File should not exist after deleting
expect(error.message).to.contain('404');
} else {
throw error;
}
} finally {
fs.rmSync(localFile, { force: true });
}
});

it('delete non-existent data', async () => {
@@ -61,22 +107,24 @@
path: config.path,
token: config.token,
});
let tempPath = 'tests/e2e/staging/data';
fs.writeFileSync(tempPath, (data = 'Hello World!'));

let session = await client.openSession({
initialCatalog: config.database[0],
initialSchema: config.database[1],
const session = await client.openSession({
initialCatalog: catalog,
initialSchema: schema,
});
await session.executeStatement(
`PUT '${tempPath}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
);
await session.executeStatement(
`GET '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' TO 'tests/e2e/staging/file'`,
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
);
let result = fs.readFileSync('tests/e2e/staging/file');
expect(result.toString() === 'Hello World!').to.be.true;

const stagingFileName = `/Volumes/${catalog}/${schema}/${volume}/${uuid.v4()}.csv`;

try {
await session.executeStatement(`REMOVE '${stagingFileName}'`, { stagingAllowedLocalPath: [localPath] });
// In some cases, `REMOVE` may silently succeed for non-existing files (see the comment in the relevant
// part of the `DBSQLSession` code). But if it fails, it has to be an HTTP 404 error
} catch (error) {
if (error instanceof StagingError) {
expect(error.message).to.contain('404');
} else {
throw error;
}
}
});
});
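Taken together, the rewritten tests cover the full staging lifecycle against a Unity Catalog volume. For reference, here is a standalone sketch of the same PUT / GET / REMOVE flow outside the mocha harness; the host, HTTP path, token, and catalog/schema/volume names are placeholders, and the package name as well as the session/client `close()` calls are assumptions rather than something introduced by this commit.

// Standalone sketch of the staging flow exercised by the tests above (placeholder credentials).
const fs = require('fs');
const os = require('os');
const path = require('path');
const { DBSQLClient } = require('@databricks/sql'); // package name assumed

async function main() {
  const client = new DBSQLClient();
  await client.connect({
    host: '****.cloud.databricks.com',
    path: '/sql/1.0/warehouses/****************',
    token: 'dapi********************************',
  });
  const session = await client.openSession({ initialCatalog: 'my_catalog', initialSchema: 'my_schema' });

  const localPath = fs.mkdtempSync(path.join(os.tmpdir(), 'staging-demo-'));
  const localFile = path.join(localPath, 'hello.csv');
  const stagingFileName = '/Volumes/my_catalog/my_schema/my_volume/hello.csv';

  fs.writeFileSync(localFile, 'Hello World!');
  // Every staging statement must whitelist the local directory it reads from or writes to.
  await session.executeStatement(`PUT '${localFile}' INTO '${stagingFileName}' OVERWRITE`, {
    stagingAllowedLocalPath: [localPath],
  });
  await session.executeStatement(`GET '${stagingFileName}' TO '${localFile}'`, {
    stagingAllowedLocalPath: [localPath],
  });
  await session.executeStatement(`REMOVE '${stagingFileName}'`, { stagingAllowedLocalPath: [localPath] });

  await session.close(); // assumed cleanup calls
  await client.close();
  fs.rmSync(localPath, { recursive: true, force: true });
}

main().catch(console.error);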
6 changes: 4 additions & 2 deletions tests/e2e/utils/config.js
@@ -4,7 +4,7 @@ try {
} catch (e) {}

const catalog = process.env.E2E_CATALOG || undefined;
const database = process.env.E2E_DATABASE || undefined;
const schema = process.env.E2E_SCHEMA || undefined;

// Create file named `config.local.js` in the same directory and override config there
module.exports = {
@@ -17,7 +17,9 @@ module.exports = {
// Access token: dapi********************************
token: process.env.E2E_ACCESS_TOKEN,
// Catalog and database to use for testing; specify both or leave array empty to use defaults
database: catalog || database ? [catalog, database] : [],
database: catalog || schema ? [catalog, schema] : [],
// Volume to use for testing
volume: process.env.E2E_VOLUME,
// Suffix used for tables that will be created during tests
tableSuffix: process.env.E2E_TABLE_SUFFIX,
...overrides,
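With the new `volume` setting, a local override file has one more field to cover. Here is a hypothetical `config.local.js`, assuming, as the `...overrides` spread above suggests, that the file simply exports an object merged over the defaults; every value below is a placeholder.

// tests/e2e/utils/config.local.js (not committed), placeholder values only
module.exports = {
  host: '****.cloud.databricks.com',
  path: '/sql/1.0/warehouses/****************',
  token: 'dapi********************************',
  // Catalog and schema used for openSession in the tests
  database: ['my_catalog', 'my_schema'],
  // Unity Catalog volume used by the staging tests
  volume: 'e2etests',
  // Suffix for tables created during tests
  tableSuffix: 'local',
};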
