Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sqlparser): parse create DDL statements #9002

Merged
merged 3 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,16 @@ def add_lineage(

def gen_workunits(self) -> Iterable[MetadataWorkUnit]:
if self.generate_lineage:
yield from self._gen_lineage_workunits()
for mcp in self._gen_lineage_mcps():
yield mcp.as_workunit()
if self.generate_usage_statistics:
yield from self._gen_usage_statistics_workunits()

def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]:
def _gen_lineage_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
for downstream_urn in self._lineage_map:
upstreams: List[UpstreamClass] = []
fine_upstreams: List[FineGrainedLineageClass] = []
for upstream_urn, edge in self._lineage_map[downstream_urn].items():
for edge in self._lineage_map[downstream_urn].values():
upstreams.append(edge.gen_upstream_aspect())
fine_upstreams.extend(edge.gen_fine_grained_lineage_aspects())

Expand All @@ -201,7 +202,7 @@ def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]:
)
yield MetadataChangeProposalWrapper(
entityUrn=downstream_urn, aspect=upstream_lineage
).as_workunit()
)

def _gen_usage_statistics_workunits(self) -> Iterable[MetadataWorkUnit]:
yield from self._usage_aggregator.generate_workunits(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def assert_sql_result_with_resolver(
*,
expected_file: pathlib.Path,
schema_resolver: SchemaResolver,
allow_table_error: bool = False,
**kwargs: Any,
) -> None:
# HACK: Our BigQuery source overwrites this value and doesn't undo it.
Expand All @@ -36,6 +37,14 @@ def assert_sql_result_with_resolver(
**kwargs,
)

if res.debug_info.table_error:
if allow_table_error:
logger.info(
f"SQL parser table error: {res.debug_info.table_error}",
exc_info=res.debug_info.table_error,
)
else:
raise res.debug_info.table_error
if res.debug_info.column_error:
logger.warning(
f"SQL parser column error: {res.debug_info.column_error}",
Expand Down
92 changes: 74 additions & 18 deletions metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ class SqlParsingResult(_ParserBaseModel):
)


def _parse_statement(sql: str, dialect: str) -> sqlglot.Expression:
statement = sqlglot.parse_one(
sql, read=dialect, error_level=sqlglot.ErrorLevel.RAISE
def _parse_statement(sql: sqlglot.exp.ExpOrStr, dialect: str) -> sqlglot.Expression:
statement: sqlglot.Expression = sqlglot.maybe_parse(
sql, dialect=dialect, error_level=sqlglot.ErrorLevel.RAISE
)
return statement

Expand Down Expand Up @@ -467,14 +467,20 @@ def _column_level_lineage( # noqa: C901
default_db: Optional[str],
default_schema: Optional[str],
) -> List[_ColumnLineageInfo]:
if not isinstance(
statement,
_SupportedColumnLineageTypesTuple,
is_create_ddl = _is_create_table_ddl(statement)
if (
not isinstance(
statement,
_SupportedColumnLineageTypesTuple,
)
and not is_create_ddl
):
raise UnsupportedStatementTypeError(
f"Can only generate column-level lineage for select-like inner statements, not {type(statement)}"
)

column_lineage: List[_ColumnLineageInfo] = []

use_case_insensitive_cols = dialect in {
# Column identifiers are case-insensitive in BigQuery, so we need to
# do a normalization step beforehand to make sure it's resolved correctly.
Expand Down Expand Up @@ -580,6 +586,38 @@ def _schema_aware_fuzzy_column_resolve(
) from e
logger.debug("Qualified sql %s", statement.sql(pretty=True, dialect=dialect))

# Handle the create DDL case.
if is_create_ddl:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is getting quite large. Doesn't have to be now but might be nice to break this up if we can. I don't know if I could make edits to this code right now, if I had to

assert (
output_table is not None
), "output_table must be set for create DDL statements"

create_schema: sqlglot.exp.Schema = statement.this
sqlglot_columns = create_schema.expressions

for column_def in sqlglot_columns:
if not isinstance(column_def, sqlglot.exp.ColumnDef):
# Ignore things like constraints.
continue

output_col = _schema_aware_fuzzy_column_resolve(
output_table, column_def.name
)
output_col_type = column_def.args.get("kind")

column_lineage.append(
_ColumnLineageInfo(
downstream=_DownstreamColumnRef(
table=output_table,
column=output_col,
column_type=output_col_type,
),
upstreams=[],
)
)

return column_lineage

# Try to figure out the types of the output columns.
try:
statement = sqlglot.optimizer.annotate_types.annotate_types(
Expand All @@ -589,8 +627,6 @@ def _schema_aware_fuzzy_column_resolve(
# This is not a fatal error, so we can continue.
logger.debug("sqlglot failed to annotate types: %s", e)

column_lineage = []

try:
assert isinstance(statement, _SupportedColumnLineageTypesTuple)

Expand All @@ -599,7 +635,6 @@ def _schema_aware_fuzzy_column_resolve(
(select_col.alias_or_name, select_col) for select_col in statement.selects
]
logger.debug("output columns: %s", [col[0] for col in output_columns])
output_col: str
for output_col, original_col_expression in output_columns:
if output_col == "*":
# If schema information is available, the * will be expanded to the actual columns.
Expand Down Expand Up @@ -628,7 +663,7 @@ def _schema_aware_fuzzy_column_resolve(

# Generate SELECT lineage.
# Using a set here to deduplicate upstreams.
direct_col_upstreams: Set[_ColumnRef] = set()
direct_raw_col_upstreams: Set[_ColumnRef] = set()
for node in lineage_node.walk():
if node.downstream:
# We only want the leaf nodes.
Expand All @@ -643,8 +678,9 @@ def _schema_aware_fuzzy_column_resolve(
if node.subfield:
normalized_col = f"{normalized_col}.{node.subfield}"

col = _schema_aware_fuzzy_column_resolve(table_ref, normalized_col)
direct_col_upstreams.add(_ColumnRef(table=table_ref, column=col))
direct_raw_col_upstreams.add(
_ColumnRef(table=table_ref, column=normalized_col)
)
else:
# This branch doesn't matter. For example, a count(*) column would go here, and
# we don't get any column-level lineage for that.
Expand All @@ -665,7 +701,16 @@ def _schema_aware_fuzzy_column_resolve(
if original_col_expression.type:
output_col_type = original_col_expression.type

if not direct_col_upstreams:
# Fuzzy resolve upstream columns.
direct_resolved_col_upstreams = {
_ColumnRef(
table=edge.table,
column=_schema_aware_fuzzy_column_resolve(edge.table, edge.column),
)
for edge in direct_raw_col_upstreams
}

if not direct_resolved_col_upstreams:
logger.debug(f' "{output_col}" has no upstreams')
column_lineage.append(
_ColumnLineageInfo(
Expand All @@ -674,12 +719,12 @@ def _schema_aware_fuzzy_column_resolve(
column=output_col,
column_type=output_col_type,
),
upstreams=sorted(direct_col_upstreams),
upstreams=sorted(direct_resolved_col_upstreams),
# logic=column_logic.sql(pretty=True, dialect=dialect),
)
)

# TODO: Also extract referenced columns (e.g. non-SELECT lineage)
# TODO: Also extract referenced columns (aka auxiliary / non-SELECT lineage)
except (sqlglot.errors.OptimizeError, ValueError) as e:
raise SqlUnderstandingError(
f"sqlglot failed to compute some lineage: {e}"
Expand All @@ -700,6 +745,12 @@ def _extract_select_from_create(
return statement


def _is_create_table_ddl(statement: sqlglot.exp.Expression) -> bool:
    """Return True if *statement* is a CREATE DDL whose target carries an inline column schema.

    A create statement qualifies only when its `this` child is a
    `sqlglot.exp.Schema` node, i.e. the column definitions are spelled
    out directly in the statement (CREATE TABLE t (col1 TYPE, ...)).
    """
    if not isinstance(statement, sqlglot.exp.Create):
        return False
    return isinstance(statement.this, sqlglot.exp.Schema)


def _try_extract_select(
statement: sqlglot.exp.Expression,
) -> sqlglot.exp.Expression:
Expand Down Expand Up @@ -766,6 +817,7 @@ def _translate_sqlglot_type(
def _translate_internal_column_lineage(
table_name_urn_mapping: Dict[_TableName, str],
raw_column_lineage: _ColumnLineageInfo,
dialect: str,
) -> ColumnLineageInfo:
downstream_urn = None
if raw_column_lineage.downstream.table:
Expand All @@ -779,7 +831,9 @@ def _translate_internal_column_lineage(
)
if raw_column_lineage.downstream.column_type
else None,
native_column_type=raw_column_lineage.downstream.column_type.sql()
native_column_type=raw_column_lineage.downstream.column_type.sql(
dialect=dialect
)
if raw_column_lineage.downstream.column_type
and raw_column_lineage.downstream.column_type.this
!= sqlglot.exp.DataType.Type.UNKNOWN
Expand All @@ -800,12 +854,14 @@ def _get_dialect(platform: str) -> str:
# TODO: convert datahub platform names to sqlglot dialect
if platform == "presto-on-hive":
return "hive"
if platform == "mssql":
return "tsql"
else:
return platform


def _sqlglot_lineage_inner(
sql: str,
sql: sqlglot.exp.ExpOrStr,
schema_resolver: SchemaResolver,
default_db: Optional[str] = None,
default_schema: Optional[str] = None,
Expand Down Expand Up @@ -918,7 +974,7 @@ def _sqlglot_lineage_inner(
if column_lineage:
column_lineage_urns = [
_translate_internal_column_lineage(
table_name_urn_mapping, internal_col_lineage
table_name_urn_mapping, internal_col_lineage, dialect=dialect
)
for internal_col_lineage in column_lineage
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand All @@ -36,7 +36,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand All @@ -54,7 +54,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand All @@ -72,7 +72,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand All @@ -32,7 +32,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand All @@ -32,7 +32,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand All @@ -32,7 +32,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand All @@ -34,7 +34,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand All @@ -52,7 +52,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand All @@ -39,7 +39,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
Expand Down
Loading
Loading