feat(ingestion/powerbi): support multiple tables as upstream in native SQL parsing (#8592)
siddiquebagwan-gslab authored Aug 23, 2023
1 parent 4116716 commit 8ee58af
Showing 10 changed files with 714 additions and 281 deletions.
15 changes: 15 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
@@ -121,6 +121,12 @@ class DataPlatformPair:
powerbi_data_platform_name: str


@dataclass
class PowerBIPlatformDetail:
data_platform_pair: DataPlatformPair
data_platform_server: str


class SupportedDataPlatform(Enum):
POSTGRES_SQL = DataPlatformPair(
powerbi_data_platform_name="PostgreSQL", datahub_data_platform_name="postgres"
@@ -382,6 +388,15 @@ class PowerBiDashboardSourceConfig(
description="The instance of the platform that all assets produced by this recipe belong to",
)

# Enable advanced SQL construct parsing
enable_advance_lineage_sql_construct: bool = pydantic.Field(
default=False,
description="Whether to enable parsing of advanced native SQL constructs such as joins and sub-queries. "
"native_query_parsing must also be enabled for this flag to take effect. "
"convert_lineage_urns_to_lowercase is enabled by default; if you disabled it in a previous ingestion run, "
"enabling this option may break lineage, because it generates upstream dataset URNs in lowercase.",
)

@validator("dataset_type_mapping")
@classmethod
def map_data_platform(cls, value):
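To show how the new flag is meant to be used, here is a minimal recipe sketch (an illustration, not part of the diff), assuming the standard Pipeline.create entry point; the credentials and sink are placeholders, and per the field description above native_query_parsing has to be enabled alongside the new flag.

from datahub.ingestion.run.pipeline import Pipeline

recipe = {
    "source": {
        "type": "powerbi",
        "config": {
            "tenant_id": "<tenant-id>",          # placeholder
            "client_id": "<client-id>",          # placeholder
            "client_secret": "<client-secret>",  # placeholder
            # The new flag only takes effect together with native query parsing.
            "native_query_parsing": True,
            "enable_advance_lineage_sql_construct": True,
        },
    },
    "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
}

pipeline = Pipeline.create(recipe)
pipeline.run()
pipeline.raise_from_status()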
@@ -5,16 +5,16 @@
from datahub.ingestion.source.powerbi.config import (
PlatformDetail,
PowerBiDashboardSourceConfig,
PowerBIPlatformDetail,
)
from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable

logger = logging.getLogger(__name__)


class AbstractDataPlatformInstanceResolver(ABC):
@abstractmethod
def get_platform_instance(
self, dataplatform_table: DataPlatformTable
self, data_platform_detail: PowerBIPlatformDetail
) -> PlatformDetail:
pass

@@ -32,10 +32,10 @@ class ResolvePlatformInstanceFromDatasetTypeMapping(
BaseAbstractDataPlatformInstanceResolver
):
def get_platform_instance(
self, dataplatform_table: DataPlatformTable
self, data_platform_detail: PowerBIPlatformDetail
) -> PlatformDetail:
platform: Union[str, PlatformDetail] = self.config.dataset_type_mapping[
dataplatform_table.data_platform_pair.powerbi_data_platform_name
data_platform_detail.data_platform_pair.powerbi_data_platform_name
]

if isinstance(platform, PlatformDetail):
@@ -48,13 +48,13 @@ class ResolvePlatformInstanceFromServerToPlatformInstance(
BaseAbstractDataPlatformInstanceResolver
):
def get_platform_instance(
self, dataplatform_table: DataPlatformTable
self, data_platform_detail: PowerBIPlatformDetail
) -> PlatformDetail:
return (
self.config.server_to_platform_instance[
dataplatform_table.datasource_server
data_platform_detail.data_platform_server
]
if dataplatform_table.datasource_server
if data_platform_detail.data_platform_server
in self.config.server_to_platform_instance
else PlatformDetail.parse_obj({})
)
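The resolver now keys off the server recorded in a PowerBIPlatformDetail rather than off an individual DataPlatformTable. A minimal sketch of the lookup semantics (not part of the commit), assuming PlatformDetail exposes platform_instance and env fields, and using a plain dict and a hypothetical server name in place of config.server_to_platform_instance:

from datahub.ingestion.source.powerbi.config import (
    DataPlatformPair,
    PlatformDetail,
    PowerBIPlatformDetail,
)

# The pair mirrors the PostgreSQL entry of SupportedDataPlatform; the server name is hypothetical.
detail = PowerBIPlatformDetail(
    data_platform_pair=DataPlatformPair(
        powerbi_data_platform_name="PostgreSQL",
        datahub_data_platform_name="postgres",
    ),
    data_platform_server="prod-db.example.com",
)

# Stand-in for config.server_to_platform_instance.
server_to_platform_instance = {
    "prod-db.example.com": PlatformDetail(platform_instance="prod_pg", env="PROD"),
}

# Same fallback behaviour as the resolver above: unknown servers map to an empty PlatformDetail.
resolved = server_to_platform_instance.get(
    detail.data_platform_server, PlatformDetail.parse_obj({})
)
print(resolved)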
@@ -1,8 +1,12 @@
import logging
from typing import List
from typing import List, Optional

import sqlparse

import datahub.utilities.sqlglot_lineage as sqlglot_l
from datahub.ingestion.api.common import PipelineContext
from datahub.utilities.sqlglot_lineage import SqlParsingResult

SPECIAL_CHARACTERS = ["#(lf)", "(lf)"]

logger = logging.getLogger()
@@ -45,3 +49,30 @@ def get_tables(native_query: str) -> List[str]:
from_index = from_index + 1

return tables


def parse_custom_sql(
ctx: PipelineContext,
query: str,
schema: Optional[str],
database: Optional[str],
platform: str,
env: str,
platform_instance: Optional[str],
) -> Optional["SqlParsingResult"]:

logger.debug("Using sqlglot_lineage to parse custom sql")

sql_query = remove_special_characters(query)

logger.debug(f"Parsing sql={sql_query}")

return sqlglot_l.create_lineage_sql_parsed_result(
query=sql_query,
schema=schema,
database=database,
platform=platform,
platform_instance=platform_instance,
env=env,
graph=ctx.graph,
)
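A usage sketch for the new helper (an illustration, not part of the diff), assuming the file above is the m_query native SQL parser module (the excerpt does not show its path) and using a hypothetical join query; with the multi-table support added in this commit, every upstream table in the query should appear in the result's in_tables.

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.m_query import native_sql_parser  # assumed module path

ctx = PipelineContext(run_id="powerbi-native-sql-demo")  # no DataHub graph attached

# Hypothetical native query with a join; the "#(lf)" markers are removed by
# remove_special_characters before the query reaches sqlglot_lineage.
query = (
    "SELECT o.id, c.name #(lf)"
    "FROM orders o #(lf)"
    "JOIN customers c ON o.customer_id = c.id"
)

result = native_sql_parser.parse_custom_sql(
    ctx=ctx,
    query=query,
    schema="public",
    database="sales",
    platform="postgres",
    env="PROD",
    platform_instance=None,
)

if result is not None:
    # Both upstream tables (orders and customers) should be reported here.
    print(result.in_tables)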
@@ -6,7 +6,14 @@
import lark
from lark import Lark, Tree

from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceReport
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.config import (
PowerBiDashboardSourceConfig,
PowerBiDashboardSourceReport,
)
from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
AbstractDataPlatformInstanceResolver,
)
from datahub.ingestion.source.powerbi.m_query import resolver, validator
from datahub.ingestion.source.powerbi.m_query.data_classes import (
TRACE_POWERBI_MQUERY_PARSER,
@@ -45,7 +52,9 @@ def _parse_expression(expression: str) -> Tree:
def get_upstream_tables(
table: Table,
reporter: PowerBiDashboardSourceReport,
native_query_enabled: bool = True,
platform_instance_resolver: AbstractDataPlatformInstanceResolver,
ctx: PipelineContext,
config: PowerBiDashboardSourceConfig,
parameters: Dict[str, str] = {},
) -> List[resolver.DataPlatformTable]:
if table.expression is None:
@@ -58,7 +67,7 @@ def get_upstream_tables(
parse_tree: Tree = _parse_expression(table.expression)

valid, message = validator.validate_parse_tree(
parse_tree, native_query_enabled=native_query_enabled
parse_tree, native_query_enabled=config.native_query_parsing
)
if valid is False:
assert message is not None
@@ -84,7 +93,11 @@
parse_tree=parse_tree,
reporter=reporter,
parameters=parameters,
).resolve_to_data_platform_table_list()
).resolve_to_data_platform_table_list(
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
)

except BaseException as e:
reporter.report_warning(table.full_name, "Failed to process m-query expression")
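Callers of get_upstream_tables now pass the pipeline context, the source config, and a platform-instance resolver instead of the old native_query_enabled flag. A mock-based sketch of the new call shape (not part of the commit; the module path is assumed): with no M-Query expression on the table, the function is expected to return early with an empty list.

from unittest import mock

from datahub.ingestion.source.powerbi.m_query import parser  # assumed module path

# Stand-ins for objects that the PowerBI source builds during a real ingestion run.
table = mock.MagicMock()
table.full_name = "demo_workspace.demo_dataset.demo_table"
table.expression = None  # no M-Query expression, so nothing to parse

upstreams = parser.get_upstream_tables(
    table=table,
    reporter=mock.MagicMock(),
    platform_instance_resolver=mock.MagicMock(),
    ctx=mock.MagicMock(),
    config=mock.MagicMock(),
    parameters={},
)
print(upstreams)  # expected: [] because the table carries no expression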