Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/powerbi): fix issue with broken report lineage #10910

Merged
merged 20 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ def _add_patch(
# TODO: Validate that aspectName is a valid aspect for this entityType
self.patches[aspect_name].append(_Patch(op, path, value))

def add_patch(
self, aspect_name: str, op: str, path: Union[str, Sequence[str]], value: Any
) -> None:
return self._add_patch(
aspect_name=aspect_name,
op=op,
path=path,
value=value,
)

def build(self) -> Iterable[MetadataChangeProposalClass]:
return [
MetadataChangeProposalClass(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
from typing import Iterable, Optional
from typing import ClassVar, Iterable, List, Optional, Union

from pydantic.fields import Field

from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import datahub_guid, set_aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
ChartInfoClass,
DashboardInfoClass,
FineGrainedLineageClass,
MetadataChangeEventClass,
MetadataChangeProposalClass,
SystemMetadataClass,
UpstreamLineageClass,
)
Expand All @@ -19,6 +21,72 @@
from datahub.specific.dataset import DatasetPatchBuilder


class PatchEntityAspect:
SKIPPABLE_ATTRIBUTES: ClassVar[List[str]] = [
"ASPECT_INFO",
"ASPECT_NAME",
"ASPECT_TYPE",
"RECORD_SCHEMA",
]
aspect: Union[ChartInfoClass, DashboardInfoClass]
patch_builder: MetadataPatchProposal
attributes: List[str]

def __init__(
self,
# The PatchEntityAspect can patch any Aspect, however to silent the lint Union is added for DashboardInfoClass
# We can use it with any Aspect
aspect: Union[DashboardInfoClass, ChartInfoClass],
patch_builder: MetadataPatchProposal,
):
self.aspect = aspect
self.patch_builder = patch_builder
self.attributes = dir(self.aspect)

def is_attribute_includable(self, attribute_name: str) -> bool:
"""
a child class can override this to add additional attributes to skip while generating patch aspect
"""
if (
attribute_name.startswith("__")
or attribute_name.startswith("_")
or attribute_name in PatchEntityAspect.SKIPPABLE_ATTRIBUTES
):
return False

return True

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplify the is_attribute_includable method.

The method can be simplified by returning the negated condition directly.

-        if (
-            attribute_name.startswith("__")
-            or attribute_name.startswith("_")
-            or attribute_name in PatchEntityAspect.SKIPPABLE_ATTRIBUTES
-        ):
-            return False
-
-        return True
+        return not (
+            attribute_name.startswith("__")
+            or attribute_name.startswith("_")
+            or attribute_name in PatchEntityAspect.SKIPPABLE_ATTRIBUTES
+        )
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def is_attribute_includable(self, attribute_name: str) -> bool:
"""
a child class can override this to add additional attributes to skip while generating patch aspect
"""
if (
attribute_name.startswith("__")
or attribute_name.startswith("_")
or attribute_name in PatchEntityAspect.SKIPPABLE_ATTRIBUTES
):
return False
return True
def is_attribute_includable(self, attribute_name: str) -> bool:
"""
a child class can override this to add additional attributes to skip while generating patch aspect
"""
return not (
attribute_name.startswith("__")
or attribute_name.startswith("_")
or attribute_name in PatchEntityAspect.SKIPPABLE_ATTRIBUTES
)
Tools
Ruff

50-57: Return the negated condition directly

Inline condition

(SIM103)

def attribute_path(self, attribute_name: str) -> str:
"""
a child class can override this if path is not equal to attribute_name
"""
return f"/{attribute_name}"

def patch(self) -> Optional[MetadataChangeProposalClass]:
# filter property
properties = {
attr: getattr(self.aspect, attr)
for attr in self.attributes
if self.is_attribute_includable(attr)
and not callable(getattr(self.aspect, attr))
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is probably too generic, and won't work for things that aren't simple attributes (e.g lists, property dicts). For now, please just add add_dashboard_title(...) and similar methods to the DashboardPatchBuilder, and call them from convert_dashboard_info_to_patch

I do like the idea though - but given the complexities / edge cases, it will probably make sense to use code generation to make it generic

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


for property_ in properties:
if properties[property_]:
self.patch_builder.add_patch(
aspect_name=self.aspect.ASPECT_NAME,
op="add",
path=self.attribute_path(property_),
value=properties[property_],
)

mcps: List[MetadataChangeProposalClass] = list(self.patch_builder.build())
if mcps:
return mcps[0]

return None


def convert_upstream_lineage_to_patch(
urn: str,
aspect: UpstreamLineageClass,
Expand All @@ -33,56 +101,44 @@ def convert_upstream_lineage_to_patch(
return MetadataWorkUnit(id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp)


def convert_chart_info_to_patch(
urn: str, aspect: ChartInfoClass, system_metadata: Optional[SystemMetadataClass]
def create_mw_for_patch_aspect(
patch_entity_aspect: PatchEntityAspect,
) -> Optional[MetadataWorkUnit]:
patch_builder = ChartPatchBuilder(urn, system_metadata)

if aspect.customProperties:
for key in aspect.customProperties:
patch_builder.add_custom_property(
key, str(aspect.customProperties.get(key))
)

if aspect.inputEdges:
for inputEdge in aspect.inputEdges:
patch_builder.add_input_edge(inputEdge)
mcp: Optional[MetadataChangeProposalClass] = patch_entity_aspect.patch()

values = patch_builder.build()
if values:
mcp = next(iter(values))
if mcp:
return MetadataWorkUnit(
id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp
)

return None


def convert_chart_info_to_patch(
urn: str, aspect: ChartInfoClass, system_metadata: Optional[SystemMetadataClass]
) -> Optional[MetadataWorkUnit]:
patch_builder = ChartPatchBuilder(urn, system_metadata)

patch_entity_aspect: PatchEntityAspect = PatchEntityAspect(
aspect=aspect,
patch_builder=patch_builder,
)

return create_mw_for_patch_aspect(patch_entity_aspect=patch_entity_aspect)


def convert_dashboard_info_to_patch(
urn: str, aspect: DashboardInfoClass, system_metadata: Optional[SystemMetadataClass]
) -> Optional[MetadataWorkUnit]:
patch_builder = DashboardPatchBuilder(urn, system_metadata)

if aspect.customProperties:
for key in aspect.customProperties:
patch_builder.add_custom_property(
key, str(aspect.customProperties.get(key))
)

if aspect.datasetEdges:
for datasetEdge in aspect.datasetEdges:
patch_builder.add_dataset_edge(datasetEdge)

if aspect.chartEdges:
for chartEdge in aspect.chartEdges:
patch_builder.add_chart_edge(chartEdge)
patch_entity_aspect: PatchEntityAspect = PatchEntityAspect(
aspect=aspect,
patch_builder=patch_builder,
)

values = patch_builder.build()
if values:
mcp = next(iter(values))
return MetadataWorkUnit(
id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp
)
return None
return create_mw_for_patch_aspect(patch_entity_aspect=patch_entity_aspect)


def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1197,8 +1197,7 @@ def report_to_datahub_work_units(
) -> Iterable[MetadataWorkUnit]:
mcps: List[MetadataChangeProposalWrapper] = []

logger.debug(f"Converting dashboard={report.name} to datahub dashboard")

logger.debug(f"Converting report={report.name} to datahub dashboard")
# Convert user to CorpUser
user_mcps = self.to_datahub_users(report.users)
# Convert pages to charts. A report has single dataset and same dataset used in pages to create visualization
Expand All @@ -1215,9 +1214,7 @@ def report_to_datahub_work_units(
mcps.extend(chart_mcps)
mcps.extend(report_mcps)

# Convert MCP to work_units
work_units = map(self._to_work_unit, mcps)
return work_units
return map(self._to_work_unit, mcps)


@platform_name("PowerBI")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,18 +300,49 @@
"json": [
{
"op": "add",
"path": "/customProperties/chartCount",
"value": "2"
"path": "/charts",
"value": [
"urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)",
"urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)"
]
},
{
"op": "add",
"path": "/customProperties/workspaceName",
"value": "demo-workspace"
"path": "/customProperties",
"value": {
"chartCount": "2",
"workspaceName": "demo-workspace",
"workspaceId": "64ED5CAD-7C10-4684-8180-826122881108"
}
},
{
"op": "add",
"path": "/dashboardUrl",
"value": "https://localhost/dashboards/web/1"
},
{
"op": "add",
"path": "/description",
"value": "Description of test dashboard"
},
{
"op": "add",
"path": "/lastModified",
"value": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
},
{
"op": "add",
"path": "/customProperties/workspaceId",
"value": "64ED5CAD-7C10-4684-8180-826122881108"
"path": "/title",
"value": "test_dashboard"
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1214,18 +1214,44 @@
"json": [
{
"op": "add",
"path": "/customProperties/chartCount",
"value": "2"
"path": "/charts",
"value": [
"urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)",
"urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)"
]
},
{
"op": "add",
"path": "/customProperties/workspaceName",
"value": "demo-workspace"
"path": "/customProperties",
"value": {
"chartCount": "2",
"workspaceName": "demo-workspace",
"workspaceId": "64ED5CAD-7C10-4684-8180-826122881108"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

patches have very specific formats, and I don't think GMS will actually accept this formatting correctly

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the implementation

}
},
{
"op": "add",
"path": "/dashboardUrl",
"value": "https://localhost/dashboards/web/1"
},
{
"op": "add",
"path": "/customProperties/workspaceId",
"value": "64ED5CAD-7C10-4684-8180-826122881108"
"path": "/lastModified",
"value": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
},
{
"op": "add",
"path": "/title",
"value": "test_dashboard"
}
]
},
Expand Down Expand Up @@ -1951,6 +1977,50 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(powerbi,reports.5b218778-e7a5-4d73-8187-f10824047715)",
"changeType": "PATCH",
"aspectName": "dashboardInfo",
"aspect": {
"json": [
{
"op": "add",
"path": "/dashboardUrl",
"value": "https://app.powerbi.com/groups/f089354e-8366-4e18-aea3-4cb4a3a50b48/reports/5b218778-e7a5-4d73-8187-f10824047715"
},
{
"op": "add",
"path": "/description",
"value": "Acryl sales marketing report"
},
{
"op": "add",
"path": "/lastModified",
"value": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
},
{
"op": "add",
"path": "/title",
"value": "SalesMarketing"
}
]
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(powerbi,reports.5b218778-e7a5-4d73-8187-f10824047715)",
Expand Down
Loading
Loading