fix(ingestion/nifi): Fix for incremental lineage ingestion for nifi (#…
skrydal authored Oct 4, 2024
1 parent 67ff486 · commit 134ad21
Showing 3 changed files with 110 additions and 1 deletion.
22 changes: 22 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/nifi.py

@@ -332,10 +332,14 @@ def __init__(self) -> None:
         }

     def process_s3_provenance_event(self, event):
+        logger.debug(f"Processing s3 provenance event: {event}")
         attributes = event.get("attributes", [])
         s3_bucket = get_attribute_value(attributes, "s3.bucket")
         s3_key = get_attribute_value(attributes, "s3.key")
         if not s3_key:
+            logger.debug(
+                "s3.key not present in the list of attributes, trying to use filename attribute instead"
+            )
             s3_key = get_attribute_value(attributes, "filename")

         s3_url = f"s3://{s3_bucket}/{s3_key}"
@@ -344,6 +348,7 @@ def process_s3_provenance_event(self, event):
         dataset_name = s3_path.replace("/", ".")
         platform = "s3"
         dataset_urn = builder.make_dataset_urn(platform, s3_path, self.env)
+        logger.debug(f"Reasoned s3 dataset urn: {dataset_urn}")
         return ExternalDataset(
             platform,
             dataset_name,
@@ -910,6 +915,11 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
         )

         for component in self.nifi_flow.components.values():
+            logger.debug(
+                f"Beginning construction of workunits for component {component.id} of type {component.type} and name {component.name}"
+            )
+            logger.debug(f"Inlets of the component: {component.inlets.keys()}")
+            logger.debug(f"Outlets of the component: {component.outlets.keys()}")
             job_name = component.name
             job_urn = builder.make_data_job_urn_with_flow(flow_urn, component.id)

@@ -937,6 +947,9 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
                 jobProperties["last_event_time"] = component.last_event_time

             for dataset in component.inlets.values():
+                logger.debug(
+                    f"Yielding dataset workunits for {dataset.dataset_urn} (inlet)"
+                )
                 yield from self.construct_dataset_workunits(
                     dataset.platform,
                     dataset.dataset_name,
@@ -945,6 +958,9 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
                 )

             for dataset in component.outlets.values():
+                logger.debug(
+                    f"Yielding dataset workunits for {dataset.dataset_urn} (outlet)"
+                )
                 yield from self.construct_dataset_workunits(
                     dataset.platform,
                     dataset.dataset_name,
@@ -1207,6 +1223,7 @@ def construct_job_workunits(
         inputJobs: List[str] = [],
         status: Optional[str] = None,
     ) -> Iterable[MetadataWorkUnit]:
+        logger.debug(f"Beginning construction of job workunit for {job_urn}")
         if job_properties:
             job_properties = {k: v for k, v in job_properties.items() if v is not None}

@@ -1229,8 +1246,12 @@ def construct_job_workunits(
         inlets.sort()
         outlets.sort()
         inputJobs.sort()
+        logger.debug(f"Inlets after sorting: {inlets}")
+        logger.debug(f"Outlets after sorting: {outlets}")
+        logger.debug(f"Input jobs after sorting: {inputJobs}")

         if self.config.incremental_lineage:
+            logger.debug("Preparing mcps for incremental lineage")
             patch_builder: DataJobPatchBuilder = DataJobPatchBuilder(job_urn)
             for inlet in inlets:
                 patch_builder.add_input_dataset(inlet)
@@ -1239,6 +1260,7 @@ def construct_job_workunits(
             for inJob in inputJobs:
                 patch_builder.add_input_datajob(inJob)
             for patch_mcp in patch_builder.build():
+                logger.debug(f"Preparing Patch MCP: {patch_mcp}")
                 yield MetadataWorkUnit(
                     id=f"{job_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp
                 )
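
All 22 lines added to nifi.py are debug logging; the behavioral fix lives in datajob.py below. For readers tracing the incremental-lineage branch: DataJobPatchBuilder accumulates edge additions, build() returns PATCH-typed MCPs, and the source wraps each one in a MetadataWorkUnit. A minimal, self-contained sketch of that pattern, with illustrative flow/job IDs and dataset URNs (not taken from a real NiFi flow):

import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.specific.datajob import DataJobPatchBuilder

flow_urn = builder.make_data_flow_urn(orchestrator="nifi", flow_id="example-flow")
job_urn = builder.make_data_job_urn_with_flow(flow_urn, "example-processor-id")

patch_builder = DataJobPatchBuilder(job_urn)
patch_builder.add_input_dataset(
    "urn:li:dataset:(urn:li:dataPlatform:s3,in-bucket/raw,PROD)"
)
patch_builder.add_output_dataset(
    "urn:li:dataset:(urn:li:dataPlatform:s3,out-bucket/clean,PROD)"
)

# build() yields PATCH-typed MetadataChangeProposals that only *add* edges,
# so lineage already recorded on the server is left intact.
workunits = [
    MetadataWorkUnit(id=f"{job_urn}-{mcp.aspectName}", mcp_raw=mcp)
    for mcp in patch_builder.build()
]

Because the MCPs only add edges, previously ingested lineage is preserved; the sort() calls in the hunk above presumably exist to keep the emitted patch order deterministic between runs.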
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/specific/datajob.py

@@ -330,7 +330,7 @@ def add_output_dataset(
         self._add_patch(
             DataJobInputOutput.ASPECT_NAME,
             "add",
-            path=f"/outputDatasetEdges/{self.quote(str(input))}",
+            path=f"/outputDatasetEdges/{self.quote(str(output))}",
             value=output_edge,
         )
         return self
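
This one-word change is the actual incremental-lineage fix. add_output_dataset has no local variable named input, so the old f-string resolved Python's builtin input function, and every output dataset was patched under the same meaningless JSON Pointer path. A plain-Python illustration of the failure mode:

# With no local `input` in scope, the old code stringified the builtin
# input() function instead of the `output` argument:
print(str(input))
# <built-in function input>
print(f"/outputDatasetEdges/{input}")
# /outputDatasetEdges/<built-in function input>

Every output edge therefore collapsed onto that single bogus path (the edge value itself was still correct), which is exactly what the new unit test below guards against.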
87 changes: 87 additions & 0 deletions metadata-ingestion/tests/unit/patch/test_patch_builder.py

@@ -1,10 +1,14 @@
 import json
 import pathlib

+import pytest
+from freezegun.api import freeze_time

 from datahub.emitter.mce_builder import (
     make_chart_urn,
     make_dashboard_urn,
+    make_data_flow_urn,
+    make_data_job_urn_with_flow,
     make_dataset_urn,
     make_schema_field_urn,
     make_tag_urn,
@@ -22,6 +26,7 @@
 )
 from datahub.specific.chart import ChartPatchBuilder
 from datahub.specific.dashboard import DashboardPatchBuilder
+from datahub.specific.datajob import DataJobPatchBuilder
 from datahub.specific.dataset import DatasetPatchBuilder
 from tests.test_helpers import mce_helpers

@@ -175,3 +180,85 @@ def test_basic_dashboard_patch_builder():
             ),
         ),
     ]
+
+
+@freeze_time("2020-04-14 07:00:00")
+def test_datajob_patch_builder():
+    flow_urn = make_data_flow_urn(
+        orchestrator="nifi", flow_id="252C34e5af19-0192-1000-b248-b1abee565b5d"
+    )
+    job_urn = make_data_job_urn_with_flow(
+        flow_urn, "5ca6fee7-0192-1000-f206-dfbc2b0d8bfb"
+    )
+    patcher = DataJobPatchBuilder(job_urn)
+
+    patcher.add_output_dataset(
+        "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
+    )
+    patcher.add_output_dataset(
+        "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
+    )
+    patcher.add_output_dataset(
+        "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
+    )
+
+    assert patcher.build() == [
+        MetadataChangeProposalClass(
+            entityType="dataJob",
+            entityUrn="urn:li:dataJob:(urn:li:dataFlow:(nifi,252C34e5af19-0192-1000-b248-b1abee565b5d,prod),5ca6fee7-0192-1000-f206-dfbc2b0d8bfb)",
+            changeType="PATCH",
+            aspectName="dataJobInputOutput",
+            aspect=GenericAspectClass(
+                value=json.dumps(
+                    [
+                        {
+                            "op": "add",
+                            "path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder1,DEV)",
+                            "value": {
+                                "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)",
+                                "created": {
+                                    "time": 1586847600000,
+                                    "actor": "urn:li:corpuser:datahub",
+                                },
+                                "lastModified": {
+                                    "time": 1586847600000,
+                                    "actor": "urn:li:corpuser:datahub",
+                                },
+                            },
+                        },
+                        {
+                            "op": "add",
+                            "path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder3,DEV)",
+                            "value": {
+                                "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)",
+                                "created": {
+                                    "time": 1586847600000,
+                                    "actor": "urn:li:corpuser:datahub",
+                                },
+                                "lastModified": {
+                                    "time": 1586847600000,
+                                    "actor": "urn:li:corpuser:datahub",
+                                },
+                            },
+                        },
+                        {
+                            "op": "add",
+                            "path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder2,DEV)",
+                            "value": {
+                                "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)",
+                                "created": {
+                                    "time": 1586847600000,
+                                    "actor": "urn:li:corpuser:datahub",
+                                },
+                                "lastModified": {
+                                    "time": 1586847600000,
+                                    "actor": "urn:li:corpuser:datahub",
+                                },
+                            },
+                        },
+                    ]
+                ).encode("utf-8"),
+                contentType="application/json-patch+json",
+            ),
+        )
+    ]
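
A note on the expected paths in this test: before a URN is embedded in a JSON Pointer, it is escaped per RFC 6901, so every / becomes ~1 (and a literal ~ would become ~0); that is why output-bucket/folder1 appears as output-bucket~1folder1 in the path but not in destinationUrn. A sketch of that escaping rule (this local quote only mirrors what the builder's quote helper must do; it is not the library implementation):

def quote(value: str) -> str:
    # RFC 6901: escape "~" before "/" so the two replacements cannot collide.
    return value.replace("~", "~0").replace("/", "~1")

assert (
    quote("urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)")
    == "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder1,DEV)"
)

Note also that the expected patch preserves insertion order (folder1, folder3, folder2): the builder does not sort edges itself; in the NiFi source it is construct_job_workunits that sorts inlets and outlets before adding them.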
