diff --git a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py
index 071d590f270f8..dedcfa0385f75 100644
--- a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py
@@ -179,15 +179,16 @@ def add_lineage(
 
     def gen_workunits(self) -> Iterable[MetadataWorkUnit]:
         if self.generate_lineage:
-            yield from self._gen_lineage_workunits()
+            for mcp in self._gen_lineage_mcps():
+                yield mcp.as_workunit()
         if self.generate_usage_statistics:
             yield from self._gen_usage_statistics_workunits()
 
-    def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]:
+    def _gen_lineage_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
         for downstream_urn in self._lineage_map:
             upstreams: List[UpstreamClass] = []
             fine_upstreams: List[FineGrainedLineageClass] = []
-            for upstream_urn, edge in self._lineage_map[downstream_urn].items():
+            for edge in self._lineage_map[downstream_urn].values():
                 upstreams.append(edge.gen_upstream_aspect())
                 fine_upstreams.extend(edge.gen_fine_grained_lineage_aspects())
 
@@ -201,7 +202,7 @@ def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]:
             )
             yield MetadataChangeProposalWrapper(
                 entityUrn=downstream_urn, aspect=upstream_lineage
-            ).as_workunit()
+            )
 
     def _gen_usage_statistics_workunits(self) -> Iterable[MetadataWorkUnit]:
         yield from self._usage_aggregator.generate_workunits(
diff --git a/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py b/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py
index b3b1331db768b..2b610947e9043 100644
--- a/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py
+++ b/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py
@@ -24,6 +24,7 @@ def assert_sql_result_with_resolver(
     *,
     expected_file: pathlib.Path,
     schema_resolver: SchemaResolver,
+    allow_table_error: bool = False,
     **kwargs: Any,
 ) -> None:
     # HACK: Our BigQuery source overwrites this value and doesn't undo it.
@@ -36,6 +37,14 @@ def assert_sql_result_with_resolver(
         **kwargs,
     )
 
+    if res.debug_info.table_error:
+        if allow_table_error:
+            logger.info(
+                f"SQL parser table error: {res.debug_info.table_error}",
+                exc_info=res.debug_info.table_error,
+            )
+        else:
+            raise res.debug_info.table_error
     if res.debug_info.column_error:
         logger.warning(
             f"SQL parser column error: {res.debug_info.column_error}",
diff --git a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
index ddfe4c4b87425..a44bb328b13a5 100644
--- a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
+++ b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
@@ -467,14 +467,20 @@ def _column_level_lineage(  # noqa: C901
     default_db: Optional[str],
     default_schema: Optional[str],
 ) -> List[_ColumnLineageInfo]:
-    if not isinstance(
-        statement,
-        _SupportedColumnLineageTypesTuple,
+    is_create_ddl = _is_create_table_ddl(statement)
+    if (
+        not isinstance(
+            statement,
+            _SupportedColumnLineageTypesTuple,
+        )
+        and not is_create_ddl
     ):
         raise UnsupportedStatementTypeError(
             f"Can only generate column-level lineage for select-like inner statements, not {type(statement)}"
         )
 
+    column_lineage: List[_ColumnLineageInfo] = []
+
     use_case_insensitive_cols = dialect in {
         # Column identifiers are case-insensitive in BigQuery, so we need to
         # do a normalization step beforehand to make sure it's resolved correctly.
@@ -580,6 +586,38 @@ def _schema_aware_fuzzy_column_resolve(
         ) from e
     logger.debug("Qualified sql %s", statement.sql(pretty=True, dialect=dialect))
 
+    # Handle the create DDL case.
+    if is_create_ddl:
+        assert (
+            output_table is not None
+        ), "output_table must be set for create DDL statements"
+
+        create_schema: sqlglot.exp.Schema = statement.this
+        sqlglot_columns = create_schema.expressions
+
+        for column_def in sqlglot_columns:
+            if not isinstance(column_def, sqlglot.exp.ColumnDef):
+                # Ignore things like constraints.
+                continue
+
+            output_col = _schema_aware_fuzzy_column_resolve(
+                output_table, column_def.name
+            )
+            output_col_type = column_def.args.get("kind")
+
+            column_lineage.append(
+                _ColumnLineageInfo(
+                    downstream=_DownstreamColumnRef(
+                        table=output_table,
+                        column=output_col,
+                        column_type=output_col_type,
+                    ),
+                    upstreams=[],
+                )
+            )
+
+        return column_lineage
+
     # Try to figure out the types of the output columns.
     try:
         statement = sqlglot.optimizer.annotate_types.annotate_types(
@@ -589,8 +627,6 @@ def _schema_aware_fuzzy_column_resolve(
         # This is not a fatal error, so we can continue.
         logger.debug("sqlglot failed to annotate types: %s", e)
 
-    column_lineage = []
-
     try:
         assert isinstance(statement, _SupportedColumnLineageTypesTuple)
 
@@ -599,7 +635,6 @@ def _schema_aware_fuzzy_column_resolve(
             (select_col.alias_or_name, select_col) for select_col in statement.selects
         ]
         logger.debug("output columns: %s", [col[0] for col in output_columns])
-        output_col: str
         for output_col, original_col_expression in output_columns:
             if output_col == "*":
                 # If schema information is available, the * will be expanded to the actual columns.
@@ -628,7 +663,7 @@ def _schema_aware_fuzzy_column_resolve(
 
             # Generate SELECT lineage.
             # Using a set here to deduplicate upstreams.
-            direct_col_upstreams: Set[_ColumnRef] = set()
+            direct_raw_col_upstreams: Set[_ColumnRef] = set()
             for node in lineage_node.walk():
                 if node.downstream:
                     # We only want the leaf nodes.
@@ -643,8 +678,9 @@ def _schema_aware_fuzzy_column_resolve(
                     if node.subfield:
                         normalized_col = f"{normalized_col}.{node.subfield}"
 
-                    col = _schema_aware_fuzzy_column_resolve(table_ref, normalized_col)
-                    direct_col_upstreams.add(_ColumnRef(table=table_ref, column=col))
+                    direct_raw_col_upstreams.add(
+                        _ColumnRef(table=table_ref, column=normalized_col)
+                    )
                 else:
                     # This branch doesn't matter. For example, a count(*) column would go here, and
                     # we don't get any column-level lineage for that.
@@ -665,7 +701,16 @@ def _schema_aware_fuzzy_column_resolve(
             if original_col_expression.type:
                 output_col_type = original_col_expression.type
 
-            if not direct_col_upstreams:
+            # Fuzzy resolve upstream columns.
+            direct_resolved_col_upstreams = {
+                _ColumnRef(
+                    table=edge.table,
+                    column=_schema_aware_fuzzy_column_resolve(edge.table, edge.column),
+                )
+                for edge in direct_raw_col_upstreams
+            }
+
+            if not direct_resolved_col_upstreams:
                 logger.debug(f'  "{output_col}" has no upstreams')
             column_lineage.append(
                 _ColumnLineageInfo(
@@ -674,12 +719,12 @@ def _schema_aware_fuzzy_column_resolve(
                         column=output_col,
                         column_type=output_col_type,
                     ),
-                    upstreams=sorted(direct_col_upstreams),
+                    upstreams=sorted(direct_resolved_col_upstreams),
                     # logic=column_logic.sql(pretty=True, dialect=dialect),
                 )
             )
 
-        # TODO: Also extract referenced columns (e.g. non-SELECT lineage)
+        # TODO: Also extract referenced columns (aka auxillary / non-SELECT lineage)
     except (sqlglot.errors.OptimizeError, ValueError) as e:
         raise SqlUnderstandingError(
             f"sqlglot failed to compute some lineage: {e}"
@@ -700,6 +745,12 @@ def _extract_select_from_create(
     return statement
 
 
+def _is_create_table_ddl(statement: sqlglot.exp.Expression) -> bool:
+    return isinstance(statement, sqlglot.exp.Create) and isinstance(
+        statement.this, sqlglot.exp.Schema
+    )
+
+
 def _try_extract_select(
     statement: sqlglot.exp.Expression,
 ) -> sqlglot.exp.Expression:
diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_table_ddl.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_table_ddl.json
index 4773974545bfa..cf31b71cb50f6 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_table_ddl.json
+++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_table_ddl.json
@@ -4,5 +4,58 @@
   "out_tables": [
     "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)"
   ],
-  "column_lineage": null
+  "column_lineage": [
+    {
+      "downstream": {
+        "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)",
+        "column": "id",
+        "column_type": {
+          "type": {
+            "com.linkedin.pegasus2avro.schema.NumberType": {}
+          }
+        },
+        "native_column_type": "INTEGER"
+      },
+      "upstreams": []
+    },
+    {
+      "downstream": {
+        "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)",
+        "column": "month",
+        "column_type": {
+          "type": {
+            "com.linkedin.pegasus2avro.schema.StringType": {}
+          }
+        },
+        "native_column_type": "TEXT"
+      },
+      "upstreams": []
+    },
+    {
+      "downstream": {
+        "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)",
+        "column": "total_cost",
+        "column_type": {
+          "type": {
+            "com.linkedin.pegasus2avro.schema.NumberType": {}
+          }
+        },
+        "native_column_type": "REAL"
+      },
+      "upstreams": []
+    },
+    {
+      "downstream": {
+        "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)",
+        "column": "area",
+        "column_type": {
+          "type": {
+            "com.linkedin.pegasus2avro.schema.NumberType": {}
+          }
+        },
+        "native_column_type": "REAL"
+      },
+      "upstreams": []
+    }
+  ]
 }
\ No newline at end of file
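
For context on the sqlglot AST shapes the new create-DDL branch relies on, here is a minimal standalone sketch (not part of the patch; the CREATE TABLE statement is made up to mirror the "costs" golden test above). It shows how a plain CREATE TABLE parses into a Create node whose `this` is a Schema of ColumnDef expressions, which is what `_is_create_table_ddl` checks for and what the new branch in `_column_level_lineage` iterates over:

    import sqlglot

    statement = sqlglot.parse_one(
        "CREATE TABLE costs (id INTEGER, month TEXT, total_cost REAL, area REAL)"
    )

    # Same check as the new _is_create_table_ddl() helper: a CREATE whose `this`
    # is a Schema (an inline column list) rather than a SELECT-based CTAS.
    assert isinstance(statement, sqlglot.exp.Create)
    assert isinstance(statement.this, sqlglot.exp.Schema)

    # The new branch walks these expressions, keeping only ColumnDef nodes
    # (constraints are skipped) and emitting each column with no upstreams.
    for column_def in statement.this.expressions:
        if not isinstance(column_def, sqlglot.exp.ColumnDef):
            continue
        # Prints each column name with its sqlglot DataType (the "kind" arg).
        print(column_def.name, column_def.args.get("kind"))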