diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json index e7902d165051b..4bc34b7b0d3ce 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json @@ -350,8 +350,8 @@ "json": { "timestampMillis": 1717179743558, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED", "attempt": 1 @@ -367,8 +367,8 @@ "json": { "timestampMillis": 1717179743932, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "actor": "urn:li:corpuser:airflow", "operationType": "CREATE", @@ -552,8 +552,8 @@ "json": { "timestampMillis": 1717179743960, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -742,8 +742,8 @@ "json": { "timestampMillis": 1717179748679, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED", "attempt": 1 @@ -759,8 +759,8 @@ "json": { "timestampMillis": 1717179749258, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "actor": "urn:li:corpuser:airflow", "operationType": "CREATE", @@ -875,8 +875,8 @@ "json": { "timestampMillis": 1717179749324, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -1161,8 +1161,8 @@ "json": { "timestampMillis": 1717179757397, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED", "attempt": 1 @@ -1178,8 +1178,8 @@ "json": { "timestampMillis": 1717179758424, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "actor": "urn:li:corpuser:airflow", "operationType": "CREATE", @@ -1420,8 +1420,8 @@ "json": { "timestampMillis": 1717179758496, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -1483,10 +1483,10 @@ "aspectName": "dataJobInputOutput", "aspect": { "json": { - "inputDatasets": [ + "inputDatasets": [], + "outputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" ], - "outputDatasets": [], "inputDatajobs": [ "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" ], @@ -1555,6 +1555,19 @@ } } }, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" + ] + } + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)", @@ -1640,19 +1653,6 @@ } } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceInput", - "aspect": { - "json": { - "inputs": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" - ] - } - } -}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", @@ -1662,8 +1662,8 @@ "json": { "timestampMillis": 1718733767964, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED", "attempt": 1 @@ -1679,8 +1679,8 @@ "json": { "timestampMillis": 1718733768638, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -1697,10 +1697,10 @@ "aspectName": "dataJobInputOutput", "aspect": { "json": { - "inputDatasets": [ + "inputDatasets": [], + "outputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" ], - "outputDatasets": [], "inputDatajobs": [ "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" ], @@ -1809,19 +1809,6 @@ } } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceInput", - "aspect": { - "json": { - "inputs": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" - ] - } - } -}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", @@ -1843,8 +1830,8 @@ "json": { "timestampMillis": 1718733773354, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED", "attempt": 1 @@ -1860,8 +1847,8 @@ "json": { "timestampMillis": 1718733774147, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -1870,5 +1857,18 @@ } } } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" + ] + } + } } ] \ No newline at end of file diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json index a9af068e2e4e9..99bda0e0f2569 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json @@ -336,8 +336,8 @@ "json": { "timestampMillis": 1717180072004, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED", "attempt": 1 @@ -382,8 +382,8 @@ "json": { "timestampMillis": 1719864194882, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "actor": "urn:li:corpuser:airflow", "operationType": "CREATE", @@ -435,8 +435,8 @@ "json": { "timestampMillis": 1717180072275, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -641,8 +641,8 @@ "json": { "timestampMillis": 1717180078196, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED", "attempt": 1 @@ -722,8 +722,8 @@ "json": { "timestampMillis": 1717180078619, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -1000,8 +1000,8 @@ "json": { "timestampMillis": 1717180084642, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED", "attempt": 1 @@ -1081,8 +1081,8 @@ "json": { "timestampMillis": 1717180085266, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -1186,10 +1186,10 @@ "aspectName": "dataJobInputOutput", "aspect": { "json": { - "inputDatasets": [ + "inputDatasets": [], + "outputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" ], - "outputDatasets": [], "inputDatajobs": [ "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" ], @@ -1287,8 +1287,8 @@ "json": { "timestampMillis": 1717180091148, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED", "attempt": 1 @@ -1368,8 +1368,8 @@ "json": { "timestampMillis": 1717180091923, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -1499,10 +1499,10 @@ "aspectName": "dataJobInputOutput", "aspect": { "json": { - "inputDatasets": [ + "inputDatasets": [], + "outputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" ], - "outputDatasets": [], "inputDatajobs": [ "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" ], @@ -1613,8 +1613,8 @@ "json": { "timestampMillis": 1717180096108, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED", "attempt": 1 @@ -1630,8 +1630,8 @@ "json": { "timestampMillis": 1719864203487, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "actor": "urn:li:corpuser:airflow", "operationType": "CREATE", @@ -1712,8 +1712,8 @@ "json": { "timestampMillis": 1717180096993, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -1727,10 +1727,10 @@ "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", "changeType": "UPSERT", - "aspectName": "dataProcessInstanceInput", + "aspectName": "dataProcessInstanceOutput", "aspect": { "json": { - "inputs": [ + "outputs": [ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" ] } @@ -1740,10 +1740,10 @@ "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", "changeType": "UPSERT", - "aspectName": "dataProcessInstanceInput", + "aspectName": "dataProcessInstanceOutput", "aspect": { "json": { - "inputs": [ + "outputs": [ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" ] } diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index 976ff8bcc9b3f..0146343002171 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -189,35 +189,49 @@ def _table_level_lineage( statement: sqlglot.Expression, dialect: sqlglot.Dialect ) -> Tuple[Set[_TableName], Set[_TableName]]: # Generate table-level lineage. - modified = { - _TableName.from_sqlglot_table(expr.this) - for expr in statement.find_all( - sqlglot.exp.Create, - sqlglot.exp.Insert, - sqlglot.exp.Update, - sqlglot.exp.Delete, - sqlglot.exp.Merge, - ) - # In some cases like "MERGE ... then INSERT (col1, col2) VALUES (col1, col2)", - # the `this` on the INSERT part isn't a table. - if isinstance(expr.this, sqlglot.exp.Table) - } | { - # For statements that include a column list, like - # CREATE DDL statements and `INSERT INTO table (col1, col2) SELECT ...` - # the table name is nested inside a Schema object. - _TableName.from_sqlglot_table(expr.this.this) - for expr in statement.find_all( - sqlglot.exp.Create, - sqlglot.exp.Insert, - ) - if isinstance(expr.this, sqlglot.exp.Schema) - and isinstance(expr.this.this, sqlglot.exp.Table) - } + modified = ( + { + _TableName.from_sqlglot_table(expr.this) + for expr in statement.find_all( + sqlglot.exp.Create, + sqlglot.exp.Insert, + sqlglot.exp.Update, + sqlglot.exp.Delete, + sqlglot.exp.Merge, + sqlglot.exp.AlterTable, + ) + # In some cases like "MERGE ... then INSERT (col1, col2) VALUES (col1, col2)", + # the `this` on the INSERT part isn't a table. + if isinstance(expr.this, sqlglot.exp.Table) + } + | { + # For statements that include a column list, like + # CREATE DDL statements and `INSERT INTO table (col1, col2) SELECT ...` + # the table name is nested inside a Schema object. + _TableName.from_sqlglot_table(expr.this.this) + for expr in statement.find_all( + sqlglot.exp.Create, + sqlglot.exp.Insert, + ) + if isinstance(expr.this, sqlglot.exp.Schema) + and isinstance(expr.this.this, sqlglot.exp.Table) + } + | { + # For drop statements, we only want it if a table/view is being dropped. + # Other "kinds" will not have table.name populated. + _TableName.from_sqlglot_table(expr.this) + for expr in ([statement] if isinstance(statement, sqlglot.exp.Drop) else []) + if isinstance(expr.this, sqlglot.exp.Table) + and expr.this.this + and expr.this.name + } + ) tables = ( { _TableName.from_sqlglot_table(table) for table in statement.find_all(sqlglot.exp.Table) + if not isinstance(table.parent, sqlglot.exp.Drop) } # ignore references created in this query - modified diff --git a/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py b/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py index 39c0dddd31400..72b5f6c5e26e4 100644 --- a/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py +++ b/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py @@ -15,7 +15,6 @@ logger = logging.getLogger(__name__) -# TODO: Hook this into the standard --update-golden-files mechanism. UPDATE_FILES = os.environ.get("UPDATE_SQLPARSER_FILES", "false").lower() == "true" diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_alter_table_column.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_alter_table_column.json new file mode 100644 index 0000000000000..3c6c9737e8e19 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_alter_table_column.json @@ -0,0 +1,14 @@ +{ + "query_type": "UNKNOWN", + "query_type_props": {}, + "query_fingerprint": "7d04253c3add0194c557942ef9b7485f38e68762d300dad364b9cec8656035b3", + "in_tables": [], + "out_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-bq-project.covid_data.covid_deaths,PROD)" + ], + "column_lineage": null, + "debug_info": { + "confidence": 0.2, + "generalized_statement": "ALTER TABLE `my-bq-project.covid_data.covid_deaths` DROP COLUMN patient_name" + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_drop_schema.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_drop_schema.json new file mode 100644 index 0000000000000..2784b8e9543b2 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_drop_schema.json @@ -0,0 +1,12 @@ +{ + "query_type": "UNKNOWN", + "query_type_props": {}, + "query_fingerprint": "4eefab57619a812a94030acce0071857561265945e79d798563adb53bd0b9646", + "in_tables": [], + "out_tables": [], + "column_lineage": null, + "debug_info": { + "confidence": 0.9, + "generalized_statement": "DROP SCHEMA my_schema" + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_drop_table.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_drop_table.json new file mode 100644 index 0000000000000..ae8b3f99897dc --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_drop_table.json @@ -0,0 +1,14 @@ +{ + "query_type": "UNKNOWN", + "query_type_props": {}, + "query_fingerprint": "d1c29ad73325b08bb66e62ec00ba1d5be4412ec72b4bbc9c094f1272b9da4f86", + "in_tables": [], + "out_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,my_schema.my_table,PROD)" + ], + "column_lineage": null, + "debug_info": { + "confidence": 0.2, + "generalized_statement": "DROP TABLE my_schema.my_table" + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_drop_view.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_drop_view.json new file mode 100644 index 0000000000000..6650ef396a570 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_drop_view.json @@ -0,0 +1,14 @@ +{ + "query_type": "UNKNOWN", + "query_type_props": {}, + "query_fingerprint": "35a3c60e7ed98884dde3f1f5fe9079f844832430589a3326b97d617b8303f191", + "in_tables": [], + "out_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,my_schema.my_view,PROD)" + ], + "column_lineage": null, + "debug_info": { + "confidence": 0.2, + "generalized_statement": "DROP VIEW my_schema.my_view" + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py index e5b669329f16c..3096c9b8269a1 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py @@ -2,11 +2,22 @@ import pytest +import datahub.testing.check_sql_parser_result as checker from datahub.testing.check_sql_parser_result import assert_sql_result RESOURCE_DIR = pathlib.Path(__file__).parent / "goldens" +@pytest.fixture(autouse=True) +def set_update_sql_parser( + pytestconfig: pytest.Config, monkeypatch: pytest.MonkeyPatch +) -> None: + update_golden = pytestconfig.getoption("--update-golden-files") + + if update_golden: + monkeypatch.setattr(checker, "UPDATE_FILES", True) + + def test_invalid_sql(): assert_sql_result( """ @@ -1202,3 +1213,43 @@ def test_bigquery_information_schema_query() -> None: dialect="bigquery", expected_file=RESOURCE_DIR / "test_bigquery_information_schema_query.json", ) + + +def test_bigquery_alter_table_column() -> None: + assert_sql_result( + """\ +ALTER TABLE `my-bq-project.covid_data.covid_deaths` drop COLUMN patient_name + """, + dialect="bigquery", + expected_file=RESOURCE_DIR / "test_bigquery_alter_table_column.json", + ) + + +def test_sqlite_drop_table() -> None: + assert_sql_result( + """\ +DROP TABLE my_schema.my_table +""", + dialect="sqlite", + expected_file=RESOURCE_DIR / "test_sqlite_drop_table.json", + ) + + +def test_sqlite_drop_view() -> None: + assert_sql_result( + """\ +DROP VIEW my_schema.my_view +""", + dialect="sqlite", + expected_file=RESOURCE_DIR / "test_sqlite_drop_view.json", + ) + + +def test_snowflake_drop_schema() -> None: + assert_sql_result( + """\ +DROP SCHEMA my_schema +""", + dialect="snowflake", + expected_file=RESOURCE_DIR / "test_snowflake_drop_schema.json", + )