feat(sqlparser): parse create DDL statements (#9002)
hsheth2 authored Oct 23, 2023
1 parent 633e6d6 commit 8fb95e8
Showing 16 changed files with 430 additions and 97 deletions.

Large diffs are not rendered by default.
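This commit teaches DataHub's sqlglot-based SQL parser to handle CREATE TABLE DDL statements that declare an explicit column list: the created table and each declared column now come back as lineage (with empty upstreams) instead of a table-level parse error. Supporting changes thread the SQL dialect into native column type rendering, map the mssql platform to sqlglot's tsql dialect, split lineage MCP generation out of workunit generation, and defer fuzzy column resolution until after upstream deduplication.

A sketch of the new behavior through the public entry point; the platform and table names are illustrative, not taken from the commit:

    from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage

    # An empty resolver works here; create DDL carries its own column list.
    resolver = SchemaResolver(platform="snowflake")

    result = sqlglot_lineage(
        "CREATE TABLE db.schema.foo (id INT, name VARCHAR(50))",
        schema_resolver=resolver,
    )

    print(result.out_tables)  # URN for db.schema.foo
    for cl in result.column_lineage or []:
        # Each declared column appears as a downstream with no upstreams.
        print(cl.downstream.column, cl.upstreams)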

9 changes: 5 additions & 4 deletions metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py
@@ -179,15 +179,16 @@ def add_lineage(
 
     def gen_workunits(self) -> Iterable[MetadataWorkUnit]:
         if self.generate_lineage:
-            yield from self._gen_lineage_workunits()
+            for mcp in self._gen_lineage_mcps():
+                yield mcp.as_workunit()
         if self.generate_usage_statistics:
             yield from self._gen_usage_statistics_workunits()
 
-    def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]:
+    def _gen_lineage_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
         for downstream_urn in self._lineage_map:
             upstreams: List[UpstreamClass] = []
             fine_upstreams: List[FineGrainedLineageClass] = []
-            for upstream_urn, edge in self._lineage_map[downstream_urn].items():
+            for edge in self._lineage_map[downstream_urn].values():
                 upstreams.append(edge.gen_upstream_aspect())
                 fine_upstreams.extend(edge.gen_fine_grained_lineage_aspects())
 
@@ -201,7 +202,7 @@ def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]:
             )
             yield MetadataChangeProposalWrapper(
                 entityUrn=downstream_urn, aspect=upstream_lineage
-            ).as_workunit()
+            )
 
     def _gen_usage_statistics_workunits(self) -> Iterable[MetadataWorkUnit]:
         yield from self._usage_aggregator.generate_workunits(
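Splitting _gen_lineage_mcps out of gen_workunits means lineage is produced as MetadataChangeProposalWrappers and only wrapped into workunits at the edge, so other callers can emit the MCPs directly. A minimal sketch of the aspect-to-workunit relationship, with illustrative URNs (not from this commit):

    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.metadata.schema_classes import (
        DatasetLineageTypeClass,
        UpstreamClass,
        UpstreamLineageClass,
    )

    # The same aspect shape the builder yields, for a toy pair of datasets.
    upstream_lineage = UpstreamLineageClass(
        upstreams=[
            UpstreamClass(
                dataset="urn:li:dataset:(urn:li:dataPlatform:snowflake,db.sch.src,PROD)",
                type=DatasetLineageTypeClass.TRANSFORMED,
            )
        ]
    )

    mcp = MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:snowflake,db.sch.dst,PROD)",
        aspect=upstream_lineage,
    )

    # Callers that still want workunits wrap on demand, as gen_workunits() now does.
    workunit = mcp.as_workunit()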

[changed file: name not rendered]

@@ -24,6 +24,7 @@ def assert_sql_result_with_resolver(
     *,
     expected_file: pathlib.Path,
     schema_resolver: SchemaResolver,
+    allow_table_error: bool = False,
     **kwargs: Any,
 ) -> None:
     # HACK: Our BigQuery source overwrites this value and doesn't undo it.
@@ -36,6 +37,14 @@ def assert_sql_result_with_resolver(
         **kwargs,
     )
 
+    if res.debug_info.table_error:
+        if allow_table_error:
+            logger.info(
+                f"SQL parser table error: {res.debug_info.table_error}",
+                exc_info=res.debug_info.table_error,
+            )
+        else:
+            raise res.debug_info.table_error
     if res.debug_info.column_error:
         logger.warning(
             f"SQL parser column error: {res.debug_info.column_error}",
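The new allow_table_error flag lets a test assert against a golden file even when the parser reports a table-level error. A sketch of a call site, assuming the helper's leading positional argument is the SQL string; the SQL and golden path are illustrative:

    import pathlib

    from datahub.utilities.sqlglot_lineage import SchemaResolver

    assert_sql_result_with_resolver(
        "CREATE TABLE foo AS SELECT * FROM some_unresolvable_source",
        expected_file=pathlib.Path("goldens/test_create_foo.json"),  # illustrative path
        schema_resolver=SchemaResolver(platform="snowflake"),
        allow_table_error=True,  # log the table error instead of raising it
    )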
92 changes: 74 additions & 18 deletions metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
@@ -241,9 +241,9 @@ class SqlParsingResult(_ParserBaseModel):
     )
 
 
-def _parse_statement(sql: str, dialect: str) -> sqlglot.Expression:
-    statement = sqlglot.parse_one(
-        sql, read=dialect, error_level=sqlglot.ErrorLevel.RAISE
+def _parse_statement(sql: sqlglot.exp.ExpOrStr, dialect: str) -> sqlglot.Expression:
+    statement: sqlglot.Expression = sqlglot.maybe_parse(
+        sql, dialect=dialect, error_level=sqlglot.ErrorLevel.RAISE
     )
     return statement
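Widening the parameter to sqlglot.exp.ExpOrStr works because sqlglot.maybe_parse parses strings but passes an already-parsed expression through unchanged:

    import sqlglot

    expr = sqlglot.parse_one("SELECT 1", read="snowflake")

    # A string is parsed; an expression comes back as-is.
    assert isinstance(sqlglot.maybe_parse("SELECT 1", dialect="snowflake"), sqlglot.Expression)
    assert sqlglot.maybe_parse(expr, dialect="snowflake") is expr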

@@ -467,14 +467,20 @@ def _column_level_lineage(  # noqa: C901
     default_db: Optional[str],
     default_schema: Optional[str],
 ) -> List[_ColumnLineageInfo]:
-    if not isinstance(
-        statement,
-        _SupportedColumnLineageTypesTuple,
+    is_create_ddl = _is_create_table_ddl(statement)
+    if (
+        not isinstance(
+            statement,
+            _SupportedColumnLineageTypesTuple,
+        )
+        and not is_create_ddl
     ):
         raise UnsupportedStatementTypeError(
             f"Can only generate column-level lineage for select-like inner statements, not {type(statement)}"
         )
 
+    column_lineage: List[_ColumnLineageInfo] = []
+
     use_case_insensitive_cols = dialect in {
         # Column identifiers are case-insensitive in BigQuery, so we need to
         # do a normalization step beforehand to make sure it's resolved correctly.
@@ -580,6 +586,38 @@ def _schema_aware_fuzzy_column_resolve(
         ) from e
     logger.debug("Qualified sql %s", statement.sql(pretty=True, dialect=dialect))
 
+    # Handle the create DDL case.
+    if is_create_ddl:
+        assert (
+            output_table is not None
+        ), "output_table must be set for create DDL statements"
+
+        create_schema: sqlglot.exp.Schema = statement.this
+        sqlglot_columns = create_schema.expressions
+
+        for column_def in sqlglot_columns:
+            if not isinstance(column_def, sqlglot.exp.ColumnDef):
+                # Ignore things like constraints.
+                continue
+
+            output_col = _schema_aware_fuzzy_column_resolve(
+                output_table, column_def.name
+            )
+            output_col_type = column_def.args.get("kind")
+
+            column_lineage.append(
+                _ColumnLineageInfo(
+                    downstream=_DownstreamColumnRef(
+                        table=output_table,
+                        column=output_col,
+                        column_type=output_col_type,
+                    ),
+                    upstreams=[],
+                )
+            )
+
+        return column_lineage
+
     # Try to figure out the types of the output columns.
     try:
         statement = sqlglot.optimizer.annotate_types.annotate_types(
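For a column-list CREATE, sqlglot exposes statement.this as an exp.Schema whose expressions mix ColumnDefs with constraints, which is why the loop above filters on ColumnDef. A standalone illustration:

    import sqlglot

    stmt = sqlglot.parse_one(
        "CREATE TABLE db.t (id INT, name VARCHAR(50), PRIMARY KEY (id))"
    )
    assert isinstance(stmt, sqlglot.exp.Create)

    for expr in stmt.this.expressions:  # stmt.this is a sqlglot.exp.Schema
        if not isinstance(expr, sqlglot.exp.ColumnDef):
            continue  # skips the PRIMARY KEY constraint
        print(expr.name, expr.args.get("kind"))  # column name and its DataType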
@@ -589,8 +627,6 @@ def _schema_aware_fuzzy_column_resolve(
         # This is not a fatal error, so we can continue.
         logger.debug("sqlglot failed to annotate types: %s", e)
 
-    column_lineage = []
-
     try:
         assert isinstance(statement, _SupportedColumnLineageTypesTuple)
 
@@ -599,7 +635,6 @@ def _schema_aware_fuzzy_column_resolve(
             (select_col.alias_or_name, select_col) for select_col in statement.selects
         ]
         logger.debug("output columns: %s", [col[0] for col in output_columns])
-        output_col: str
         for output_col, original_col_expression in output_columns:
             if output_col == "*":
                 # If schema information is available, the * will be expanded to the actual columns.
@@ -628,7 +663,7 @@ def _schema_aware_fuzzy_column_resolve(
 
             # Generate SELECT lineage.
             # Using a set here to deduplicate upstreams.
-            direct_col_upstreams: Set[_ColumnRef] = set()
+            direct_raw_col_upstreams: Set[_ColumnRef] = set()
            for node in lineage_node.walk():
                if node.downstream:
                    # We only want the leaf nodes.
@@ -643,8 +678,9 @@ def _schema_aware_fuzzy_column_resolve(
                     if node.subfield:
                         normalized_col = f"{normalized_col}.{node.subfield}"
 
-                    col = _schema_aware_fuzzy_column_resolve(table_ref, normalized_col)
-                    direct_col_upstreams.add(_ColumnRef(table=table_ref, column=col))
+                    direct_raw_col_upstreams.add(
+                        _ColumnRef(table=table_ref, column=normalized_col)
+                    )
                 else:
                     # This branch doesn't matter. For example, a count(*) column would go here, and
                     # we don't get any column-level lineage for that.
@@ -665,7 +701,16 @@ def _schema_aware_fuzzy_column_resolve(
             if original_col_expression.type:
                 output_col_type = original_col_expression.type
 
-            if not direct_col_upstreams:
+            # Fuzzy resolve upstream columns.
+            direct_resolved_col_upstreams = {
+                _ColumnRef(
+                    table=edge.table,
+                    column=_schema_aware_fuzzy_column_resolve(edge.table, edge.column),
+                )
+                for edge in direct_raw_col_upstreams
+            }
+
+            if not direct_resolved_col_upstreams:
                 logger.debug(f'  "{output_col}" has no upstreams')
             column_lineage.append(
                 _ColumnLineageInfo(
@@ -674,12 +719,12 @@ def _schema_aware_fuzzy_column_resolve(
                         column=output_col,
                         column_type=output_col_type,
                     ),
-                    upstreams=sorted(direct_col_upstreams),
+                    upstreams=sorted(direct_resolved_col_upstreams),
                     # logic=column_logic.sql(pretty=True, dialect=dialect),
                 )
             )
 
-    # TODO: Also extract referenced columns (e.g. non-SELECT lineage)
+    # TODO: Also extract referenced columns (aka auxillary / non-SELECT lineage)
     except (sqlglot.errors.OptimizeError, ValueError) as e:
         raise SqlUnderstandingError(
             f"sqlglot failed to compute some lineage: {e}"
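The rename from direct_col_upstreams to direct_raw_col_upstreams reflects a two-phase approach: raw column refs are collected and deduplicated during the lineage walk, then fuzzy-resolved in one pass per output column. A toy version of the shape, using stand-ins rather than the real helpers:

    from typing import NamedTuple, Set

    class ColumnRef(NamedTuple):
        table: str
        column: str

    def fuzzy_resolve(table: str, column: str) -> str:
        # Stand-in for _schema_aware_fuzzy_column_resolve, which consults
        # registered schema info to normalize casing and quoting.
        return column.lower()

    direct_raw: Set[ColumnRef] = {ColumnRef("db.t", "ID"), ColumnRef("db.t", "Id")}

    direct_resolved = {
        ColumnRef(table=ref.table, column=fuzzy_resolve(ref.table, ref.column))
        for ref in direct_raw
    }
    print(direct_resolved)  # collapses to {ColumnRef(table='db.t', column='id')}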
@@ -700,6 +745,12 @@ def _extract_select_from_create(
     return statement
 
 
+def _is_create_table_ddl(statement: sqlglot.exp.Expression) -> bool:
+    return isinstance(statement, sqlglot.exp.Create) and isinstance(
+        statement.this, sqlglot.exp.Schema
+    )
+
+
 def _try_extract_select(
     statement: sqlglot.exp.Expression,
 ) -> sqlglot.exp.Expression:
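_is_create_table_ddl separates a column-list CREATE from a CTAS: in sqlglot, a CTAS puts the target table in this and the query under expression (the case _extract_select_from_create handles), while plain DDL puts an exp.Schema in this. A quick check:

    import sqlglot

    ctas = sqlglot.parse_one("CREATE TABLE t AS SELECT a FROM src")
    ddl = sqlglot.parse_one("CREATE TABLE t (a INT)")

    print(type(ctas.this).__name__)        # Table  -> not create DDL
    print(type(ctas.expression).__name__)  # Select -> select-like lineage path
    print(type(ddl.this).__name__)         # Schema -> _is_create_table_ddl is True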
@@ -766,6 +817,7 @@ def _translate_sqlglot_type(
 def _translate_internal_column_lineage(
     table_name_urn_mapping: Dict[_TableName, str],
     raw_column_lineage: _ColumnLineageInfo,
+    dialect: str,
 ) -> ColumnLineageInfo:
     downstream_urn = None
     if raw_column_lineage.downstream.table:
@@ -779,7 +831,9 @@ def _translate_internal_column_lineage(
         )
         if raw_column_lineage.downstream.column_type
         else None,
-        native_column_type=raw_column_lineage.downstream.column_type.sql()
+        native_column_type=raw_column_lineage.downstream.column_type.sql(
+            dialect=dialect
+        )
         if raw_column_lineage.downstream.column_type
         and raw_column_lineage.downstream.column_type.this
         != sqlglot.exp.DataType.Type.UNKNOWN
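Rendering the type with the source dialect is what flips native_column_type from TEXT to STRING in the golden files later in this diff, since sqlglot renders type names per dialect:

    import sqlglot
    from sqlglot import exp

    dt = exp.DataType.build("TEXT")

    print(dt.sql())                    # TEXT (generic rendering)
    print(dt.sql(dialect="bigquery"))  # STRING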
@@ -800,12 +854,14 @@ def _get_dialect(platform: str) -> str:
     # TODO: convert datahub platform names to sqlglot dialect
     if platform == "presto-on-hive":
         return "hive"
+    if platform == "mssql":
+        return "tsql"
     else:
         return platform
 
 
 def _sqlglot_lineage_inner(
-    sql: str,
+    sql: sqlglot.exp.ExpOrStr,
     schema_resolver: SchemaResolver,
     default_db: Optional[str] = None,
     default_schema: Optional[str] = None,
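Mapping the mssql platform to sqlglot's tsql dialect matters for SQL Server-specific syntax; for example, bracket-quoted identifiers:

    import sqlglot

    stmt = sqlglot.parse_one("SELECT [first name] FROM [dbo].[users]", read="tsql")
    print(stmt.sql(dialect="tsql"))  # SELECT [first name] FROM [dbo].[users]
    print(stmt.sql(dialect="hive"))  # SELECT `first name` FROM `dbo`.`users`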
@@ -918,7 +974,7 @@ def _sqlglot_lineage_inner(
     if column_lineage:
         column_lineage_urns = [
             _translate_internal_column_lineage(
-                table_name_urn_mapping, internal_col_lineage
+                table_name_urn_mapping, internal_col_lineage, dialect=dialect
             )
             for internal_col_lineage in column_lineage
         ]

[golden test file: name not rendered]

@@ -18,7 +18,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {
@@ -36,7 +36,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {
@@ -54,7 +54,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {
@@ -72,7 +72,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {

[golden test file: name not rendered]

@@ -14,7 +14,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {
@@ -32,7 +32,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
      },
      "upstreams": [
        {

[golden test file: name not rendered]

@@ -14,7 +14,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {
@@ -32,7 +32,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {

[golden test file: name not rendered]

@@ -14,7 +14,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {
@@ -32,7 +32,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {

[golden test file: name not rendered]

@@ -16,7 +16,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {
@@ -34,7 +34,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {
@@ -52,7 +52,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {

[golden test file: name not rendered]

@@ -17,7 +17,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {
@@ -39,7 +39,7 @@
             "com.linkedin.pegasus2avro.schema.StringType": {}
           }
         },
-        "native_column_type": "TEXT"
+        "native_column_type": "STRING"
       },
       "upstreams": [
         {