diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py
index 25781cd2f1dcc..7072ebf6473df 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py
@@ -332,10 +332,14 @@ def __init__(self) -> None:
         }
 
     def process_s3_provenance_event(self, event):
+        logger.debug(f"Processing s3 provenance event: {event}")
         attributes = event.get("attributes", [])
         s3_bucket = get_attribute_value(attributes, "s3.bucket")
         s3_key = get_attribute_value(attributes, "s3.key")
         if not s3_key:
+            logger.debug(
+                "s3.key not present in the list of attributes, trying to use filename attribute instead"
+            )
             s3_key = get_attribute_value(attributes, "filename")
 
         s3_url = f"s3://{s3_bucket}/{s3_key}"
@@ -344,6 +348,7 @@ def process_s3_provenance_event(self, event):
         dataset_name = s3_path.replace("/", ".")
         platform = "s3"
         dataset_urn = builder.make_dataset_urn(platform, s3_path, self.env)
+        logger.debug(f"Resolved s3 dataset urn: {dataset_urn}")
         return ExternalDataset(
             platform,
             dataset_name,
@@ -910,6 +915,11 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
         )
 
         for component in self.nifi_flow.components.values():
+            logger.debug(
+                f"Beginning construction of workunits for component {component.id} of type {component.type} and name {component.name}"
+            )
+            logger.debug(f"Inlets of the component: {component.inlets.keys()}")
+            logger.debug(f"Outlets of the component: {component.outlets.keys()}")
             job_name = component.name
             job_urn = builder.make_data_job_urn_with_flow(flow_urn, component.id)
 
@@ -937,6 +947,9 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
                 jobProperties["last_event_time"] = component.last_event_time
 
             for dataset in component.inlets.values():
+                logger.debug(
+                    f"Yielding dataset workunits for {dataset.dataset_urn} (inlet)"
+                )
                 yield from self.construct_dataset_workunits(
                     dataset.platform,
                     dataset.dataset_name,
@@ -945,6 +958,9 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
                 )
 
             for dataset in component.outlets.values():
+                logger.debug(
+                    f"Yielding dataset workunits for {dataset.dataset_urn} (outlet)"
+                )
                 yield from self.construct_dataset_workunits(
                     dataset.platform,
                     dataset.dataset_name,
@@ -1207,6 +1223,7 @@ def construct_job_workunits(
         inputJobs: List[str] = [],
         status: Optional[str] = None,
     ) -> Iterable[MetadataWorkUnit]:
+        logger.debug(f"Beginning construction of job workunit for {job_urn}")
         if job_properties:
             job_properties = {k: v for k, v in job_properties.items() if v is not None}
 
@@ -1229,8 +1246,12 @@ def construct_job_workunits(
         inlets.sort()
         outlets.sort()
         inputJobs.sort()
+        logger.debug(f"Inlets after sorting: {inlets}")
+        logger.debug(f"Outlets after sorting: {outlets}")
+        logger.debug(f"Input jobs after sorting: {inputJobs}")
 
         if self.config.incremental_lineage:
+            logger.debug("Preparing mcps for incremental lineage")
             patch_builder: DataJobPatchBuilder = DataJobPatchBuilder(job_urn)
             for inlet in inlets:
                 patch_builder.add_input_dataset(inlet)
@@ -1239,6 +1260,7 @@ def construct_job_workunits(
             for inJob in inputJobs:
                 patch_builder.add_input_datajob(inJob)
             for patch_mcp in patch_builder.build():
+                logger.debug(f"Preparing Patch MCP: {patch_mcp}")
                 yield MetadataWorkUnit(
                     id=f"{job_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp
                 )
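For context, here is a minimal sketch (not part of the patch) of the incremental-lineage flow that the hunks above instrument: the NiFi source collects sorted inlets, outlets, and upstream jobs into a `DataJobPatchBuilder`, then wraps each emitted patch MCP in a `MetadataWorkUnit`. The flow/job ids and dataset URNs below are made up for illustration; the imports mirror those used in `nifi.py` and in the test further down.

```python
import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.specific.datajob import DataJobPatchBuilder

# Hypothetical flow/job ids, for illustration only.
flow_urn = builder.make_data_flow_urn(orchestrator="nifi", flow_id="example-flow")
job_urn = builder.make_data_job_urn_with_flow(flow_urn, "example-processor")

patch_builder = DataJobPatchBuilder(job_urn)
patch_builder.add_input_dataset(builder.make_dataset_urn("s3", "input-bucket/folder", "DEV"))
patch_builder.add_output_dataset(builder.make_dataset_urn("s3", "output-bucket/folder", "DEV"))

# build() yields one MetadataChangeProposal per patched aspect; the source
# wraps each one in a MetadataWorkUnit, exactly as in the last hunk above.
workunits = [
    MetadataWorkUnit(id=f"{job_urn}-{mcp.aspectName}", mcp_raw=mcp)
    for mcp in patch_builder.build()
]
```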
diff --git a/metadata-ingestion/src/datahub/specific/datajob.py b/metadata-ingestion/src/datahub/specific/datajob.py
index 2d944edeb3640..8da8edc8ef0f2 100644
--- a/metadata-ingestion/src/datahub/specific/datajob.py
+++ b/metadata-ingestion/src/datahub/specific/datajob.py
@@ -330,7 +330,7 @@ def add_output_dataset(
         self._add_patch(
             DataJobInputOutput.ASPECT_NAME,
             "add",
-            path=f"/outputDatasetEdges/{self.quote(str(input))}",
+            path=f"/outputDatasetEdges/{self.quote(str(output))}",
             value=output_edge,
         )
         return self
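The one-line `datajob.py` change above is the actual bug fix: inside `add_output_dataset` there is no local named `input`, so `str(input)` silently stringified the Python builtin, and every output edge was patched under the same meaningless path, where later adds could clobber earlier ones. With `output`, each edge is keyed by its own quoted dataset URN. A quick illustration of what the old f-string produced:

```python
# With no local variable named `input` in scope, the builtin is picked up,
# so the old patch path was identical for every output dataset:
print(str(input))  # -> '<built-in function input>'
# old: "/outputDatasetEdges/<built-in function input>"  (same key for all edges)
# new: "/outputDatasetEdges/urn:li:dataset:(...)"       (one key per edge)
```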
diff --git a/metadata-ingestion/tests/unit/patch/test_patch_builder.py b/metadata-ingestion/tests/unit/patch/test_patch_builder.py
index 8c2a4b2c4a6dd..267da6cdd5d20 100644
--- a/metadata-ingestion/tests/unit/patch/test_patch_builder.py
+++ b/metadata-ingestion/tests/unit/patch/test_patch_builder.py
@@ -1,10 +1,14 @@
+import json
 import pathlib
 
 import pytest
+from freezegun.api import freeze_time
 
 from datahub.emitter.mce_builder import (
     make_chart_urn,
     make_dashboard_urn,
+    make_data_flow_urn,
+    make_data_job_urn_with_flow,
     make_dataset_urn,
     make_schema_field_urn,
     make_tag_urn,
@@ -22,6 +26,7 @@
 )
 from datahub.specific.chart import ChartPatchBuilder
 from datahub.specific.dashboard import DashboardPatchBuilder
+from datahub.specific.datajob import DataJobPatchBuilder
 from datahub.specific.dataset import DatasetPatchBuilder
 from tests.test_helpers import mce_helpers
 
@@ -175,3 +180,85 @@
             ),
         ),
     ]
+
+
+@freeze_time("2020-04-14 07:00:00")
+def test_datajob_patch_builder():
+    flow_urn = make_data_flow_urn(
+        orchestrator="nifi", flow_id="252C34e5af19-0192-1000-b248-b1abee565b5d"
+    )
+    job_urn = make_data_job_urn_with_flow(
+        flow_urn, "5ca6fee7-0192-1000-f206-dfbc2b0d8bfb"
+    )
+    patcher = DataJobPatchBuilder(job_urn)
+
+    patcher.add_output_dataset(
+        "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
+    )
+    patcher.add_output_dataset(
+        "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
+    )
+    patcher.add_output_dataset(
+        "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
+    )
+
+    assert patcher.build() == [
+        MetadataChangeProposalClass(
+            entityType="dataJob",
+            entityUrn="urn:li:dataJob:(urn:li:dataFlow:(nifi,252C34e5af19-0192-1000-b248-b1abee565b5d,prod),5ca6fee7-0192-1000-f206-dfbc2b0d8bfb)",
+            changeType="PATCH",
+            aspectName="dataJobInputOutput",
+            aspect=GenericAspectClass(
+                value=json.dumps(
+                    [
+                        {
+                            "op": "add",
+                            "path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder1,DEV)",
+                            "value": {
+                                "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)",
+                                "created": {
+                                    "time": 1586847600000,
+                                    "actor": "urn:li:corpuser:datahub",
+                                },
+                                "lastModified": {
+                                    "time": 1586847600000,
+                                    "actor": "urn:li:corpuser:datahub",
+                                },
+                            },
+                        },
+                        {
+                            "op": "add",
+                            "path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder3,DEV)",
+                            "value": {
+                                "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)",
+                                "created": {
+                                    "time": 1586847600000,
+                                    "actor": "urn:li:corpuser:datahub",
+                                },
+                                "lastModified": {
+                                    "time": 1586847600000,
+                                    "actor": "urn:li:corpuser:datahub",
+                                },
+                            },
+                        },
+                        {
+                            "op": "add",
+                            "path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder2,DEV)",
+                            "value": {
+                                "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)",
+                                "created": {
+                                    "time": 1586847600000,
+                                    "actor": "urn:li:corpuser:datahub",
+                                },
+                                "lastModified": {
+                                    "time": 1586847600000,
+                                    "actor": "urn:li:corpuser:datahub",
+                                },
+                            },
+                        },
+                    ]
+                ).encode("utf-8"),
+                contentType="application/json-patch+json",
+            ),
+        )
+    ]
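Two details worth noting in the expected values: the `~1` sequences in the patch paths are JSON-Pointer escapes (RFC 6901) for `/`, which is why `output-bucket/folder1` appears as `output-bucket~1folder1` in `path` but stays unescaped in `destinationUrn`; and the frozen clock is what pins the `created`/`lastModified` stamps, assuming (as the test implies) that the builder stamps edges with the current UTC time. A quick sanity check of the arithmetic:

```python
from datetime import datetime, timezone

# The freeze_time value, expressed in epoch milliseconds, matches the
# timestamps asserted in the test above.
frozen = datetime(2020, 4, 14, 7, 0, tzinfo=timezone.utc)
assert int(frozen.timestamp() * 1000) == 1586847600000
```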