feat(ingestion/tableau): support column level lineage for custom sql #8466

Merged
merged 21 commits on Aug 1, 2023
Changes from 7 commits
Commits (21)
5b17ea4  wip (siddiquebagwan-gslab, Jul 18, 2023)
c0966f4  Merge branch 'master' into master+tableau-cll (siddiquebagwan-gslab, Jul 19, 2023)
1a06fb0  lineage working with new parser (siddiquebagwan-gslab, Jul 19, 2023)
965366d  CLL (siddiquebagwan-gslab, Jul 19, 2023)
b70ebb4  CLL (siddiquebagwan-gslab, Jul 20, 2023)
9720834  Merge branch 'master' into master+tableau-cll (siddiquebagwan-gslab, Jul 20, 2023)
dce60e7  lint fix (siddiquebagwan-gslab, Jul 20, 2023)
be020a6  Merge branch 'master' into master+tableau-cll (siddiquebagwan-gslab, Jul 24, 2023)
f785f25  test case (siddiquebagwan-gslab, Jul 24, 2023)
0c9a275  Merge branch 'master' into master+tableau-cll (siddiquebagwan-gslab, Jul 25, 2023)
c6af421  test case (siddiquebagwan-gslab, Jul 25, 2023)
8942921  lint fix (siddiquebagwan-gslab, Jul 25, 2023)
b774909  Merge branch 'master' into master+tableau-cll (siddiquebagwan, Jul 25, 2023)
3c53f5f  Merge branch 'master' into master+tableau-cll (siddiquebagwan-gslab, Jul 26, 2023)
fd561aa  review comments (siddiquebagwan-gslab, Jul 26, 2023)
bb66232  Merge branch 'master+tableau-cll' of github.com:mohdsiddique/datahub … (siddiquebagwan-gslab, Jul 26, 2023)
e6dddfc  Merge branch 'master' into master+tableau-cll (siddiquebagwan, Jul 26, 2023)
aca19cd  resolve merge conflict (siddiquebagwan-gslab, Jul 31, 2023)
81b98f4  Merge branch 'master+tableau-cll' of github.com:acryldata/datahub-for… (siddiquebagwan-gslab, Jul 31, 2023)
4855380  Merge branch 'master+tableau-cll' of github.com:mohdsiddique/datahub … (siddiquebagwan-gslab, Jul 31, 2023)
96c91c1  Merge branch 'master' into master+tableau-cll (hsheth2, Jul 31, 2023)
237 changes: 185 additions & 52 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -4,14 +4,24 @@
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Set,
Tuple,
Union,
cast,
)

import dateutil.parser as dp
import tableauserverclient as TSC
from pydantic import root_validator, validator
from pydantic.fields import Field
from requests.adapters import ConnectionError
from sqllineage.runner import LineageRunner
from tableauserverclient import (
PersonalAccessTokenAuth,
Server,
@@ -71,6 +81,7 @@
dashboard_graphql_query,
database_tables_graphql_query,
embedded_datasource_graphql_query,
get_overridden_info,
get_unique_custom_sql,
make_table_urn,
published_datasource_graphql_query,
@@ -124,6 +135,7 @@
ViewPropertiesClass,
)
from datahub.utilities import config_clean
from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult

logger: logging.Logger = logging.getLogger(__name__)

@@ -871,15 +883,15 @@ def _create_upstream_table_lineage(
f"A total of {len(upstream_tables)} upstream table edges found for datasource {datasource[tableau_constant.ID]}"
)

if datasource.get(tableau_constant.FIELDS):
datasource_urn = builder.make_dataset_urn_with_platform_instance(
platform=self.platform,
name=datasource[tableau_constant.ID],
platform_instance=self.config.platform_instance,
env=self.config.env,
)
datasource_urn = builder.make_dataset_urn_with_platform_instance(
platform=self.platform,
name=datasource[tableau_constant.ID],
platform_instance=self.config.platform_instance,
env=self.config.env,
)

if self.config.extract_column_level_lineage:
if self.config.extract_column_level_lineage:
if datasource.get(tableau_constant.FIELDS):
# Find fine grained lineage for datasource column to datasource column edge,
# upstream columns may be from same datasource
upstream_fields = self.get_upstream_fields_of_field_in_datasource(
@@ -898,6 +910,12 @@
logger.debug(
f"A total of {len(fine_grained_lineages)} upstream column edges found for datasource {datasource[tableau_constant.ID]}"
)
elif datasource.get(tableau_constant.IS_UNSUPPORTED_CUSTOM_SQL):
# Find upstream lineage from custom sql
upstream_fields = self.get_upstream_fields_from_custom_sql(
datasource, datasource_urn
)
fine_grained_lineages.extend(upstream_fields)

return upstream_tables, fine_grained_lineages

@@ -1140,6 +1158,57 @@ def get_upstream_fields_of_field_in_datasource(self, datasource, datasource_urn)
)
return fine_grained_lineages

def get_upstream_fields_from_custom_sql(
self, datasource: dict, datasource_urn: str
) -> List[FineGrainedLineage]:
fine_grained_lineages: List[FineGrainedLineage] = []

parsed_result = self.parse_custom_sql(
datasource=datasource,
datasource_urn=datasource_urn,
env=self.config.env,
platform=self.platform,
platform_instance=self.config.platform_instance,
func_overridden_info=None, # Here we don't want to override any information from configuration
)

if parsed_result is None:
logger.info(
f"Failed to extract column level lineage from datasource {datasource_urn}"
)
return fine_grained_lineages

cll: List[ColumnLineageInfo] = (
parsed_result.column_lineage
if parsed_result.column_lineage is not None
else []
)
for cll_info in cll:
downstream = (
[
builder.make_schema_field_urn(
datasource_urn, cll_info.downstream.column
)
]
if cll_info.downstream is not None
and cll_info.downstream.column is not None
else []
)
upstreams = [
builder.make_schema_field_urn(column_ref.table, column_ref.column)
for column_ref in cll_info.upstreams
]
fine_grained_lineages.append(
FineGrainedLineage(
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=downstream,
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=upstreams,
)
)

return fine_grained_lineages

def get_transform_operation(self, field):
field_type = field[tableau_constant.TYPE_NAME]
if field_type in (
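To make the column-level path above concrete, here is a minimal sketch of the kind of edge `get_upstream_fields_from_custom_sql` builds from a parsed result. The URNs reuse identifiers that appear in this PR's test; the `user_id` column and the exact import paths are assumptions for illustration, not part of the diff.

```python
import datahub.emitter.mce_builder as builder
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
)

# Hypothetical single-column example: the custom SQL datasource's "user_id"
# field is fed by invent_dw.userdetail.user_id in BigQuery.
csql_urn = builder.make_dataset_urn("tableau", "09988088-05ad-173c-a2f1-f33ba3a13d1a")
upstream_urn = builder.make_dataset_urn("bigquery", "invent_dw.userdetail")

edge = FineGrainedLineage(
    downstreamType=FineGrainedLineageDownstreamType.FIELD,
    downstreams=[builder.make_schema_field_urn(csql_urn, "user_id")],
    upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
    upstreams=[builder.make_schema_field_urn(upstream_urn, "user_id")],
)
print(edge.downstreams, edge.upstreams)
```

Each `ColumnLineageInfo` returned by the parser yields one such edge, with the downstream pinned to the custom SQL datasource and the upstreams taken from `column_ref.table` and `column_ref.column`.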
@@ -1447,53 +1516,117 @@ def _create_lineage_to_upstream_tables(
aspect=upstream_lineage,
)

def _create_lineage_from_unsupported_csql(
self, csql_urn: str, csql: dict
) -> Iterable[MetadataWorkUnit]:
database = csql.get(tableau_constant.DATABASE) or {}
def parse_custom_sql(
self,
datasource: dict,
datasource_urn: str,
platform: str,
env: str,
platform_instance: Optional[str],
func_overridden_info: Optional[
Callable[
[
str,
Optional[str],
Optional[Dict[str, str]],
Optional[TableauLineageOverrides],
],
Tuple[Optional[str], Optional[str], str, str],
]
],
) -> Optional["SqlParsingResult"]:

database_info = datasource.get(tableau_constant.DATABASE) or {}

if datasource.get(tableau_constant.IS_UNSUPPORTED_CUSTOM_SQL) in (None, False):
logger.debug(f"datasource {datasource_urn} is not created from custom sql")
return None

if (
csql.get(tableau_constant.IS_UNSUPPORTED_CUSTOM_SQL, False)
and tableau_constant.NAME in database
and tableau_constant.CONNECTION_TYPE in database
tableau_constant.NAME not in database_info
or tableau_constant.CONNECTION_TYPE not in database_info
):
upstream_tables = []
query = csql.get(tableau_constant.QUERY)
parser = LineageRunner(query)

try:
for table in parser.source_tables:
split_table = str(table).split(".")
if len(split_table) == 2:
datset = make_table_urn(
env=self.config.env,
upstream_db=database.get(tableau_constant.NAME),
connection_type=database.get(
tableau_constant.CONNECTION_TYPE, ""
),
schema=split_table[0],
full_name=split_table[1],
platform_instance_map=self.config.platform_instance_map,
lineage_overrides=self.config.lineage_overrides,
)
upstream_tables.append(
UpstreamClass(
type=DatasetLineageType.TRANSFORMED, dataset=datset
)
)
except Exception as e:
self.report.report_warning(
key="csql-lineage",
reason=f"Unable to retrieve lineage from query. "
f"Query: {query} "
f"Reason: {str(e)} ",
logger.debug(
f"database information is missing from datasource {datasource_urn}"
)
return None

query = datasource.get(tableau_constant.QUERY)
if query is None:
logger.debug(
f"raw sql query is not available for datasource {datasource_urn}"
)
return None

logger.debug(f"Parsing sql={query}")

upstream_db = database_info.get(tableau_constant.NAME)

if func_overridden_info is not None:
# Override the information as per configuration
upstream_db, platform_instance, platform, _ = func_overridden_info(
database_info[tableau_constant.CONNECTION_TYPE],
database_info.get(tableau_constant.NAME),
self.config.platform_instance_map,
self.config.lineage_overrides,
)

parsed_result: Optional["SqlParsingResult"] = None
try:
if self.ctx.graph is not None:
parsed_result = self.ctx.graph.parse_sql_lineage(
query,
default_db=upstream_db,
platform=platform,
platform_instance=platform_instance,
env=self.config.env,
)
upstream_lineage = UpstreamLineage(upstreams=upstream_tables)
yield self.get_metadata_change_proposal(
csql_urn,
aspect_name=tableau_constant.UPSTREAM_LINEAGE,
aspect=upstream_lineage,
except Exception as e:
self.report.report_warning(
key="csql-lineage",
reason=f"Unable to retrieve lineage from query. "
f"Query: {query} "
f"Reason: {str(e)} ",
)

return parsed_result

def _create_lineage_from_unsupported_csql(
self, csql_urn: str, csql: dict
) -> Iterable[MetadataWorkUnit]:

parsed_result = self.parse_custom_sql(
datasource=csql,
datasource_urn=csql_urn,
env=self.config.env,
platform=self.platform,
platform_instance=self.config.platform_instance,
func_overridden_info=get_overridden_info,
)

if parsed_result is None:
logger.info(
f"Failed to extract table level lineage for datasource {csql_urn}"
)
return

upstream_tables = []

for dataset_urn in parsed_result.in_tables:
upstream_tables.append(
UpstreamClass(type=DatasetLineageType.TRANSFORMED, dataset=dataset_urn)
)

logger.debug(f"Upstream tables = {upstream_tables}")

upstream_lineage = UpstreamLineage(upstreams=upstream_tables)

yield self.get_metadata_change_proposal(
csql_urn,
aspect_name=tableau_constant.UPSTREAM_LINEAGE,
aspect=upstream_lineage,
)

def _get_schema_metadata_for_datasource(
self, datasource_fields: List[dict]
) -> Optional[SchemaMetadata]:
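For orientation, the sketch below strings the new pieces together outside the source class: SQL parsing is delegated to the graph client's `parse_sql_lineage` (the same call made inside `parse_custom_sql`), and the resulting `in_tables` are mapped to upstream entries the way `_create_lineage_from_unsupported_csql` now does. The server URL, query, and default database are hypothetical, and a reachable DataHub instance is assumed; this is an illustration, not the PR's exact call sites.

```python
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

# Assumed: a DataHub GMS is reachable at this (made-up) address.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Mirrors the parse_sql_lineage call inside parse_custom_sql.
parsed_result = graph.parse_sql_lineage(
    "SELECT user_id, user_name FROM invent_dw.UserDetail",  # hypothetical custom SQL
    platform="bigquery",
    default_db="my_project",  # hypothetical default database
    platform_instance=None,
    env="PROD",
)

# Mirrors how _create_lineage_from_unsupported_csql converts in_tables
# into table-level upstream lineage.
upstreams = [
    UpstreamClass(type=DatasetLineageTypeClass.TRANSFORMED, dataset=dataset_urn)
    for dataset_urn in parsed_result.in_tables
]
print(UpstreamLineageClass(upstreams=upstreams))
```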
44 changes: 35 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/tableau_common.py
@@ -1,6 +1,6 @@
import html
from functools import lru_cache
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple

from pydantic.fields import Field

@@ -574,15 +574,13 @@ def get_platform_instance(
return None


def make_table_urn(
env: str,
upstream_db: Optional[str],
def get_overridden_info(
connection_type: str,
schema: str,
full_name: str,
upstream_db: Optional[str],
platform_instance_map: Optional[Dict[str, str]],
lineage_overrides: Optional[TableauLineageOverrides] = None,
) -> str:
) -> Tuple[Optional[str], Optional[str], str, str]:

original_platform = platform = get_platform(connection_type)
if (
lineage_overrides is not None
@@ -599,10 +597,38 @@
):
upstream_db = lineage_overrides.database_override_map[upstream_db]

platform_instance = get_platform_instance(original_platform, platform_instance_map)

if original_platform in ("athena", "hive", "mysql"): # Two tier databases
upstream_db = None

return upstream_db, platform_instance, platform, original_platform


def make_table_urn(
env: str,
upstream_db: Optional[str],
connection_type: str,
schema: str,
full_name: str,
platform_instance_map: Optional[Dict[str, str]],
lineage_overrides: Optional[TableauLineageOverrides] = None,
) -> str:

upstream_db, platform_instance, platform, original_platform = get_overridden_info(
connection_type=connection_type,
upstream_db=upstream_db,
lineage_overrides=lineage_overrides,
platform_instance_map=platform_instance_map,
)

table_name = get_fully_qualified_table_name(
original_platform, upstream_db, schema, full_name
original_platform,
upstream_db if upstream_db is not None else "",
schema,
full_name,
)
platform_instance = get_platform_instance(original_platform, platform_instance_map)

return builder.make_dataset_urn_with_platform_instance(
platform, table_name, platform_instance, env
)
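A quick illustration of the `tableau_common` refactor above: `make_table_urn` keeps its public signature but now routes the platform/database overrides and the two-tier-database handling through `get_overridden_info`. The connection type, override values, and table names below are made up for the example.

```python
from datahub.ingestion.source.tableau_common import (
    TableauLineageOverrides,
    get_overridden_info,
    make_table_urn,
)

# Hypothetical overrides: report presto connections as trino and rename a database.
overrides = TableauLineageOverrides(
    platform_override_map={"presto": "trino"},
    database_override_map={"production database": "prod"},
)

# The new helper returns the (possibly overridden) upstream db, platform instance,
# platform, and the original platform; two-tier platforms get upstream_db=None.
upstream_db, platform_instance, platform, original_platform = get_overridden_info(
    connection_type="presto",
    upstream_db="production database",
    platform_instance_map=None,
    lineage_overrides=overrides,
)

# make_table_urn now calls get_overridden_info internally with the same inputs.
urn = make_table_urn(
    env="PROD",
    upstream_db="production database",
    connection_type="presto",
    schema="sales",
    full_name="sales.orders",
    platform_instance_map=None,
    lineage_overrides=overrides,
)
print(platform, upstream_db, urn)
```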
@@ -27,6 +27,7 @@
UpstreamLineage,
)
from datahub.metadata.schema_classes import MetadataChangeProposalClass, UpstreamClass
from datahub.utilities.sqlglot_lineage import SqlParsingResult
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import (
get_current_checkpoint_from_pipeline,
@@ -755,7 +756,14 @@ def test_tableau_unsupported_csql(mock_datahub_graph):
config.lineage_overrides = TableauLineageOverrides(
database_override_map={"production database": "prod"}
)

context.graph.parse_sql_lineage.return_value = SqlParsingResult( # type:ignore
in_tables=[
"urn:li:dataset:(urn:li:dataPlatform:bigquery,invent_dw.userdetail,PROD)"
]
)
source = TableauSource(config=config, ctx=context)

lineage = source._create_lineage_from_unsupported_csql(
csql_urn="urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)",
csql={
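The truncated test hunk above (`test_tableau_unsupported_csql`) stubs `parse_sql_lineage` on the mocked graph so the lineage path runs without a live warehouse. A minimal standalone version of that stubbing pattern is sketched below; the `out_tables` and `column_lineage` arguments are assumptions about the `SqlParsingResult` model beyond the fields the diff itself exercises.

```python
from unittest import mock

from datahub.utilities.sqlglot_lineage import SqlParsingResult

# Stand-in for the mocked DataHub graph used in the test: parse_sql_lineage is
# stubbed to return a canned result, so no SQL is actually parsed.
graph = mock.MagicMock()
graph.parse_sql_lineage.return_value = SqlParsingResult(
    in_tables=[
        "urn:li:dataset:(urn:li:dataPlatform:bigquery,invent_dw.userdetail,PROD)"
    ],
    out_tables=[],        # assumed field; a plain SELECT writes no tables
    column_lineage=None,  # assumed field; None means no column-level detail
)

result = graph.parse_sql_lineage("SELECT 1")  # the stub ignores its input
assert result.in_tables[0].endswith("invent_dw.userdetail,PROD)")
```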