From 4dc7cc54a2b63dc80055e5ab306cce7ba5ea0ed9 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Mon, 16 Sep 2024 14:44:02 -0300 Subject: [PATCH 1/2] fix: upgrade SQLAlchemy so that it recognizes oracle dialect --- apps/api/jupyter-requirements.txt | 5 +++-- apps/api/jupyter.Dockerfile | 7 +++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/api/jupyter-requirements.txt b/apps/api/jupyter-requirements.txt index a5ad9f12..31fe2682 100644 --- a/apps/api/jupyter-requirements.txt +++ b/apps/api/jupyter-requirements.txt @@ -18,7 +18,7 @@ vegafusion-python-embed==1.5.0 vl-convert-python==1.2.0 tiktoken==0.5.2 polars==0.19.19 -SQLAlchemy==1.4.50 +SQLAlchemy==2.0.34 google-api-core==2.15.0 google-api-python-client==1.6.7 google-api-support==0.1.4 @@ -46,7 +46,8 @@ db-dtypes==1.2.0 fastparquet==2024.2.0 oracledb==2.2.0 redshift-connector==2.0.917 -sqlalchemy-redshift==0.8.14 +# sqlalchemy-redshift==0.8.14 +git+https://github.com/briefercloud/sqlalchemy-redshift.git#egg=sqlalchemy-redshift trino==0.329.0 duckdb==1.0.0 openpyxl==3.1.2 diff --git a/apps/api/jupyter.Dockerfile b/apps/api/jupyter.Dockerfile index 95329fcd..80f33e3c 100644 --- a/apps/api/jupyter.Dockerfile +++ b/apps/api/jupyter.Dockerfile @@ -2,10 +2,6 @@ FROM python:3.9-slim WORKDIR /usr/src/app -ARG JUPYTER_REQUIREMENTS_FILE=jupyter-requirements.txt - -COPY $JUPYTER_REQUIREMENTS_FILE ./requirements.txt - RUN apt-get update && \ apt-get install -y \ libpq-dev \ @@ -27,6 +23,9 @@ RUN apt-get update && \ ENV CPLUS_INCLUDE_PATH=/usr/include/gdal ENV C_INCLUDE_PATH=/usr/include/gdal +ARG JUPYTER_REQUIREMENTS_FILE=jupyter-requirements.txt +COPY $JUPYTER_REQUIREMENTS_FILE ./requirements.txt + RUN pip install --upgrade pip RUN pip install --no-cache-dir jupyter_server RUN pip install --no-cache-dir ipykernel From 99cd78c5321d92d40d5a72b00a656a4595e156ce Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Mon, 16 Sep 2024 15:48:15 -0300 Subject: [PATCH 2/2] feat: implement schema querying for oracledb --- apps/api/package.json | 2 - apps/api/src/datasources/oracle.ts | 86 +++-------- apps/api/src/datasources/structure.ts | 3 +- apps/api/src/python/query/oracle.ts | 30 +++- apps/api/src/python/query/psql.ts | 175 ++-------------------- apps/api/src/python/query/sqlalchemy.ts | 187 +++++++++++++++++++++++- yarn.lock | 12 -- 7 files changed, 240 insertions(+), 255 deletions(-) diff --git a/apps/api/package.json b/apps/api/package.json index 983f3cfd..bdfbc690 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -49,7 +49,6 @@ "lru-cache": "^10.2.2", "mysql2": "^3.10.1", "node-fetch": "^3.3.2", - "oracledb": "^6.5.1", "p-all": "^5.0.0", "p-queue": "^8.0.1", "parse-duration": "^1.1.0", @@ -85,7 +84,6 @@ "@types/multer": "^1.4.10", "@types/node": "^20.9.0", "@types/node-fetch": "^2.6.9", - "@types/oracledb": "^6.5.0", "@types/ramda": "^0.29.9", "@types/semver": "^7.5.8", "@types/split2": "^4.2.3", diff --git a/apps/api/src/datasources/oracle.ts b/apps/api/src/datasources/oracle.ts index 18cdda0a..bb2567dc 100644 --- a/apps/api/src/datasources/oracle.ts +++ b/apps/api/src/datasources/oracle.ts @@ -1,82 +1,30 @@ import config from '../config/index.js' -import prisma, { - DataSource, - OracleDataSource, - getOraclePassword, -} from '@briefer/database' -import oracle from 'oracledb' -import { logger } from '../logger.js' +import prisma, { DataSource, OracleDataSource } from '@briefer/database' import { DataSourceStatus } from './index.js' -import { DataSourceConnectionError } from '@briefer/types' - -async function getConnectionAttributes(ds: OracleDataSource) { - const password = await getOraclePassword( - ds, - config().DATASOURCES_ENCRYPTION_KEY - ) - - let connectData = '' - if (ds.serviceName) { - connectData += `(service_name=${ds.serviceName})` - } - if (ds.sid) { - connectData += `(sid=${ds.sid})` - } - - if (connectData === '' && ds.database) { - return { - user: ds.username, - password, - connectString: `${ds.host}:${ds.port}/${ds.database}`, - } - } - - if (connectData !== '') { - connectData = `(connect_data=${connectData})` - } - - return { - user: ds.username, - password, - connectString: `(description=(retry_count=3)(retry_delay=3)(address=(protocol=tcps)(port=${ds.port})(host=${ds.host}))${connectData}(security=(ssl_server_dn_match=yes)))`, - } -} +import { getOracleSchema, pingOracle } from '../python/query/oracle.js' +import { logger } from '../logger.js' +import { DataSourceStructure } from '@briefer/types' -export async function ping(datasource: OracleDataSource): Promise { +export async function ping(ds: OracleDataSource): Promise { const lastConnection = new Date() + const err = await pingOracle(ds, config().DATASOURCES_ENCRYPTION_KEY) - try { - const attrs = await getConnectionAttributes(datasource) - const connection = await oracle.getConnection(attrs) - - await connection.ping() + logger.error({ err }, 'ping error') - return updateConnStatus(datasource, { + if (!err) { + return updateConnStatus(ds, { connStatus: 'online', lastConnection, }) - } catch (err) { - logger.info({ err, id: datasource.id }, 'Error pinging Oracle') - const parsedErr = DataSourceConnectionError.safeParse(err) - if (!parsedErr.success) { - logger.error( - { err, id: datasource.id }, - 'Error parsing Oracle connection error' - ) - return updateConnStatus(datasource, { - connStatus: 'offline', - connError: { - name: 'UnknownError', - message: 'Unknown error', - }, - }) - } - - return updateConnStatus(datasource, { - connStatus: 'offline', - connError: parsedErr.data, - }) } + + return updateConnStatus(ds, { connStatus: 'offline', connError: err }) +} + +export async function getSchema( + ds: OracleDataSource +): Promise { + return getOracleSchema(ds, config().DATASOURCES_ENCRYPTION_KEY) } export async function updateConnStatus( diff --git a/apps/api/src/datasources/structure.ts b/apps/api/src/datasources/structure.ts index d14b609a..3a244c82 100644 --- a/apps/api/src/datasources/structure.ts +++ b/apps/api/src/datasources/structure.ts @@ -5,6 +5,7 @@ import * as psql from './psql.js' import * as athena from './athena.js' import * as mysql from './mysql.js' import * as trino from './trino.js' +import * as oracle from './oracle.js' import { DataSourceStructure, jsonString } from '@briefer/types' import { logger } from '../logger.js' import { z } from 'zod' @@ -202,7 +203,7 @@ async function fetchStructure( return await trino.getSchema(ds.data) } case 'oracle': { - return null + return await oracle.getSchema(ds.data) } } } catch (err) { diff --git a/apps/api/src/python/query/oracle.ts b/apps/api/src/python/query/oracle.ts index eabce549..c55a01b0 100644 --- a/apps/api/src/python/query/oracle.ts +++ b/apps/api/src/python/query/oracle.ts @@ -1,7 +1,15 @@ import { v4 as uuidv4 } from 'uuid' import { OracleDataSource, getDatabaseURL } from '@briefer/database' -import { RunQueryResult, SuccessRunQueryResult } from '@briefer/types' -import { makeSQLAlchemyQuery } from './sqlalchemy.js' +import { + DataSourceStructure, + RunQueryResult, + SuccessRunQueryResult, +} from '@briefer/types' +import { + getSQLAlchemySchema, + makeSQLAlchemyQuery, + pingSQLAlchemy, +} from './sqlalchemy.js' export async function makeOracleQuery( workspaceId: string, @@ -32,3 +40,21 @@ export async function makeOracleQuery( onProgress ) } + +export function pingOracle( + ds: OracleDataSource, + encryptionKey: string +): Promise { + return pingSQLAlchemy( + ds.workspaceId, + { type: 'oracle', data: ds }, + encryptionKey + ) +} + +export function getOracleSchema( + ds: OracleDataSource, + encryptionKey: string +): Promise { + return getSQLAlchemySchema({ type: 'oracle', data: ds }, encryptionKey) +} diff --git a/apps/api/src/python/query/psql.ts b/apps/api/src/python/query/psql.ts index 019d85d0..1b870a02 100644 --- a/apps/api/src/python/query/psql.ts +++ b/apps/api/src/python/query/psql.ts @@ -6,15 +6,14 @@ import { } from '@briefer/database' import { DataSourceStructure, - PythonErrorOutput, RunQueryResult, SuccessRunQueryResult, - jsonString, } from '@briefer/types' -import { makeSQLAlchemyQuery } from './sqlalchemy.js' -import { PythonExecutionError, executeCode } from '../index.js' -import { logger } from '../../logger.js' -import { z } from 'zod' +import { + getSQLAlchemySchema, + makeSQLAlchemyQuery, + pingSQLAlchemy, +} from './sqlalchemy.js' export async function makePSQLQuery( workspaceId: string, @@ -47,174 +46,18 @@ export async function makePSQLQuery( ) } -export async function pingPSQL( +export function pingPSQL( ds: PostgreSQLDataSource | RedshiftDataSource, type: 'psql' | 'redshift', encryptionKey: string ): Promise { - const databaseUrl = await getDatabaseURL({ type, data: ds }, encryptionKey) - - const code = `from sqlalchemy import create_engine -from sqlalchemy.sql.expression import text - -engine = create_engine(${JSON.stringify(databaseUrl)}) -connection = engine.connect() - -connection.execute(text("SELECT 1")).fetchall()` - - let pythonError: PythonErrorOutput | null = null - return executeCode( - ds.workspaceId, - `ping-psql-${ds.id}`, - code, - (outputs) => { - for (const output of outputs) { - if (output.type === 'error') { - pythonError = output - } - } - }, - { storeHistory: false } - ) - .then(({ promise }) => promise) - .then(() => { - if (!pythonError) { - return null - } - - return new PythonExecutionError( - pythonError.type, - pythonError.ename, - pythonError.evalue, - [] - ) - }) + return pingSQLAlchemy(ds.workspaceId, { type, data: ds }, encryptionKey) } -export async function getPSQLSchema( +export function getPSQLSchema( ds: PostgreSQLDataSource | RedshiftDataSource, type: 'psql' | 'redshift', encryptionKey: string ): Promise { - const databaseUrl = await getDatabaseURL({ type, data: ds }, encryptionKey) - - const code = ` -import json -from sqlalchemy import create_engine -from sqlalchemy import inspect - - -def get_data_source_structure(data_source_id): - engine = create_engine(f"${databaseUrl}") - schemas = {} - inspector = inspect(engine) - for schema_name in inspector.get_schema_names(): - print(json.dumps({"log": f"Getting tables for schema {schema_name}"})) - tables = {} - for table_name in inspector.get_table_names(schema=schema_name): - print(json.dumps({"log": f"Getting schema for table {table_name}"})) - columns = [] - for column in inspector.get_columns(table_name, schema=schema_name): - columns.append({ - "name": column["name"], - "type": str(column["type"]) - }) - tables[table_name] = { - "columns": columns - } - schemas[schema_name] = { - "tables": tables - } - - data_source_structure = { - "dataSourceId": data_source_id, - "schemas": schemas, - "defaultSchema": "public" - } - - return data_source_structure - - -structure = get_data_source_structure("${ds.id}") -print(json.dumps(structure, default=str))` - - let pythonError: PythonErrorOutput | null = null - let structure: DataSourceStructure | null = null - return executeCode( - ds.workspaceId, - `schema-psql-${ds.id}`, - code, - (outputs) => { - for (const output of outputs) { - if (output.type === 'stdio' && output.name === 'stdout') { - const lines = output.text.split('\n') - for (const line of lines) { - if (line === '') { - continue - } - - const parsedStructure = jsonString - .pipe( - z.union([DataSourceStructure, z.object({ log: z.string() })]) - ) - .safeParse(line) - if (parsedStructure.success) { - if ('log' in parsedStructure.data) { - logger.trace( - { - workspaceId: ds.workspaceId, - datasourceId: ds.id, - }, - parsedStructure.data.log - ) - } else { - structure = parsedStructure.data - } - } else { - logger.error( - { - workspaceId: ds.workspaceId, - datasourceId: ds.id, - err: parsedStructure.error, - line, - }, - 'Failed to parse line from PSQL schema output' - ) - } - } - } else if (output.type === 'error') { - pythonError = output - } else { - logger.error( - { - workspaceId: ds.workspaceId, - datasourceId: ds.id, - output, - }, - 'Unexpected output type from PSQL schema query' - ) - } - } - }, - { storeHistory: false } - ) - .then(({ promise }) => promise) - .then(() => { - if (structure) { - return structure - } - - if (pythonError) { - throw new PythonExecutionError( - pythonError.type, - pythonError.ename, - pythonError.evalue, - [] - ) - } - - throw new Error( - `Failed to get schema for datasource ${ds.id}. Got no output.` - ) - }) + return getSQLAlchemySchema({ type, data: ds }, encryptionKey) } diff --git a/apps/api/src/python/query/sqlalchemy.ts b/apps/api/src/python/query/sqlalchemy.ts index e9d2536f..10fb0efa 100644 --- a/apps/api/src/python/query/sqlalchemy.ts +++ b/apps/api/src/python/query/sqlalchemy.ts @@ -1,6 +1,15 @@ -import { RunQueryResult, SuccessRunQueryResult } from '@briefer/types' +import { + DataSourceStructure, + jsonString, + PythonErrorOutput, + RunQueryResult, + SuccessRunQueryResult, +} from '@briefer/types' import { makeQuery } from './index.js' -import { renderJinja } from '../index.js' +import { executeCode, PythonExecutionError, renderJinja } from '../index.js' +import { DataSource, getDatabaseURL } from '@briefer/database' +import { z } from 'zod' +import { logger } from '../../logger.js' export async function makeSQLAlchemyQuery( workspaceId: string, @@ -37,7 +46,7 @@ def briefer_make_sqlalchemy_query(): from psycopg2.errors import QueryCanceled import time - print(json.dumps({"type": "log", "message": "Starting PSQL query"})) + print(json.dumps({"type": "log", "message": "Starting SQLAlchemy query"})) def rename_duplicates(df): """Renames duplicate columns in a DataFrame by appending a suffix.""" @@ -224,3 +233,175 @@ briefer_make_sqlalchemy_query()` onProgress ) } + +export async function pingSQLAlchemy( + workspaceId: string, + ds: DataSource, + encryptionKey: string +): Promise { + const databaseUrl = await getDatabaseURL(ds, encryptionKey) + const query = ds.type === 'oracle' ? 'SELECT 1 FROM DUAL' : 'SELECT 1' + + const code = `from sqlalchemy import create_engine +from sqlalchemy.sql.expression import text + +engine = create_engine(${JSON.stringify(databaseUrl)}) +connection = engine.connect() + +connection.execute(text(${JSON.stringify(query)})).fetchall()` + + let pythonError: PythonErrorOutput | null = null + return executeCode( + workspaceId, + `ping-${ds.type}-${ds.data.id}`, + code, + (outputs) => { + for (const output of outputs) { + if (output.type === 'error') { + pythonError = output + } + } + }, + { storeHistory: false } + ) + .then(({ promise }) => promise) + .then(() => { + if (!pythonError) { + return null + } + + return new PythonExecutionError( + pythonError.type, + pythonError.ename, + pythonError.evalue, + [] + ) + }) +} + +export async function getSQLAlchemySchema( + ds: DataSource, + encryptionKey: string +): Promise { + const databaseUrl = await getDatabaseURL(ds, encryptionKey) + + const code = ` +import json +from sqlalchemy import create_engine +from sqlalchemy import inspect + + +def get_data_source_structure(data_source_id): + engine = create_engine(f"${databaseUrl}") + schemas = {} + inspector = inspect(engine) + for schema_name in inspector.get_schema_names(): + print(json.dumps({"log": f"Getting tables for schema {schema_name}"})) + tables = {} + for table_name in inspector.get_table_names(schema=schema_name): + print(json.dumps({"log": f"Getting schema for table {table_name}"})) + columns = [] + for column in inspector.get_columns(table_name, schema=schema_name): + columns.append({ + "name": column["name"], + "type": str(column["type"]) + }) + tables[table_name] = { + "columns": columns + } + schemas[schema_name] = { + "tables": tables + } + + data_source_structure = { + "dataSourceId": data_source_id, + "schemas": schemas, + "defaultSchema": "public" + } + + return data_source_structure + + +structure = get_data_source_structure("${ds.data.id}") +print(json.dumps(structure, default=str))` + + let pythonError: PythonErrorOutput | null = null + let structure: DataSourceStructure | null = null + return executeCode( + ds.data.workspaceId, + `schema-${ds.type}-${ds.data.id}`, + code, + (outputs) => { + for (const output of outputs) { + if (output.type === 'stdio' && output.name === 'stdout') { + const lines = output.text.split('\n') + for (const line of lines) { + if (line === '') { + continue + } + + const parsedStructure = jsonString + .pipe( + z.union([DataSourceStructure, z.object({ log: z.string() })]) + ) + .safeParse(line) + if (parsedStructure.success) { + if ('log' in parsedStructure.data) { + logger.trace( + { + workspaceId: ds.data.workspaceId, + datasourceId: ds.data.id, + }, + parsedStructure.data.log + ) + } else { + structure = parsedStructure.data + } + } else { + logger.error( + { + workspaceId: ds.data.workspaceId, + datasourceId: ds.data.id, + err: parsedStructure.error, + line, + }, + 'Failed to parse line from SQLAlchemy schema output' + ) + } + } + } else if (output.type === 'error') { + pythonError = output + } else { + logger.warn( + { + workspaceId: ds.data.workspaceId, + datasourceId: ds.data.id, + output, + }, + 'Unexpected output type from SQLAlchemy schema query' + ) + } + } + }, + { storeHistory: false } + ) + .then(({ promise }) => promise) + .then(() => { + if (structure) { + return structure + } + + if (pythonError) { + throw new PythonExecutionError( + pythonError.type, + pythonError.ename, + pythonError.evalue, + [] + ) + } + + throw new Error( + `Failed to get schema for datasource ${ds.data.id}. Got no output.` + ) + }) +} diff --git a/yarn.lock b/yarn.lock index 8a532316..643847b6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3185,13 +3185,6 @@ dependencies: undici-types "~6.19.2" -"@types/oracledb@^6.5.0": - version "6.5.1" - resolved "https://registry.yarnpkg.com/@types/oracledb/-/oracledb-6.5.1.tgz#17d021cabc9d216dfa6d3d65ae3ee585c33baab3" - integrity sha512-Ll0bKGXmCZVngBL3juSaytA8Jeocx0VghDHTt+FEC2bs8fdl9pzoaBXYWXjBUxCCT8Y/69m5AzuTgBd79j24WA== - dependencies: - "@types/node" "*" - "@types/papaparse@^5.3.10": version "5.3.14" resolved "https://registry.yarnpkg.com/@types/papaparse/-/papaparse-5.3.14.tgz#345cc2a675a90106ff1dc33b95500dfb30748031" @@ -9504,11 +9497,6 @@ optionator@^0.9.3: type-check "^0.4.0" word-wrap "^1.2.5" -oracledb@^6.5.1: - version "6.6.0" - resolved "https://registry.yarnpkg.com/oracledb/-/oracledb-6.6.0.tgz#bb40adbe81a84a1e544c48af9f120c61f030e936" - integrity sha512-T3dx+o3j+tVN53wQyr4yGTmoPHLy+a2V8yb1T2PmWrsj3ZlSt2Yu1BgV2yTDqnmBZYpRi/I3yJXRCOHHD7PiyA== - orderedmap@^2.0.0: version "2.1.1" resolved "https://registry.yarnpkg.com/orderedmap/-/orderedmap-2.1.1.tgz#61481269c44031c449915497bf5a4ad273c512d2"