Merge branch 'master' into remove-legacy-snowflake-lineage
anshbansal authored Aug 23, 2023
2 parents cfd34c3 + 68abf9c commit d22f6c0
Showing 16 changed files with 773 additions and 316 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/docker-unified.yml
@@ -549,6 +549,7 @@ jobs:
     runs-on: ubuntu-latest
     outputs:
       tag: ${{ steps.tag.outputs.tag }}
+      needs_artifact_download: ${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.publish != 'true' }}
     needs: [setup, datahub_ingestion_base_slim_build]
     steps:
       - name: Check out the repo
@@ -605,7 +606,7 @@ jobs:
         uses: actions/checkout@v3
       - name: Download image Slim Image
         uses: ishworkh/docker-image-artifact-download@v1
-        if: ${{ needs.setup.outputs.publish != 'true' }}
+        if: ${{ needs.datahub_ingestion_slim_build.outputs.needs_artifact_download == 'true' }}
         with:
           image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.tag }}
       - name: Run Trivy vulnerability scanner Slim Image
@@ -630,6 +631,7 @@ jobs:
     runs-on: ubuntu-latest
     outputs:
       tag: ${{ steps.tag.outputs.tag }}
+      needs_artifact_download: ${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.publish != 'true' }}
     needs: [setup, datahub_ingestion_base_full_build]
     steps:
       - name: Check out the repo
@@ -685,7 +687,7 @@ jobs:
         uses: actions/checkout@v3
       - name: Download image Full Image
         uses: ishworkh/docker-image-artifact-download@v1
-        if: ${{ needs.setup.outputs.publish != 'true' }}
+        if: ${{ needs.datahub_ingestion_full_build.outputs.needs_artifact_download == 'true' }}
         with:
           image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_full_build.outputs.tag }}
       - name: Run Trivy vulnerability scanner Full Image
@@ -792,7 +794,7 @@ jobs:
           image: ${{ env.DATAHUB_UPGRADE_IMAGE }}:${{ needs.setup.outputs.unique_tag }}
       - name: Download datahub-ingestion-slim image
         uses: ishworkh/docker-image-artifact-download@v1
-        if: ${{ needs.setup.outputs.publish != 'true' }}
+        if: ${{ needs.datahub_ingestion_slim_build.outputs.needs_artifact_download == 'true' }}
         with:
           image: ${{ env.DATAHUB_INGESTION_IMAGE }}:${{ needs.datahub_ingestion_slim_build.outputs.tag }}
       - name: Disk Check
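The recurring change in this file replaces the blanket `publish != 'true'` guard with a `needs_artifact_download` output computed once per ingestion build job: download the image from the workflow artifact only when this run actually rebuilt it (either path filter matched) and did not publish it to a registry. A minimal sketch of that predicate in plain Python — the function and parameter names are illustrative, not part of the workflow:

```python
def needs_artifact_download(
    base_image_changed: bool, ingestion_image_changed: bool, publish: bool
) -> bool:
    # Mirrors the workflow expression: the image exists only as a CI build
    # artifact when this run rebuilt it and did not push it to a registry,
    # so downstream steps should download the artifact in exactly that case.
    return (base_image_changed or ingestion_image_changed) and not publish
```

Downstream steps such as the Trivy scans then key their `if:` off this single job output instead of re-deriving the condition from `needs.setup.outputs.publish`.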
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -454,7 +454,7 @@ def get_long_description():
     "mypy==1.0.0",
     # pydantic 1.8.2 is incompatible with mypy 0.910.
     # See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910.
-    "pydantic>=1.9.0",
+    "pydantic>=1.10.0",
     *test_api_requirements,
     pytest_dep,
     "pytest-asyncio>=0.16.0",
15 changes: 15 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
@@ -121,6 +121,12 @@ class DataPlatformPair:
     powerbi_data_platform_name: str


+@dataclass
+class PowerBIPlatformDetail:
+    data_platform_pair: DataPlatformPair
+    data_platform_server: str
+
+
 class SupportedDataPlatform(Enum):
     POSTGRES_SQL = DataPlatformPair(
         powerbi_data_platform_name="PostgreSQL", datahub_data_platform_name="postgres"
@@ -382,6 +388,15 @@ class PowerBiDashboardSourceConfig(
         description="The instance of the platform that all assets produced by this recipe belong to",
     )

+    # Enable advanced SQL construct parsing
+    enable_advance_lineage_sql_construct: bool = pydantic.Field(
+        default=False,
+        description="Whether to enable parsing of advanced native SQL constructs such as joins and sub-queries. "
+        "native_query_parsing should be enabled along with this flag. "
+        "convert_lineage_urns_to_lowercase is enabled by default; if you disabled it in a previous ingestion execution, "
+        "this option may break lineage, as it generates the upstream dataset URNs in lowercase.",
+    )
+
     @validator("dataset_type_mapping")
     @classmethod
     def map_data_platform(cls, value):
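For reference, a hedged sketch of how the new flag might be set when building the source config programmatically — the credential values are placeholders, and per the field description `native_query_parsing` must be enabled alongside it:

```python
from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceConfig

# Placeholder credentials; only the two lineage-related flags matter here.
config = PowerBiDashboardSourceConfig.parse_obj(
    {
        "tenant_id": "00000000-0000-0000-0000-000000000000",
        "client_id": "00000000-0000-0000-0000-000000000000",
        "client_secret": "<secret>",
        "native_query_parsing": True,  # prerequisite for the flag below
        "enable_advance_lineage_sql_construct": True,
    }
)
```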
metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py
@@ -5,16 +5,16 @@
 from datahub.ingestion.source.powerbi.config import (
     PlatformDetail,
     PowerBiDashboardSourceConfig,
+    PowerBIPlatformDetail,
 )
-from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable

 logger = logging.getLogger(__name__)


 class AbstractDataPlatformInstanceResolver(ABC):
     @abstractmethod
     def get_platform_instance(
-        self, dataplatform_table: DataPlatformTable
+        self, data_platform_detail: PowerBIPlatformDetail
     ) -> PlatformDetail:
         pass
@@ -32,10 +32,10 @@ class ResolvePlatformInstanceFromDatasetTypeMapping(
     BaseAbstractDataPlatformInstanceResolver
 ):
     def get_platform_instance(
-        self, dataplatform_table: DataPlatformTable
+        self, data_platform_detail: PowerBIPlatformDetail
     ) -> PlatformDetail:
         platform: Union[str, PlatformDetail] = self.config.dataset_type_mapping[
-            dataplatform_table.data_platform_pair.powerbi_data_platform_name
+            data_platform_detail.data_platform_pair.powerbi_data_platform_name
         ]

         if isinstance(platform, PlatformDetail):
@@ -48,13 +48,13 @@ class ResolvePlatformInstanceFromServerToPlatformInstance(
     BaseAbstractDataPlatformInstanceResolver
 ):
     def get_platform_instance(
-        self, dataplatform_table: DataPlatformTable
+        self, data_platform_detail: PowerBIPlatformDetail
     ) -> PlatformDetail:
         return (
             self.config.server_to_platform_instance[
-                dataplatform_table.datasource_server
+                data_platform_detail.data_platform_server
             ]
-            if dataplatform_table.datasource_server
+            if data_platform_detail.data_platform_server
             in self.config.server_to_platform_instance
             else PlatformDetail.parse_obj({})
         )
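Both implementations now key off a `PowerBIPlatformDetail` — the platform pair plus the server string — rather than a full `DataPlatformTable`, so a platform instance can be resolved before any table object exists. A minimal sketch of a call site, assuming a concrete resolver has already been constructed by the source:

```python
from datahub.ingestion.source.powerbi.config import (
    PlatformDetail,
    PowerBIPlatformDetail,
    SupportedDataPlatform,
)
from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
    AbstractDataPlatformInstanceResolver,
)


def instance_for_server(
    resolver: AbstractDataPlatformInstanceResolver, server: str
) -> PlatformDetail:
    # The lookup key is just (platform pair, server); which mapping gets
    # consulted depends on the concrete resolver the source constructed.
    return resolver.get_platform_instance(
        PowerBIPlatformDetail(
            data_platform_pair=SupportedDataPlatform.POSTGRES_SQL.value,
            data_platform_server=server,
        )
    )
```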
metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py
@@ -1,8 +1,12 @@
 import logging
-from typing import List
+from typing import List, Optional

 import sqlparse

+import datahub.utilities.sqlglot_lineage as sqlglot_l
+from datahub.ingestion.api.common import PipelineContext
+from datahub.utilities.sqlglot_lineage import SqlParsingResult
+
 SPECIAL_CHARACTERS = ["#(lf)", "(lf)"]

 logger = logging.getLogger()
@@ -45,3 +49,30 @@ def get_tables(native_query: str) -> List[str]:
         from_index = from_index + 1

     return tables
+
+
+def parse_custom_sql(
+    ctx: PipelineContext,
+    query: str,
+    schema: Optional[str],
+    database: Optional[str],
+    platform: str,
+    env: str,
+    platform_instance: Optional[str],
+) -> Optional["SqlParsingResult"]:
+
+    logger.debug("Using sqlglot_lineage to parse custom sql")
+
+    sql_query = remove_special_characters(query)
+
+    logger.debug(f"Parsing sql={sql_query}")
+
+    return sqlglot_l.create_lineage_sql_parsed_result(
+        query=sql_query,
+        schema=schema,
+        database=database,
+        platform=platform,
+        platform_instance=platform_instance,
+        env=env,
+        graph=ctx.graph,
+    )
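A hedged usage sketch of the new helper. The `PipelineContext` is built ad hoc here (inside a real ingestion run the framework supplies it, and `ctx.graph` may be `None` when no DataHub connection is configured); the sample query embeds a `#(lf)` marker to show that `remove_special_characters` strips PowerBI's line-feed tokens before the SQL reaches sqlglot:

```python
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.m_query import native_sql_parser

# Ad-hoc context for illustration; the run_id is arbitrary.
ctx = PipelineContext(run_id="powerbi-native-sql-demo")

result = native_sql_parser.parse_custom_sql(
    ctx=ctx,
    query="SELECT o.id, c.name#(lf) FROM orders o JOIN customers c ON o.cid = c.id",
    schema="public",
    database="sales",
    platform="postgres",
    env="PROD",
    platform_instance=None,
)
if result is not None:
    print(result.in_tables)  # upstream dataset URNs produced by sqlglot_lineage
```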
metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
@@ -6,7 +6,14 @@
 import lark
 from lark import Lark, Tree

-from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceReport
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.source.powerbi.config import (
+    PowerBiDashboardSourceConfig,
+    PowerBiDashboardSourceReport,
+)
+from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
+    AbstractDataPlatformInstanceResolver,
+)
 from datahub.ingestion.source.powerbi.m_query import resolver, validator
 from datahub.ingestion.source.powerbi.m_query.data_classes import (
     TRACE_POWERBI_MQUERY_PARSER,
@@ -45,7 +52,9 @@ def _parse_expression(expression: str) -> Tree:
 def get_upstream_tables(
     table: Table,
     reporter: PowerBiDashboardSourceReport,
-    native_query_enabled: bool = True,
+    platform_instance_resolver: AbstractDataPlatformInstanceResolver,
+    ctx: PipelineContext,
+    config: PowerBiDashboardSourceConfig,
     parameters: Dict[str, str] = {},
 ) -> List[resolver.DataPlatformTable]:
     if table.expression is None:
@@ -58,7 +67,7 @@ def get_upstream_tables(
     parse_tree: Tree = _parse_expression(table.expression)

     valid, message = validator.validate_parse_tree(
-        parse_tree, native_query_enabled=native_query_enabled
+        parse_tree, native_query_enabled=config.native_query_parsing
     )
     if valid is False:
         assert message is not None
@@ -84,7 +93,11 @@ def get_upstream_tables(
             parse_tree=parse_tree,
             reporter=reporter,
             parameters=parameters,
-        ).resolve_to_data_platform_table_list()
+        ).resolve_to_data_platform_table_list(
+            ctx=ctx,
+            config=config,
+            platform_instance_resolver=platform_instance_resolver,
+        )

     except BaseException as e:
         reporter.report_warning(table.full_name, "Failed to process m-query expression")
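Putting the pieces together: callers of `get_upstream_tables` now thread the pipeline context, the full source config, and a platform-instance resolver through to the M-query resolver instead of a lone `native_query_enabled` flag. A rough call-site sketch — the `Table` import path is an assumption, and every argument is expected to come from the running source:

```python
from typing import List

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.config import (
    PowerBiDashboardSourceConfig,
    PowerBiDashboardSourceReport,
)
from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
    AbstractDataPlatformInstanceResolver,
)
from datahub.ingestion.source.powerbi.m_query import parser, resolver
# Assumed import path for the PowerBI Table data class.
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table


def upstreams_for_table(
    table: Table,
    reporter: PowerBiDashboardSourceReport,
    config: PowerBiDashboardSourceConfig,
    ctx: PipelineContext,
    instance_resolver: AbstractDataPlatformInstanceResolver,
) -> List[resolver.DataPlatformTable]:
    # native_query_parsing and enable_advance_lineage_sql_construct are now
    # read from `config` inside the parser rather than passed individually.
    return parser.get_upstream_tables(
        table=table,
        reporter=reporter,
        platform_instance_resolver=instance_resolver,
        ctx=ctx,
        config=config,
        parameters={},
    )
```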
(Diff truncated: the remaining 10 changed files are not rendered here.)