Skip to content

Commit

Permalink
fix(ingestion/powerbi): fix issue with broken report lineage (#10910)
Browse files Browse the repository at this point in the history
  • Loading branch information
sid-acryl authored Jul 31, 2024
1 parent 0667470 commit dffdef2
Show file tree
Hide file tree
Showing 22 changed files with 2,302 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from typing import Iterable, Optional

from pydantic.fields import Field
Expand All @@ -18,6 +19,8 @@
from datahub.specific.dashboard import DashboardPatchBuilder
from datahub.specific.dataset import DatasetPatchBuilder

logger = logging.getLogger(__name__)


def convert_upstream_lineage_to_patch(
urn: str,
Expand Down Expand Up @@ -48,6 +51,20 @@ def convert_chart_info_to_patch(
for inputEdge in aspect.inputEdges:
patch_builder.add_input_edge(inputEdge)

patch_builder.set_chart_url(aspect.chartUrl).set_external_url(
aspect.externalUrl
).set_type(aspect.type).set_title(aspect.title).set_access(
aspect.access
).set_last_modified(
aspect.lastModified
).set_last_refreshed(
aspect.lastRefreshed
).set_description(
aspect.description
).add_inputs(
aspect.inputs
)

values = patch_builder.build()
if values:
mcp = next(iter(values))
Expand Down Expand Up @@ -76,8 +93,36 @@ def convert_dashboard_info_to_patch(
for chartEdge in aspect.chartEdges:
patch_builder.add_chart_edge(chartEdge)

if aspect.title:
patch_builder.set_title(aspect.title)

if aspect.description:
patch_builder.set_description(aspect.description)

if aspect.charts:
patch_builder.add_charts(aspect.charts)

if aspect.dashboardUrl:
patch_builder.set_dashboard_url(aspect.dashboardUrl)

if aspect.datasets:
patch_builder.add_datasets(aspect.datasets)

if aspect.access:
patch_builder.set_access(aspect.access)

if aspect.lastRefreshed:
patch_builder.set_last_refreshed(aspect.lastRefreshed)

if aspect.lastModified:
patch_builder.set_last_modified(last_modified=aspect.lastModified)

values = patch_builder.build()

if values:
logger.debug(
f"Generating patch DashboardInfo MetadataWorkUnit for dashboard {aspect.title}"
)
mcp = next(iter(values))
return MetadataWorkUnit(
id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,11 @@ class PowerBiDashboardSourceConfig(
)
profiling: PowerBiProfilingConfig = PowerBiProfilingConfig()

# Toggle for dashboard patching: the source converts a DashboardInfo aspect
# into a patch work unit only when this flag is true (enabled by default).
patch_metadata: bool = pydantic.Field(
default=True,
description="Patch dashboard metadata",
)

@root_validator(skip_on_failure=True)
def validate_extract_column_level_lineage(cls, values: Dict) -> Dict:
flags = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1197,8 +1197,7 @@ def report_to_datahub_work_units(
) -> Iterable[MetadataWorkUnit]:
mcps: List[MetadataChangeProposalWrapper] = []

logger.debug(f"Converting dashboard={report.name} to datahub dashboard")

logger.debug(f"Converting report={report.name} to datahub dashboard")
# Convert user to CorpUser
user_mcps = self.to_datahub_users(report.users)
# Convert pages to charts. A report has a single dataset, and that same dataset is used by its pages to create visualizations
Expand All @@ -1215,9 +1214,7 @@ def report_to_datahub_work_units(
mcps.extend(chart_mcps)
mcps.extend(report_mcps)

# Convert MCP to work_units
work_units = map(self._to_work_unit, mcps)
return work_units
return map(self._to_work_unit, mcps)


@platform_name("PowerBI")
Expand Down Expand Up @@ -1385,7 +1382,7 @@ def _get_dashboard_patch_work_unit(
DashboardInfoClass
] = work_unit.get_aspect_of_type(DashboardInfoClass)

if dashboard_info_aspect:
if dashboard_info_aspect and self.source_config.patch_metadata:
return convert_dashboard_info_to_patch(
work_unit.get_urn(),
dashboard_info_aspect,
Expand Down
108 changes: 108 additions & 0 deletions metadata-ingestion/src/datahub/specific/chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
AccessLevelClass,
AuditStampClass,
ChangeAuditStampsClass,
ChartInfoClass as ChartInfo,
ChartTypeClass,
EdgeClass as Edge,
GlobalTagsClass as GlobalTags,
GlossaryTermAssociationClass as Term,
Expand Down Expand Up @@ -311,3 +314,108 @@ def remove_custom_property(self, key: str) -> "ChartPatchBuilder":
"""
self.custom_properties_patch_helper.remove_property(key)
return self

def set_title(self, title: str) -> "ChartPatchBuilder":
    """Set the chart title with an ``add`` patch at ``/title``.

    Args:
        title: The new title; must be truthy (non-empty).

    Returns:
        This builder, so calls can be chained.

    Raises:
        AssertionError: If ``title`` is falsy.
    """
    assert title, "ChartInfo title should not be None"
    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME, op="add", path="/title", value=title
    )
    return self

def set_description(self, description: str) -> "ChartPatchBuilder":
    """Set the chart description with an ``add`` patch at ``/description``.

    Args:
        description: The new description; must be truthy (non-empty).

    Returns:
        This builder, so calls can be chained.

    Raises:
        AssertionError: If ``description`` is falsy.
    """
    # The message names ChartInfo (not DashboardInfo): this builder patches
    # the chart aspect, so the failure must point at the right entity.
    assert description, "ChartInfo description should not be None"
    self._add_patch(
        ChartInfo.ASPECT_NAME,
        "add",
        path="/description",
        value=description,
    )

    return self

def set_last_refreshed(self, last_refreshed: Optional[int]) -> "ChartPatchBuilder":
    """Set ``/lastRefreshed`` on the chart when a value is supplied.

    Args:
        last_refreshed: The value to store; any falsy value (``None`` or
            ``0``) leaves the builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    if not last_refreshed:
        # Nothing to record; skip the patch entirely.
        return self

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/lastRefreshed",
        value=last_refreshed,
    )
    return self

def set_last_modified(
    self, last_modified: "ChangeAuditStampsClass"
) -> "ChartPatchBuilder":
    """Set ``/lastModified`` on the chart when a value is supplied.

    Args:
        last_modified: The audit stamps to store; a falsy value leaves the
            builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    if not last_modified:
        return self

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/lastModified",
        value=last_modified,
    )
    return self

def set_external_url(self, external_url: Optional[str]) -> "ChartPatchBuilder":
    """Set ``/externalUrl`` on the chart when a URL is supplied.

    Args:
        external_url: The URL to store; ``None`` or an empty string leaves
            the builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    if not external_url:
        return self

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/externalUrl",
        value=external_url,
    )
    return self

def set_chart_url(self, dashboard_url: Optional[str]) -> "ChartPatchBuilder":
    """Set ``/chartUrl`` on the chart when a URL is supplied.

    Args:
        dashboard_url: The URL written to ``/chartUrl`` (despite the
            parameter's name); ``None`` or an empty string leaves the
            builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    if not dashboard_url:
        return self

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/chartUrl",
        value=dashboard_url,
    )
    return self

def set_type(
    self, type: Union[None, Union[str, "ChartTypeClass"]] = None
) -> "ChartPatchBuilder":
    """Set ``/type`` on the chart when a type is supplied.

    Args:
        type: The chart type to store; a falsy value (the default ``None``
            included) leaves the builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    if not type:
        return self

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/type",
        value=type,
    )
    return self

def set_access(
    self, access: Union[None, Union[str, "AccessLevelClass"]] = None
) -> "ChartPatchBuilder":
    """Set ``/access`` on the chart when an access level is supplied.

    Args:
        access: The access level to store; a falsy value (the default
            ``None`` included) leaves the builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    if not access:
        return self

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/access",
        value=access,
    )
    return self

def add_inputs(self, input_urns: Optional[List[str]]) -> "ChartPatchBuilder":
    """Add one ``add`` patch per input URN under ``/inputs/<urn>``.

    Args:
        input_urns: URNs to add; ``None`` or an empty list leaves the
            builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    # NOTE(review): each URN is interpolated into the path verbatim; confirm
    # whether "/" or "~" inside a URN needs escaping for the patch path.
    for input_urn in input_urns or []:
        self._add_patch(
            ChartInfo.ASPECT_NAME,
            "add",
            path=f"/inputs/{input_urn}",
            value=input_urn,
        )
    return self
122 changes: 122 additions & 0 deletions metadata-ingestion/src/datahub/specific/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
AccessLevelClass,
AuditStampClass,
ChangeAuditStampsClass,
DashboardInfoClass as DashboardInfo,
EdgeClass as Edge,
GlobalTagsClass as GlobalTags,
Expand Down Expand Up @@ -405,3 +407,123 @@ def remove_custom_property(self, key: str) -> "DashboardPatchBuilder":
"""
self.custom_properties_patch_helper.remove_property(key)
return self

def set_title(self, title: str) -> "DashboardPatchBuilder":
    """Set the dashboard title with an ``add`` patch at ``/title``.

    Args:
        title: The new title; must be truthy (non-empty).

    Returns:
        This builder, so calls can be chained.

    Raises:
        AssertionError: If ``title`` is falsy.
    """
    assert title, "DashboardInfo title should not be None"
    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME, op="add", path="/title", value=title
    )
    return self

def set_description(self, description: str) -> "DashboardPatchBuilder":
    """Set the dashboard description with an ``add`` patch at ``/description``.

    Args:
        description: The new description; must be truthy (non-empty).

    Returns:
        This builder, so calls can be chained.

    Raises:
        AssertionError: If ``description`` is falsy.
    """
    assert description, "DashboardInfo description should not be None"
    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/description",
        value=description,
    )
    return self

def add_custom_properties(
    self, custom_properties: Optional[Dict[str, str]] = None
) -> "DashboardPatchBuilder":
    """Add every given key/value pair through the custom-properties helper.

    Args:
        custom_properties: Properties to add; ``None`` or an empty mapping
            leaves the builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    add_property = self.custom_properties_patch_helper.add_property
    for name, prop_value in (custom_properties or {}).items():
        add_property(name, prop_value)
    return self

def set_external_url(self, external_url: Optional[str]) -> "DashboardPatchBuilder":
    """Set ``/externalUrl`` on the dashboard when a URL is supplied.

    Args:
        external_url: The URL to store; ``None`` or an empty string leaves
            the builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    if not external_url:
        return self

    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/externalUrl",
        value=external_url,
    )
    return self

def add_charts(self, chart_urns: Optional[List[str]]) -> "DashboardPatchBuilder":
    """Add one ``add`` patch per chart URN under ``/charts/<urn>``.

    Args:
        chart_urns: URNs to add; ``None`` or an empty list leaves the
            builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    # NOTE(review): each URN is interpolated into the path verbatim; confirm
    # whether "/" or "~" inside a URN needs escaping for the patch path.
    for chart_urn in chart_urns or []:
        self._add_patch(
            DashboardInfo.ASPECT_NAME,
            "add",
            path=f"/charts/{chart_urn}",
            value=chart_urn,
        )
    return self

def add_datasets(
    self, dataset_urns: Optional[List[str]]
) -> "DashboardPatchBuilder":
    """Add one ``add`` patch per dataset URN under ``/datasets/<urn>``.

    Args:
        dataset_urns: URNs to add; ``None`` or an empty list leaves the
            builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    # NOTE(review): each URN is interpolated into the path verbatim; confirm
    # whether "/" or "~" inside a URN needs escaping for the patch path.
    for dataset_urn in dataset_urns or []:
        self._add_patch(
            DashboardInfo.ASPECT_NAME,
            "add",
            path=f"/datasets/{dataset_urn}",
            value=dataset_urn,
        )
    return self

def set_dashboard_url(
    self, dashboard_url: Optional[str]
) -> "DashboardPatchBuilder":
    """Set ``/dashboardUrl`` on the dashboard when a URL is supplied.

    Args:
        dashboard_url: The URL to store; ``None`` or an empty string leaves
            the builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    if not dashboard_url:
        return self

    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/dashboardUrl",
        value=dashboard_url,
    )
    return self

def set_access(
    self, access: Union[None, Union[str, "AccessLevelClass"]] = None
) -> "DashboardPatchBuilder":
    """Set ``/access`` on the dashboard when an access level is supplied.

    Args:
        access: The access level to store; a falsy value (the default
            ``None`` included) leaves the builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    if not access:
        return self

    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/access",
        value=access,
    )
    return self

def set_last_refreshed(
    self, last_refreshed: Optional[int]
) -> "DashboardPatchBuilder":
    """Set ``/lastRefreshed`` on the dashboard when a value is supplied.

    Args:
        last_refreshed: The value to store; any falsy value (``None`` or
            ``0``) leaves the builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    if not last_refreshed:
        # Nothing to record; skip the patch entirely.
        return self

    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/lastRefreshed",
        value=last_refreshed,
    )
    return self

def set_last_modified(
    self, last_modified: "ChangeAuditStampsClass"
) -> "DashboardPatchBuilder":
    """Set ``/lastModified`` on the dashboard when a value is supplied.

    Args:
        last_modified: The audit stamps to store; a falsy value leaves the
            builder unchanged.

    Returns:
        This builder, so calls can be chained.
    """
    if not last_modified:
        return self

    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/lastModified",
        value=last_modified,
    )
    return self
Loading

0 comments on commit dffdef2

Please sign in to comment.