add ddl parsing
hsheth2 committed Oct 12, 2023
1 parent 7f7d226 commit 90dc0d9
Showing 4 changed files with 131 additions and 17 deletions.
9 changes: 5 additions & 4 deletions metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py
@@ -179,15 +179,16 @@ def add_lineage(

     def gen_workunits(self) -> Iterable[MetadataWorkUnit]:
         if self.generate_lineage:
-            yield from self._gen_lineage_workunits()
+            for mcp in self._gen_lineage_mcps():
+                yield mcp.as_workunit()
         if self.generate_usage_statistics:
             yield from self._gen_usage_statistics_workunits()

-    def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]:
+    def _gen_lineage_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
         for downstream_urn in self._lineage_map:
             upstreams: List[UpstreamClass] = []
             fine_upstreams: List[FineGrainedLineageClass] = []
-            for upstream_urn, edge in self._lineage_map[downstream_urn].items():
+            for edge in self._lineage_map[downstream_urn].values():
                 upstreams.append(edge.gen_upstream_aspect())
                 fine_upstreams.extend(edge.gen_fine_grained_lineage_aspects())

@@ -201,7 +202,7 @@ def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]:
             )
             yield MetadataChangeProposalWrapper(
                 entityUrn=downstream_urn, aspect=upstream_lineage
-            ).as_workunit()
+            )

     def _gen_usage_statistics_workunits(self) -> Iterable[MetadataWorkUnit]:
         yield from self._usage_aggregator.generate_workunits(
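Note on the refactor above: `_gen_lineage_mcps` now yields raw `MetadataChangeProposalWrapper` objects, and workunit wrapping happens only at the `gen_workunits` boundary. A minimal sketch of why that is useful, assuming the standard DataHub REST emitter (`builder` stands in for a `SqlParsingBuilder` instance; the server URL is illustrative):

    from datahub.emitter.rest_emitter import DatahubRestEmitter

    def emit_lineage_directly(builder, gms_server: str = "http://localhost:8080") -> None:
        # With MCPs available directly, a caller can bypass workunits entirely
        # and push lineage straight to the metadata service.
        emitter = DatahubRestEmitter(gms_server)
        for mcp in builder._gen_lineage_mcps():
            emitter.emit_mcp(mcp)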
@@ -24,6 +24,7 @@ def assert_sql_result_with_resolver(
     *,
     expected_file: pathlib.Path,
     schema_resolver: SchemaResolver,
+    allow_table_error: bool = False,
     **kwargs: Any,
 ) -> None:
     # HACK: Our BigQuery source overwrites this value and doesn't undo it.
@@ -36,6 +37,14 @@ def assert_sql_result_with_resolver(
         **kwargs,
     )

+    if res.debug_info.table_error:
+        if allow_table_error:
+            logger.info(
+                f"SQL parser table error: {res.debug_info.table_error}",
+                exc_info=res.debug_info.table_error,
+            )
+        else:
+            raise res.debug_info.table_error
     if res.debug_info.column_error:
         logger.warning(
             f"SQL parser column error: {res.debug_info.column_error}",
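With `allow_table_error=True`, a golden test can still assert on parser output for statements where table extraction is known to fail, instead of raising. A hedged usage sketch (the SQL, golden-file path, and the positional form of the first argument are illustrative; `**kwargs` is forwarded to the parser):

    import pathlib

    sql = "ALTER TABLE costs ADD COLUMN note TEXT"  # a statement we only partially support
    assert_sql_result_with_resolver(
        sql,
        expected_file=pathlib.Path("goldens/partially_supported.json"),
        schema_resolver=schema_resolver,
        allow_table_error=True,  # log the table error instead of raising it
    )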
75 changes: 63 additions & 12 deletions metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
@@ -467,14 +467,20 @@ def _column_level_lineage(  # noqa: C901
     default_db: Optional[str],
     default_schema: Optional[str],
 ) -> List[_ColumnLineageInfo]:
-    if not isinstance(
-        statement,
-        _SupportedColumnLineageTypesTuple,
+    is_create_ddl = _is_create_table_ddl(statement)
+    if (
+        not isinstance(
+            statement,
+            _SupportedColumnLineageTypesTuple,
+        )
+        and not is_create_ddl
     ):
         raise UnsupportedStatementTypeError(
             f"Can only generate column-level lineage for select-like inner statements, not {type(statement)}"
         )

+    column_lineage: List[_ColumnLineageInfo] = []
+
     use_case_insensitive_cols = dialect in {
         # Column identifiers are case-insensitive in BigQuery, so we need to
         # do a normalization step beforehand to make sure it's resolved correctly.
@@ -580,6 +586,38 @@ def _schema_aware_fuzzy_column_resolve(
         ) from e
     logger.debug("Qualified sql %s", statement.sql(pretty=True, dialect=dialect))

+    # Handle the create DDL case.
+    if is_create_ddl:
+        assert (
+            output_table is not None
+        ), "output_table must be set for create DDL statements"
+
+        create_schema: sqlglot.exp.Schema = statement.this
+        sqlglot_columns = create_schema.expressions
+
+        for column_def in sqlglot_columns:
+            if not isinstance(column_def, sqlglot.exp.ColumnDef):
+                # Ignore things like constraints.
+                continue
+
+            output_col = _schema_aware_fuzzy_column_resolve(
+                output_table, column_def.name
+            )
+            output_col_type = column_def.args.get("kind")
+
+            column_lineage.append(
+                _ColumnLineageInfo(
+                    downstream=_DownstreamColumnRef(
+                        table=output_table,
+                        column=output_col,
+                        column_type=output_col_type,
+                    ),
+                    upstreams=[],
+                )
+            )
+
+        return column_lineage
+
     # Try to figure out the types of the output columns.
     try:
         statement = sqlglot.optimizer.annotate_types.annotate_types(
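For a plain `CREATE TABLE`, sqlglot represents the column list as `exp.ColumnDef` nodes under the `exp.Schema` in `Create.this`, which is what the new branch above walks. A small standalone sketch of that traversal (table and column names are made up):

    import sqlglot
    from sqlglot import exp

    statement = sqlglot.parse_one("CREATE TABLE costs (id INTEGER PRIMARY KEY, month TEXT)")
    assert isinstance(statement, exp.Create)
    assert isinstance(statement.this, exp.Schema)

    for column_def in statement.this.expressions:
        if not isinstance(column_def, exp.ColumnDef):
            continue  # table-level constraints etc. also live in this list
        # .name is the column name; args["kind"] holds the sqlglot DataType node.
        print(column_def.name, column_def.args.get("kind"))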
@@ -589,8 +627,6 @@ def _schema_aware_fuzzy_column_resolve(
         # This is not a fatal error, so we can continue.
         logger.debug("sqlglot failed to annotate types: %s", e)

-    column_lineage = []
-
     try:
         assert isinstance(statement, _SupportedColumnLineageTypesTuple)

@@ -599,7 +635,6 @@ def _schema_aware_fuzzy_column_resolve(
             (select_col.alias_or_name, select_col) for select_col in statement.selects
         ]
         logger.debug("output columns: %s", [col[0] for col in output_columns])
-        output_col: str
         for output_col, original_col_expression in output_columns:
             if output_col == "*":
                 # If schema information is available, the * will be expanded to the actual columns.
@@ -628,7 +663,7 @@ def _schema_aware_fuzzy_column_resolve(

             # Generate SELECT lineage.
             # Using a set here to deduplicate upstreams.
-            direct_col_upstreams: Set[_ColumnRef] = set()
+            direct_raw_col_upstreams: Set[_ColumnRef] = set()
             for node in lineage_node.walk():
                 if node.downstream:
                     # We only want the leaf nodes.
@@ -643,8 +678,9 @@ def _schema_aware_fuzzy_column_resolve(
                     if node.subfield:
                         normalized_col = f"{normalized_col}.{node.subfield}"

-                    col = _schema_aware_fuzzy_column_resolve(table_ref, normalized_col)
-                    direct_col_upstreams.add(_ColumnRef(table=table_ref, column=col))
+                    direct_raw_col_upstreams.add(
+                        _ColumnRef(table=table_ref, column=normalized_col)
+                    )
                 else:
                     # This branch doesn't matter. For example, a count(*) column would go here, and
                     # we don't get any column-level lineage for that.
@@ -665,7 +701,16 @@ def _schema_aware_fuzzy_column_resolve(
             if original_col_expression.type:
                 output_col_type = original_col_expression.type

-            if not direct_col_upstreams:
+            # Fuzzy resolve upstream columns.
+            direct_resolved_col_upstreams = {
+                _ColumnRef(
+                    table=edge.table,
+                    column=_schema_aware_fuzzy_column_resolve(edge.table, edge.column),
+                )
+                for edge in direct_raw_col_upstreams
+            }
+
+            if not direct_resolved_col_upstreams:
                 logger.debug(f'  "{output_col}" has no upstreams')
             column_lineage.append(
                 _ColumnLineageInfo(
@@ -674,12 +719,12 @@ def _schema_aware_fuzzy_column_resolve(
                     downstream=_DownstreamColumnRef(
                         table=output_table,
                         column=output_col,
                         column_type=output_col_type,
                     ),
-                    upstreams=sorted(direct_col_upstreams),
+                    upstreams=sorted(direct_resolved_col_upstreams),
                     # logic=column_logic.sql(pretty=True, dialect=dialect),
                 )
             )

-        # TODO: Also extract referenced columns (e.g. non-SELECT lineage)
+        # TODO: Also extract referenced columns (aka auxiliary / non-SELECT lineage)
     except (sqlglot.errors.OptimizeError, ValueError) as e:
         raise SqlUnderstandingError(
             f"sqlglot failed to compute some lineage: {e}"
@@ -700,6 +745,12 @@ def _extract_select_from_create(
     return statement


+def _is_create_table_ddl(statement: sqlglot.exp.Expression) -> bool:
+    return isinstance(statement, sqlglot.exp.Create) and isinstance(
+        statement.this, sqlglot.exp.Schema
+    )
+
+
 def _try_extract_select(
     statement: sqlglot.exp.Expression,
 ) -> sqlglot.exp.Expression:
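`_is_create_table_ddl` only matches a column-list `CREATE TABLE`; a CTAS keeps flowing through the select-lineage path. A quick check of that behavior (a sketch; node shapes can vary across sqlglot versions):

    import sqlglot
    from sqlglot import exp

    def _is_create_table_ddl(statement: exp.Expression) -> bool:
        return isinstance(statement, exp.Create) and isinstance(
            statement.this, exp.Schema
        )

    ddl = sqlglot.parse_one("CREATE TABLE costs (id INTEGER)")
    ctas = sqlglot.parse_one("CREATE TABLE costs AS SELECT 1 AS id")

    print(_is_create_table_ddl(ddl))   # True: Create.this is a Schema of ColumnDefs
    print(_is_create_table_ddl(ctas))  # False: with no column list, Create.this is a Table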
@@ -4,5 +4,58 @@
     "out_tables": [
         "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)"
     ],
-    "column_lineage": null
+    "column_lineage": [
+        {
+            "downstream": {
+                "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)",
+                "column": "id",
+                "column_type": {
+                    "type": {
+                        "com.linkedin.pegasus2avro.schema.NumberType": {}
+                    }
+                },
+                "native_column_type": "INTEGER"
+            },
+            "upstreams": []
+        },
+        {
+            "downstream": {
+                "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)",
+                "column": "month",
+                "column_type": {
+                    "type": {
+                        "com.linkedin.pegasus2avro.schema.StringType": {}
+                    }
+                },
+                "native_column_type": "TEXT"
+            },
+            "upstreams": []
+        },
+        {
+            "downstream": {
+                "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)",
+                "column": "total_cost",
+                "column_type": {
+                    "type": {
+                        "com.linkedin.pegasus2avro.schema.NumberType": {}
+                    }
+                },
+                "native_column_type": "REAL"
+            },
+            "upstreams": []
+        },
+        {
+            "downstream": {
+                "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)",
+                "column": "area",
+                "column_type": {
+                    "type": {
+                        "com.linkedin.pegasus2avro.schema.NumberType": {}
+                    }
+                },
+                "native_column_type": "REAL"
+            },
+            "upstreams": []
+        }
+    ]
 }
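The golden output above is consistent with a simple SQLite fixture along these lines (the exact test SQL is not shown in this diff, so this is an inferred reconstruction):

    # Hypothetical test input, inferred from the golden file's columns and types.
    test_sql = """
    CREATE TABLE costs (
        id INTEGER PRIMARY KEY,
        month TEXT,
        total_cost REAL,
        area REAL
    )
    """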
