Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/powerbi): fix issue with broken report lineage #10910

Merged
merged 20 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from typing import Iterable, Optional

from pydantic.fields import Field
Expand All @@ -18,6 +19,8 @@
from datahub.specific.dashboard import DashboardPatchBuilder
from datahub.specific.dataset import DatasetPatchBuilder

logger = logging.getLogger(__name__)


def convert_upstream_lineage_to_patch(
urn: str,
Expand Down Expand Up @@ -48,6 +51,20 @@ def convert_chart_info_to_patch(
for inputEdge in aspect.inputEdges:
patch_builder.add_input_edge(inputEdge)

patch_builder.set_chart_url(aspect.chartUrl).set_external_url(
aspect.externalUrl
).set_type(aspect.type).set_title(aspect.title).set_access(
aspect.access
).set_last_modified(
aspect.lastModified
).set_last_refreshed(
aspect.lastRefreshed
).set_description(
aspect.description
).add_inputs(
aspect.inputs
)

values = patch_builder.build()
if values:
mcp = next(iter(values))
Expand Down Expand Up @@ -76,8 +93,36 @@ def convert_dashboard_info_to_patch(
for chartEdge in aspect.chartEdges:
patch_builder.add_chart_edge(chartEdge)

if aspect.title:
patch_builder.set_title(aspect.title)

if aspect.description:
patch_builder.set_description(aspect.description)

if aspect.charts:
patch_builder.add_charts(aspect.charts)

if aspect.dashboardUrl:
patch_builder.set_dashboard_url(aspect.dashboardUrl)

if aspect.datasets:
patch_builder.add_datasets(aspect.datasets)

if aspect.access:
patch_builder.set_access(aspect.access)

if aspect.lastRefreshed:
patch_builder.set_last_refreshed(aspect.lastRefreshed)

if aspect.lastModified:
patch_builder.set_last_modified(last_modified=aspect.lastModified)

values = patch_builder.build()

if values:
logger.debug(
f"Generating patch DashboardInfo MetadataWorkUnit for dashboard {aspect.title}"
)
mcp = next(iter(values))
return MetadataWorkUnit(
id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,11 @@ class PowerBiDashboardSourceConfig(
)
profiling: PowerBiProfilingConfig = PowerBiProfilingConfig()

patch_metadata: bool = pydantic.Field(
default=True,
description="Patch dashboard metadata",
)

@root_validator(skip_on_failure=True)
def validate_extract_column_level_lineage(cls, values: Dict) -> Dict:
flags = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1197,8 +1197,7 @@ def report_to_datahub_work_units(
) -> Iterable[MetadataWorkUnit]:
mcps: List[MetadataChangeProposalWrapper] = []

logger.debug(f"Converting dashboard={report.name} to datahub dashboard")

logger.debug(f"Converting report={report.name} to datahub dashboard")
# Convert user to CorpUser
user_mcps = self.to_datahub_users(report.users)
# Convert pages to charts. A report has a single dataset, and that same dataset is used by its pages to create visualizations
Expand All @@ -1215,9 +1214,7 @@ def report_to_datahub_work_units(
mcps.extend(chart_mcps)
mcps.extend(report_mcps)

# Convert MCP to work_units
work_units = map(self._to_work_unit, mcps)
return work_units
return map(self._to_work_unit, mcps)


@platform_name("PowerBI")
Expand Down Expand Up @@ -1385,7 +1382,7 @@ def _get_dashboard_patch_work_unit(
DashboardInfoClass
] = work_unit.get_aspect_of_type(DashboardInfoClass)

if dashboard_info_aspect:
if dashboard_info_aspect and self.source_config.patch_metadata:
return convert_dashboard_info_to_patch(
work_unit.get_urn(),
dashboard_info_aspect,
Expand Down
108 changes: 108 additions & 0 deletions metadata-ingestion/src/datahub/specific/chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
AccessLevelClass,
AuditStampClass,
ChangeAuditStampsClass,
ChartInfoClass as ChartInfo,
ChartTypeClass,
EdgeClass as Edge,
GlobalTagsClass as GlobalTags,
GlossaryTermAssociationClass as Term,
Expand Down Expand Up @@ -311,3 +314,108 @@ def remove_custom_property(self, key: str) -> "ChartPatchBuilder":
"""
self.custom_properties_patch_helper.remove_property(key)
return self

def set_title(self, title: str) -> "ChartPatchBuilder":
    """Set the chart title on the ChartInfo aspect.

    Calls ``_add_patch`` with op ``"add"`` and path ``/title``.

    Args:
        title: The title to write. Must be truthy; this is enforced with an
            ``assert``.

    Returns:
        This builder, so further calls can be chained.

    Raises:
        AssertionError: If ``title`` is falsy (``None`` or empty).
    """
    assert title, "ChartInfo title should not be None"

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/title",
        value=title,
    )
    return self

def set_description(self, description: str) -> "ChartPatchBuilder":
    """Set the chart description on the ChartInfo aspect.

    Calls ``_add_patch`` with op ``"add"`` and path ``/description``.

    Args:
        description: The description to write. Must be truthy; this is
            enforced with an ``assert``.

    Returns:
        This builder, so further calls can be chained.

    Raises:
        AssertionError: If ``description`` is falsy (``None`` or empty).
    """
    # The message names ChartInfo because this builder patches the ChartInfo
    # aspect; it previously said "DashboardInfo" (copy-paste error).
    assert description, "ChartInfo description should not be None"
    self._add_patch(
        ChartInfo.ASPECT_NAME,
        "add",
        path="/description",
        value=description,
    )

    return self

def set_last_refreshed(self, last_refreshed: Optional[int]) -> "ChartPatchBuilder":
    """Set ``/lastRefreshed`` on the ChartInfo aspect when a value is given.

    Args:
        last_refreshed: Value to write. A falsy value (``None`` or ``0``)
            results in no patch being added.

    Returns:
        This builder, so further calls can be chained.
    """
    if not last_refreshed:
        # Nothing to record; leave the builder untouched.
        return self

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/lastRefreshed",
        value=last_refreshed,
    )
    return self

def set_last_modified(
    self, last_modified: "ChangeAuditStampsClass"
) -> "ChartPatchBuilder":
    """Set ``/lastModified`` on the ChartInfo aspect when a value is given.

    Args:
        last_modified: Audit stamps to write. A falsy value results in no
            patch being added.

    Returns:
        This builder, so further calls can be chained.
    """
    if not last_modified:
        # Nothing to record; leave the builder untouched.
        return self

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/lastModified",
        value=last_modified,
    )
    return self

def set_external_url(self, external_url: Optional[str]) -> "ChartPatchBuilder":
    """Set ``/externalUrl`` on the ChartInfo aspect when a value is given.

    Args:
        external_url: URL to write. ``None`` or an empty string results in
            no patch being added.

    Returns:
        This builder, so further calls can be chained.
    """
    if not external_url:
        # Nothing to record; leave the builder untouched.
        return self

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/externalUrl",
        value=external_url,
    )
    return self

def set_chart_url(self, dashboard_url: Optional[str]) -> "ChartPatchBuilder":
    """Set ``/chartUrl`` on the ChartInfo aspect when a value is given.

    Args:
        dashboard_url: URL written to the ``/chartUrl`` path (the parameter
            name is kept as-is for caller compatibility). ``None`` or an
            empty string results in no patch being added.

    Returns:
        This builder, so further calls can be chained.
    """
    if not dashboard_url:
        # Nothing to record; leave the builder untouched.
        return self

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/chartUrl",
        value=dashboard_url,
    )
    return self

def set_type(
    self, type: Union[None, Union[str, "ChartTypeClass"]] = None
) -> "ChartPatchBuilder":
    """Set ``/type`` on the ChartInfo aspect when a value is given.

    Args:
        type: Chart type to write. A falsy value (including the default
            ``None``) results in no patch being added.

    Returns:
        This builder, so further calls can be chained.
    """
    if not type:
        # Nothing to record; leave the builder untouched.
        return self

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/type",
        value=type,
    )
    return self

def set_access(
    self, access: Union[None, Union[str, "AccessLevelClass"]] = None
) -> "ChartPatchBuilder":
    """Set ``/access`` on the ChartInfo aspect when a value is given.

    Args:
        access: Access level to write. A falsy value (including the default
            ``None``) results in no patch being added.

    Returns:
        This builder, so further calls can be chained.
    """
    if not access:
        # Nothing to record; leave the builder untouched.
        return self

    self._add_patch(
        aspect_name=ChartInfo.ASPECT_NAME,
        op="add",
        path="/access",
        value=access,
    )
    return self

def add_inputs(self, input_urns: Optional[List[str]]) -> "ChartPatchBuilder":
    """Add one ``/inputs/<urn>`` patch on the ChartInfo aspect per given URN.

    Args:
        input_urns: URNs to add. ``None`` or an empty list results in no
            patches being added.

    Returns:
        This builder, so further calls can be chained.
    """
    # NOTE(review): the URN is interpolated into the patch path unescaped;
    # confirm whether "/" or "~" inside a URN needs escaping here.
    for input_urn in input_urns or ():
        self._add_patch(
            ChartInfo.ASPECT_NAME,
            "add",
            path=f"/inputs/{input_urn}",
            value=input_urn,
        )
    return self
122 changes: 122 additions & 0 deletions metadata-ingestion/src/datahub/specific/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
AccessLevelClass,
AuditStampClass,
ChangeAuditStampsClass,
DashboardInfoClass as DashboardInfo,
EdgeClass as Edge,
GlobalTagsClass as GlobalTags,
Expand Down Expand Up @@ -405,3 +407,123 @@ def remove_custom_property(self, key: str) -> "DashboardPatchBuilder":
"""
self.custom_properties_patch_helper.remove_property(key)
return self

def set_title(self, title: str) -> "DashboardPatchBuilder":
    """Set the dashboard title on the DashboardInfo aspect.

    Calls ``_add_patch`` with op ``"add"`` and path ``/title``.

    Args:
        title: The title to write. Must be truthy; this is enforced with an
            ``assert``.

    Returns:
        This builder, so further calls can be chained.

    Raises:
        AssertionError: If ``title`` is falsy (``None`` or empty).
    """
    assert title, "DashboardInfo title should not be None"

    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/title",
        value=title,
    )
    return self

def set_description(self, description: str) -> "DashboardPatchBuilder":
    """Set the dashboard description on the DashboardInfo aspect.

    Calls ``_add_patch`` with op ``"add"`` and path ``/description``.

    Args:
        description: The description to write. Must be truthy; this is
            enforced with an ``assert``.

    Returns:
        This builder, so further calls can be chained.

    Raises:
        AssertionError: If ``description`` is falsy (``None`` or empty).
    """
    assert description, "DashboardInfo description should not be None"

    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/description",
        value=description,
    )
    return self

def add_custom_properties(
    self, custom_properties: Optional[Dict[str, str]] = None
) -> "DashboardPatchBuilder":
    """Add each given key/value pair through the custom-properties helper.

    Every entry is forwarded to
    ``self.custom_properties_patch_helper.add_property(key, value)`` in the
    mapping's iteration order.

    Args:
        custom_properties: Properties to add. ``None`` (the default) or an
            empty mapping results in no calls to the helper.

    Returns:
        This builder, so further calls can be chained.
    """
    if not custom_properties:
        # Nothing to add; leave the builder untouched.
        return self

    helper = self.custom_properties_patch_helper
    for prop_name, prop_value in custom_properties.items():
        helper.add_property(prop_name, prop_value)
    return self

def set_external_url(self, external_url: Optional[str]) -> "DashboardPatchBuilder":
    """Set ``/externalUrl`` on the DashboardInfo aspect when a value is given.

    Args:
        external_url: URL to write. ``None`` or an empty string results in
            no patch being added.

    Returns:
        This builder, so further calls can be chained.
    """
    if not external_url:
        # Nothing to record; leave the builder untouched.
        return self

    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/externalUrl",
        value=external_url,
    )
    return self

def add_charts(self, chart_urns: Optional[List[str]]) -> "DashboardPatchBuilder":
    """Add one ``/charts/<urn>`` patch on the DashboardInfo aspect per URN.

    Args:
        chart_urns: Chart URNs to add. ``None`` or an empty list results in
            no patches being added.

    Returns:
        This builder, so further calls can be chained.
    """
    # NOTE(review): the URN is interpolated into the patch path unescaped;
    # confirm whether "/" or "~" inside a URN needs escaping here.
    for chart_urn in chart_urns or ():
        self._add_patch(
            DashboardInfo.ASPECT_NAME,
            "add",
            path=f"/charts/{chart_urn}",
            value=chart_urn,
        )
    return self

def add_datasets(
    self, dataset_urns: Optional[List[str]]
) -> "DashboardPatchBuilder":
    """Add one ``/datasets/<urn>`` patch on the DashboardInfo aspect per URN.

    Args:
        dataset_urns: Dataset URNs to add. ``None`` or an empty list results
            in no patches being added.

    Returns:
        This builder, so further calls can be chained.
    """
    # NOTE(review): the URN is interpolated into the patch path unescaped;
    # confirm whether "/" or "~" inside a URN needs escaping here.
    for dataset_urn in dataset_urns or ():
        self._add_patch(
            DashboardInfo.ASPECT_NAME,
            "add",
            path=f"/datasets/{dataset_urn}",
            value=dataset_urn,
        )
    return self

def set_dashboard_url(
    self, dashboard_url: Optional[str]
) -> "DashboardPatchBuilder":
    """Set ``/dashboardUrl`` on the DashboardInfo aspect when a value is given.

    Args:
        dashboard_url: URL to write. ``None`` or an empty string results in
            no patch being added.

    Returns:
        This builder, so further calls can be chained.
    """
    if not dashboard_url:
        # Nothing to record; leave the builder untouched.
        return self

    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/dashboardUrl",
        value=dashboard_url,
    )
    return self

def set_access(
    self, access: Union[None, Union[str, "AccessLevelClass"]] = None
) -> "DashboardPatchBuilder":
    """Set ``/access`` on the DashboardInfo aspect when a value is given.

    Args:
        access: Access level to write. A falsy value (including the default
            ``None``) results in no patch being added.

    Returns:
        This builder, so further calls can be chained.
    """
    if not access:
        # Nothing to record; leave the builder untouched.
        return self

    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/access",
        value=access,
    )
    return self

def set_last_refreshed(
    self, last_refreshed: Optional[int]
) -> "DashboardPatchBuilder":
    """Set ``/lastRefreshed`` on the DashboardInfo aspect when a value is given.

    Args:
        last_refreshed: Value to write. A falsy value (``None`` or ``0``)
            results in no patch being added.

    Returns:
        This builder, so further calls can be chained.
    """
    if not last_refreshed:
        # Nothing to record; leave the builder untouched.
        return self

    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/lastRefreshed",
        value=last_refreshed,
    )
    return self

def set_last_modified(
    self, last_modified: "ChangeAuditStampsClass"
) -> "DashboardPatchBuilder":
    """Set ``/lastModified`` on the DashboardInfo aspect when a value is given.

    Args:
        last_modified: Audit stamps to write. A falsy value results in no
            patch being added.

    Returns:
        This builder, so further calls can be chained.
    """
    if not last_modified:
        # Nothing to record; leave the builder untouched.
        return self

    self._add_patch(
        aspect_name=DashboardInfo.ASPECT_NAME,
        op="add",
        path="/lastModified",
        value=last_modified,
    )
    return self
Loading
Loading