Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): support view lineage for all sqlalchemy sources #9039

11 changes: 6 additions & 5 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@
"grpcio-tools>=1.44.0,<2",
}

usage_common = {
"sqlparse",
}


sql_common = {
# Required for all SQL sources.
# This is temporary lower bound that we're open to loosening/tightening as requirements show up
Expand All @@ -115,7 +120,7 @@
# https://github.com/ipython/traitlets/issues/741
"traitlets<5.2.2",
"greenlet",
}
} | usage_common

sqllineage_lib = {
"sqllineage==1.3.8",
Expand Down Expand Up @@ -243,10 +248,6 @@

powerbi_report_server = {"requests", "requests_ntlm"}

usage_common = {
"sqlparse",
}

databricks = {
# 0.1.11 appears to have authentication issues with azure databricks
"databricks-sdk>=0.9.0",
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/configuration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ class VersionedConfig(ConfigModel):

class LineageConfig(ConfigModel):
incremental_lineage: bool = Field(
default=True,
default=False,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incremental lineage requires presence of DataHubGraph, which is available by default only when using DataHub rest sink. We plan to keep this default enabled in managed ingestion.

description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def process_sql_parsing_result(
user: Optional[UserUrn] = None,
custom_operation_type: Optional[str] = None,
include_urns: Optional[Set[DatasetUrn]] = None,
include_column_lineage: bool = True,
) -> Iterable[MetadataWorkUnit]:
"""Process a single query and yield any generated workunits.

Expand All @@ -130,7 +131,9 @@ def process_sql_parsing_result(
_merge_lineage_data(
downstream_urn=downstream_urn,
upstream_urns=result.in_tables,
column_lineage=result.column_lineage,
column_lineage=result.column_lineage
if include_column_lineage
else None,
upstream_edges=self._lineage_map[downstream_urn],
query_timestamp=query_timestamp,
is_view_ddl=is_view_ddl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,13 @@ def auto_incremental_lineage(
if len(wu.metadata.proposedSnapshot.aspects) > 0:
yield wu

yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
) if lineage_aspect.fineGrainedLineages else _convert_upstream_lineage_to_patch(
urn, lineage_aspect, wu.metadata.systemMetadata
)
if lineage_aspect.fineGrainedLineages:
yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
)
elif lineage_aspect.upstreams:
yield _convert_upstream_lineage_to_patch(
Copy link
Collaborator Author

@mayurinehate mayurinehate Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a table level upstream only aspect with empty upstreams, we ignore it, as part of incremental lineage.

urn, lineage_aspect, wu.metadata.systemMetadata
)
else:
yield wu
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
)
):
auto_lowercase_dataset_urns = auto_lowercase_urns

return [
auto_lowercase_dataset_urns,
auto_status_aspect,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ class DBTCommonConfig(
default=False,
description="When enabled, dbt test warnings will be treated as failures.",
)
# override fault value to True.
incremental_lineage: bool = Field(
default=True,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)

@validator("target_platform")
def validate_target_platform_value(cls, target_platform: str) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def get_workunits(
return

self._populate_external_lineage_map(discovered_tables)

if self.config.include_view_lineage:
if len(discovered_views) > 0:
yield from self.get_view_upstream_workunits(
Expand Down Expand Up @@ -200,14 +199,15 @@ def _gen_workunit_from_sql_parsing_result(
self,
dataset_identifier: str,
result: SqlParsingResult,
) -> MetadataWorkUnit:
) -> Iterable[MetadataWorkUnit]:
upstreams, fine_upstreams = self.get_upstreams_from_sql_parsing_result(
self.dataset_urn_builder(dataset_identifier), result
)
self.report.num_views_with_upstreams += 1
return self._create_upstream_lineage_workunit(
dataset_identifier, upstreams, fine_upstreams
)
if upstreams:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not emit upstream lineage if no upstreams are found.

self.report.num_views_with_upstreams += 1
yield self._create_upstream_lineage_workunit(
dataset_identifier, upstreams, fine_upstreams
)

def _gen_workunits_from_query_result(
self,
Expand Down Expand Up @@ -251,7 +251,7 @@ def get_view_upstream_workunits(
)
if result:
views_processed.add(view_identifier)
yield self._gen_workunit_from_sql_parsing_result(
yield from self._gen_workunit_from_sql_parsing_result(
view_identifier, result
)
self.report.view_lineage_parse_secs = timer.elapsed_seconds()
Expand Down
83 changes: 71 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import json
import logging
import re
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Iterable, List, Optional, Union

from pydantic.class_validators import validator
from pydantic.fields import Field

# This import verifies that the dependencies are available.
from pyhive import hive # noqa: F401
from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveTimestamp
from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveDialect, HiveTimestamp
from sqlalchemy.engine.reflection import Inspector

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
Expand All @@ -18,8 +21,10 @@
platform_name,
support_status,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.sql.sql_common import register_custom_type
from datahub.ingestion.source.sql.sql_common import SqlWorkUnit, register_custom_type
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
Expand All @@ -31,6 +36,7 @@
SchemaField,
TimeTypeClass,
)
from datahub.metadata.schema_classes import ViewPropertiesClass
from datahub.utilities import config_clean
from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column

Expand Down Expand Up @@ -90,19 +96,34 @@ def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw):
logger.warning(f"Failed to patch method due to {e}")


@reflection.cache # type: ignore
def get_view_names_patched(self, connection, schema=None, **kw):
query = "SHOW VIEWS"
if schema:
query += " IN " + self.identifier_preparer.quote_identifier(schema)
return [row[0] for row in connection.execute(query)]


@reflection.cache # type: ignore
def get_view_definition_patched(self, connection, view_name, schema=None, **kw):
full_table = self.identifier_preparer.quote_identifier(view_name)
if schema:
full_table = "{}.{}".format(
self.identifier_preparer.quote_identifier(schema),
self.identifier_preparer.quote_identifier(view_name),
)
row = connection.execute("SHOW CREATE TABLE {}".format(full_table)).fetchone()
return row[0]


HiveDialect.get_view_names = get_view_names_patched
HiveDialect.get_view_definition = get_view_definition_patched


class HiveConfig(TwoTierSQLAlchemyConfig):
# defaults
scheme = Field(default="hive", hidden_from_docs=True)

# Hive SQLAlchemy connector returns views as tables.
# See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273.
# Disabling views helps us prevent this duplication.
include_views = Field(
default=False,
hidden_from_docs=True,
description="Hive SQLAlchemy connector returns views as tables. See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273. Disabling views helps us prevent this duplication.",
)

@validator("host_port")
def clean_host_port(cls, v):
return config_clean.remove_protocol(v)
Expand Down Expand Up @@ -174,3 +195,41 @@ def get_schema_fields_for_column(
return new_fields

return fields

# Hive SQLAlchemy connector returns views as tables in get_table_names.
# See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273.
# This override makes sure that we ingest view definitions for views
def _process_view(
self,
dataset_name: str,
inspector: Inspector,
schema: str,
view: str,
sql_config: SQLCommonConfig,
) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)

try:
view_definition = inspector.get_view_definition(view, schema)
if view_definition is None:
view_definition = ""
else:
# Some dialects return a TextClause instead of a raw string,
# so we need to convert them to a string.
view_definition = str(view_definition)
except NotImplementedError:
view_definition = ""

if view_definition:
view_properties_aspect = ViewPropertiesClass(
materialized=False, viewLanguage="SQL", viewLogic=view_definition
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=view_properties_aspect,
).as_workunit()
20 changes: 10 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ class BasePostgresConfig(BasicSQLAlchemyConfig):


class PostgresConfig(BasePostgresConfig):
include_view_lineage = Field(
default=False, description="Include table lineage for views"
)

database_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description=(
Expand Down Expand Up @@ -183,9 +179,10 @@ def get_inspectors(self) -> Iterable[Inspector]:
def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
yield from super().get_workunits_internal()

for inspector in self.get_inspectors():
if self.config.include_view_lineage:
yield from self._get_view_lineage_workunits(inspector)
if self.views_failed_parsing:
for inspector in self.get_inspectors():
if self.config.include_view_lineage:
yield from self._get_view_lineage_workunits(inspector)

def _get_view_lineage_elements(
self, inspector: Inspector
Expand Down Expand Up @@ -245,11 +242,14 @@ def _get_view_lineage_workunits(
dependent_view, dependent_schema = key

# Construct a lineage object.
view_identifier = self.get_identifier(
schema=dependent_schema, entity=dependent_view, inspector=inspector
)
if view_identifier not in self.views_failed_parsing:
return
urn = mce_builder.make_dataset_urn_with_platform_instance(
platform=self.platform,
name=self.get_identifier(
schema=dependent_schema, entity=dependent_view, inspector=inspector
),
name=view_identifier,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
Expand Down
Loading
Loading