Skip to content

Commit

Permalink
feat(ingest): sql parser perf + asyncio fixes (#9119)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Oct 31, 2023
1 parent 94d438d commit ea12732
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 13 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
sqlglot_lib = {
# Using an Acryl fork of sqlglot.
# https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1
"acryl-sqlglot==18.5.2.dev45",
"acryl-sqlglot==18.17.1.dev16",
}

sql_common = (
Expand Down
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/cli/docker_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import pathlib
import platform
import signal
import subprocess
import sys
import tempfile
Expand Down Expand Up @@ -770,6 +771,10 @@ def quickstart( # noqa: C901
logger.debug("docker compose up still running, sending SIGKILL")
up_process.kill()
up_process.wait()
else:
# If the docker process got a keyboard interrupt, raise one here.
if up_process.returncode in {128 + signal.SIGINT, -signal.SIGINT}:
raise KeyboardInterrupt

# Check docker health every few seconds.
status = check_docker_quickstart()
Expand Down
12 changes: 4 additions & 8 deletions metadata-ingestion/src/datahub/upgrade/upgrade.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import contextlib
import functools
import logging
import sys
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -374,17 +373,14 @@ def check_upgrade(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def async_wrapper(*args: Any, **kwargs: Any) -> Any:
async def run_inner_func():
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, functools.partial(func, *args, **kwargs)
)
return func(*args, **kwargs)

async def run_func_check_upgrade():
version_stats_future = asyncio.ensure_future(retrieve_version_stats())
the_one_future = asyncio.ensure_future(run_inner_func())
ret = await the_one_future
main_func_future = asyncio.ensure_future(run_inner_func())
ret = await main_func_future

# the one future has returned
# the main future has returned
# we check the other futures quickly
try:
version_stats = await asyncio.wait_for(version_stats_future, 0.5)
Expand Down
5 changes: 2 additions & 3 deletions metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def get_query_type_of_sql(expression: sqlglot.exp.Expression) -> QueryType:
sqlglot.exp.Update: QueryType.UPDATE,
sqlglot.exp.Delete: QueryType.DELETE,
sqlglot.exp.Merge: QueryType.MERGE,
sqlglot.exp.Subqueryable: QueryType.SELECT, # unions, etc. are also selects
}

for cls, query_type in mapping.items():
Expand Down Expand Up @@ -820,10 +821,8 @@ def _extract_select_from_update(
)

# Update statements always implicitly have the updated table in context.
# TODO: Retain table name alias.
# TODO: Retain table name alias, if one was present.
if select_statement.args.get("from"):
# select_statement = sqlglot.parse_one(select_statement.sql(dialect=dialect))

select_statement = select_statement.join(
statement.this, append=True, join_kind="cross"
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"query_type": "UNKNOWN",
"query_type": "SELECT",
"in_tables": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf10.orders,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf100.orders,PROD)"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"query_type": "SELECT",
"in_tables": [
"urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table2,PROD)"
],
"out_tables": [],
"column_lineage": [
{
"downstream": {
"table": null,
"column": "col1",
"column_type": null,
"native_column_type": null
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table1,PROD)",
"column": "col1"
},
{
"table": "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table2,PROD)",
"column": "col1"
}
]
},
{
"downstream": {
"table": null,
"column": "col2",
"column_type": null,
"native_column_type": null
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table1,PROD)",
"column": "col2"
},
{
"table": "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table2,PROD)",
"column": "col2"
}
]
}
]
}
14 changes: 14 additions & 0 deletions metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,20 @@ def test_teradata_default_normalization():
)


def test_teradata_strange_operators():
    """Lineage parsing of Teradata's non-standard operators (`eq`, `minus`)."""
    # Teradata accepts keyword comparison operators ("eq") and "minus" as a
    # synonym for EXCEPT; the parser must still resolve column lineage.
    query = """
select col1, col2 from dbc.table1
where col1 eq 'value1'
minus
select col1, col2 from dbc.table2
"""
    expected = RESOURCE_DIR / "test_teradata_strange_operators.json"
    assert_sql_result(
        query,
        dialect="teradata",
        default_schema="dbc",
        expected_file=expected,
    )


def test_snowflake_update_hardcoded():
assert_sql_result(
"""
Expand Down

0 comments on commit ea12732

Please sign in to comment.