Commit 6dea57c

Merge branch 'master' into patch-4
dyhn78 authored Oct 23, 2023
2 parents 60fe142 + 10456c5 commit 6dea57c
Showing 18 changed files with 431 additions and 99 deletions.
1 change: 0 additions & 1 deletion docker/datahub-frontend/start.sh
@@ -50,7 +50,6 @@ export JAVA_OPTS="-Xms512m \
-Djava.security.auth.login.config=datahub-frontend/conf/jaas.conf \
-Dlogback.configurationFile=datahub-frontend/conf/logback.xml \
-Dlogback.debug=false \
-    -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 \
${PROMETHEUS_AGENT:-} ${OTEL_AGENT:-} \
${TRUSTSTORE_FILE:-} ${TRUSTSTORE_TYPE:-} ${TRUSTSTORE_PASSWORD:-} \
${HTTP_PROXY:-} ${HTTPS_PROXY:-} ${NO_PROXY:-} \


2 changes: 1 addition & 1 deletion metadata-ingestion/adding-source.md
@@ -62,7 +62,7 @@ Some sources use the default `SourceReport` class, but others inherit and extend

### 3. Implement the source itself

-The core for the source is the `get_workunits` method, which produces a stream of metadata events (typically MCP objects) wrapped up in a MetadataWorkUnit.
+The core for the source is the `get_workunits_internal` method, which produces a stream of metadata events (typically MCP objects) wrapped up in a MetadataWorkUnit.
The [file source](./src/datahub/ingestion/source/file.py) is a good and simple example.
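
For orientation, a minimal sketch of such a method (a hypothetical source, not part of this commit; `_discover_dataset_urns` is an assumed helper):

    from typing import Iterable

    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.ingestion.api.workunit import MetadataWorkUnit
    from datahub.metadata.schema_classes import StatusClass

    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
        # Emit one MCP-backed workunit per discovered dataset.
        for dataset_urn in self._discover_dataset_urns():  # hypothetical helper
            mcp = MetadataChangeProposalWrapper(
                entityUrn=dataset_urn,
                aspect=StatusClass(removed=False),
            )
            yield mcp.as_workunit()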

The MetadataChangeEventClass is defined in the metadata models which are generated
9 changes: 5 additions & 4 deletions metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py
@@ -179,15 +179,16 @@ def add_lineage(

def gen_workunits(self) -> Iterable[MetadataWorkUnit]:
if self.generate_lineage:
-yield from self._gen_lineage_workunits()
+for mcp in self._gen_lineage_mcps():
+    yield mcp.as_workunit()
if self.generate_usage_statistics:
yield from self._gen_usage_statistics_workunits()

-def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]:
+def _gen_lineage_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
for downstream_urn in self._lineage_map:
upstreams: List[UpstreamClass] = []
fine_upstreams: List[FineGrainedLineageClass] = []
-for upstream_urn, edge in self._lineage_map[downstream_urn].items():
+for edge in self._lineage_map[downstream_urn].values():
upstreams.append(edge.gen_upstream_aspect())
fine_upstreams.extend(edge.gen_fine_grained_lineage_aspects())

@@ -201,7 +202,7 @@ def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]:
)
yield MetadataChangeProposalWrapper(
entityUrn=downstream_urn, aspect=upstream_lineage
-).as_workunit()
+)

def _gen_usage_statistics_workunits(self) -> Iterable[MetadataWorkUnit]:
yield from self._usage_aggregator.generate_workunits(
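
The net effect of the refactor above: lineage MCPs are now produced separately from the workunit wrapper, so a caller can either feed the ingestion pipeline or emit directly. A rough sketch of the two paths (hypothetical caller code; the constructor usage shown is assumed):

    from datahub.emitter.rest_emitter import DatahubRestEmitter

    builder = SqlParsingBuilder(generate_lineage=True)  # assumed constructor usage
    # ... builder.add_lineage(...) calls go here ...

    # Path 1: the ingestion pipeline consumes workunits.
    workunits = list(builder.gen_workunits())

    # Path 2: emit the underlying MCPs directly (enabled by the split).
    emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
    for mcp in builder._gen_lineage_mcps():
        emitter.emit_mcp(mcp)
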
@@ -24,6 +24,7 @@ def assert_sql_result_with_resolver(
*,
expected_file: pathlib.Path,
schema_resolver: SchemaResolver,
+allow_table_error: bool = False,
**kwargs: Any,
) -> None:
# HACK: Our BigQuery source overwrites this value and doesn't undo it.
@@ -36,6 +37,14 @@
**kwargs,
)

+if res.debug_info.table_error:
+    if allow_table_error:
+        logger.info(
+            f"SQL parser table error: {res.debug_info.table_error}",
+            exc_info=res.debug_info.table_error,
+        )
+    else:
+        raise res.debug_info.table_error
if res.debug_info.column_error:
logger.warning(
f"SQL parser column error: {res.debug_info.column_error}",
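
A hypothetical test using the new flag (the positional SQL argument and the dialect keyword are assumed from the truncated signature; the golden-file path is illustrative):

    assert_sql_result_with_resolver(
        "SELECT id FROM project.dataset.missing_table",
        dialect="bigquery",  # assumed keyword, forwarded via **kwargs
        expected_file=pathlib.Path("goldens/test_missing_table.json"),
        schema_resolver=schema_resolver,
        allow_table_error=True,  # log table-resolution failures instead of raising
    )
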
92 changes: 74 additions & 18 deletions metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
@@ -241,9 +241,9 @@ class SqlParsingResult(_ParserBaseModel):
)


-def _parse_statement(sql: str, dialect: str) -> sqlglot.Expression:
-    statement = sqlglot.parse_one(
-        sql, read=dialect, error_level=sqlglot.ErrorLevel.RAISE
+def _parse_statement(sql: sqlglot.exp.ExpOrStr, dialect: str) -> sqlglot.Expression:
+    statement: sqlglot.Expression = sqlglot.maybe_parse(
+        sql, dialect=dialect, error_level=sqlglot.ErrorLevel.RAISE
)
return statement

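Why maybe_parse: unlike parse_one, it also accepts an already-parsed expression and passes it through, which is what lets the lineage entry points below take sqlglot.exp.ExpOrStr (a str or an Expression). Standard sqlglot behavior, shown for illustration:

    import sqlglot

    e1 = sqlglot.maybe_parse("SELECT 1", dialect="bigquery")  # string input: parsed
    e2 = sqlglot.maybe_parse(e1, dialect="bigquery")          # Expression input: passed through unchanged
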
@@ -467,14 +467,20 @@ def _column_level_lineage(  # noqa: C901
default_db: Optional[str],
default_schema: Optional[str],
) -> List[_ColumnLineageInfo]:
-if not isinstance(
-    statement,
-    _SupportedColumnLineageTypesTuple,
+is_create_ddl = _is_create_table_ddl(statement)
+if (
+    not isinstance(
+        statement,
+        _SupportedColumnLineageTypesTuple,
+    )
+    and not is_create_ddl
):
raise UnsupportedStatementTypeError(
f"Can only generate column-level lineage for select-like inner statements, not {type(statement)}"
)

+column_lineage: List[_ColumnLineageInfo] = []
+
use_case_insensitive_cols = dialect in {
# Column identifiers are case-insensitive in BigQuery, so we need to
# do a normalization step beforehand to make sure it's resolved correctly.
@@ -580,6 +586,38 @@ def _schema_aware_fuzzy_column_resolve(
) from e
logger.debug("Qualified sql %s", statement.sql(pretty=True, dialect=dialect))

+# Handle the create DDL case.
+if is_create_ddl:
+    assert (
+        output_table is not None
+    ), "output_table must be set for create DDL statements"
+
+    create_schema: sqlglot.exp.Schema = statement.this
+    sqlglot_columns = create_schema.expressions
+
+    for column_def in sqlglot_columns:
+        if not isinstance(column_def, sqlglot.exp.ColumnDef):
+            # Ignore things like constraints.
+            continue
+
+        output_col = _schema_aware_fuzzy_column_resolve(
+            output_table, column_def.name
+        )
+        output_col_type = column_def.args.get("kind")
+
+        column_lineage.append(
+            _ColumnLineageInfo(
+                downstream=_DownstreamColumnRef(
+                    table=output_table,
+                    column=output_col,
+                    column_type=output_col_type,
+                ),
+                upstreams=[],
+            )
+        )
+
+    return column_lineage
+
# Try to figure out the types of the output columns.
try:
statement = sqlglot.optimizer.annotate_types.annotate_types(
@@ -589,8 +627,6 @@
# This is not a fatal error, so we can continue.
logger.debug("sqlglot failed to annotate types: %s", e)

-column_lineage = []
-
try:
assert isinstance(statement, _SupportedColumnLineageTypesTuple)

@@ -599,7 +635,6 @@
(select_col.alias_or_name, select_col) for select_col in statement.selects
]
logger.debug("output columns: %s", [col[0] for col in output_columns])
-output_col: str
for output_col, original_col_expression in output_columns:
if output_col == "*":
# If schema information is available, the * will be expanded to the actual columns.
@@ -628,7 +663,7 @@

# Generate SELECT lineage.
# Using a set here to deduplicate upstreams.
-direct_col_upstreams: Set[_ColumnRef] = set()
+direct_raw_col_upstreams: Set[_ColumnRef] = set()
for node in lineage_node.walk():
if node.downstream:
# We only want the leaf nodes.
@@ -643,8 +678,9 @@
if node.subfield:
normalized_col = f"{normalized_col}.{node.subfield}"

-col = _schema_aware_fuzzy_column_resolve(table_ref, normalized_col)
-direct_col_upstreams.add(_ColumnRef(table=table_ref, column=col))
+direct_raw_col_upstreams.add(
+    _ColumnRef(table=table_ref, column=normalized_col)
+)
else:
# This branch doesn't matter. For example, a count(*) column would go here, and
# we don't get any column-level lineage for that.
@@ -665,7 +701,16 @@
if original_col_expression.type:
output_col_type = original_col_expression.type

-if not direct_col_upstreams:
+# Fuzzy resolve upstream columns.
+direct_resolved_col_upstreams = {
+    _ColumnRef(
+        table=edge.table,
+        column=_schema_aware_fuzzy_column_resolve(edge.table, edge.column),
+    )
+    for edge in direct_raw_col_upstreams
+}
+
+if not direct_resolved_col_upstreams:
logger.debug(f' "{output_col}" has no upstreams')
column_lineage.append(
_ColumnLineageInfo(
@@ -674,12 +719,12 @@
column=output_col,
column_type=output_col_type,
),
-upstreams=sorted(direct_col_upstreams),
+upstreams=sorted(direct_resolved_col_upstreams),
# logic=column_logic.sql(pretty=True, dialect=dialect),
)
)

-# TODO: Also extract referenced columns (e.g. non-SELECT lineage)
+# TODO: Also extract referenced columns (aka auxiliary / non-SELECT lineage)
except (sqlglot.errors.OptimizeError, ValueError) as e:
raise SqlUnderstandingError(
f"sqlglot failed to compute some lineage: {e}"
@@ -700,6 +745,12 @@ def _extract_select_from_create(
return statement


+def _is_create_table_ddl(statement: sqlglot.exp.Expression) -> bool:
+    return isinstance(statement, sqlglot.exp.Create) and isinstance(
+        statement.this, sqlglot.exp.Schema
+    )
+
+
def _try_extract_select(
statement: sqlglot.exp.Expression,
) -> sqlglot.exp.Expression:
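
What _is_create_table_ddl matches, in plain sqlglot terms (standard sqlglot behavior; the create-DDL branch added above then reads column definitions off the Schema node):

    import sqlglot

    stmt = sqlglot.parse_one("CREATE TABLE db.t (id INT, name TEXT)")
    isinstance(stmt, sqlglot.exp.Create)       # True
    isinstance(stmt.this, sqlglot.exp.Schema)  # True: a column-defining CREATE
    [c.name for c in stmt.this.expressions
     if isinstance(c, sqlglot.exp.ColumnDef)]  # ['id', 'name']

    # By contrast, a CTAS has no Schema node, so it takes the select-lineage path:
    ctas = sqlglot.parse_one("CREATE TABLE db.t AS SELECT id FROM db.src")
    isinstance(ctas.this, sqlglot.exp.Schema)  # False
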
@@ -766,6 +817,7 @@ def _translate_sqlglot_type(
def _translate_internal_column_lineage(
table_name_urn_mapping: Dict[_TableName, str],
raw_column_lineage: _ColumnLineageInfo,
+dialect: str,
) -> ColumnLineageInfo:
downstream_urn = None
if raw_column_lineage.downstream.table:
@@ -779,7 +831,9 @@
)
if raw_column_lineage.downstream.column_type
else None,
-native_column_type=raw_column_lineage.downstream.column_type.sql()
+native_column_type=raw_column_lineage.downstream.column_type.sql(
+    dialect=dialect
+)
if raw_column_lineage.downstream.column_type
and raw_column_lineage.downstream.column_type.this
!= sqlglot.exp.DataType.Type.UNKNOWN
@@ -800,12 +854,14 @@ def _get_dialect(platform: str) -> str:
# TODO: convert datahub platform names to sqlglot dialect
if platform == "presto-on-hive":
return "hive"
if platform == "mssql":
return "tsql"
else:
return platform


def _sqlglot_lineage_inner(
-sql: str,
+sql: sqlglot.exp.ExpOrStr,
schema_resolver: SchemaResolver,
default_db: Optional[str] = None,
default_schema: Optional[str] = None,
@@ -918,7 +974,7 @@
if column_lineage:
column_lineage_urns = [
_translate_internal_column_lineage(
-table_name_urn_mapping, internal_col_lineage
+table_name_urn_mapping, internal_col_lineage, dialect=dialect
)
for internal_col_lineage in column_lineage
]
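
Two of the changes above interact to produce the golden-file churn below: _get_dialect now maps DataHub's mssql platform to sqlglot's tsql dialect, and native_column_type is rendered with the source dialect instead of sqlglot's generic SQL. For example (standard sqlglot behavior; the snapshots below appear to come from BigQuery test cases):

    import sqlglot

    t = sqlglot.exp.DataType.build("text")
    t.sql()                    # 'TEXT'   -- generic rendering (old behavior)
    t.sql(dialect="bigquery")  # 'STRING' -- BigQuery spells TEXT as STRING (new behavior)
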
@@ -18,7 +18,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
@@ -36,7 +36,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
@@ -54,7 +54,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
@@ -72,7 +72,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
@@ -14,7 +14,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
@@ -32,7 +32,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
@@ -14,7 +14,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
@@ -32,7 +32,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
@@ -14,7 +14,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
@@ -32,7 +32,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
@@ -16,7 +16,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
@@ -34,7 +34,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
@@ -52,7 +52,7 @@
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"native_column_type": "TEXT"
"native_column_type": "STRING"
},
"upstreams": [
{
