From df796d3c0c76a87afb1b662f1c867d791c6b57f3 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 21 Aug 2023 16:24:50 -0700 Subject: [PATCH] feat(systemMetadata): add pipeline names to system metadata Follow up on https://github.com/datahub-project/datahub/pull/8672 --- .../src/datahub/ingestion/extractor/mce_extractor.py | 7 +++++++ .../src/main/pegasus/com/linkedin/mxe/SystemMetadata.pdl | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/mce_extractor.py b/metadata-ingestion/src/datahub/ingestion/extractor/mce_extractor.py index 62e880a2e5334..36450dda153d7 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/mce_extractor.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/mce_extractor.py @@ -27,6 +27,9 @@ def _try_reformat_with_black(code: str) -> str: class WorkUnitRecordExtractorConfig(ConfigModel): set_system_metadata = True + set_system_metadata_pipeline_name = ( + False # false for now until the models are available in OSS + ) unpack_mces_into_mcps = False @@ -66,6 +69,10 @@ def get_records( workunit.metadata.systemMetadata = SystemMetadata( lastObserved=get_sys_time(), runId=self.ctx.run_id ) + if self.config.set_system_metadata_pipeline_name: + workunit.metadata.systemMetadata.pipelineName = ( + self.ctx.pipeline_name + ) if ( isinstance(workunit.metadata, MetadataChangeEvent) and len(workunit.metadata.proposedSnapshot.aspects) == 0 diff --git a/metadata-models/src/main/pegasus/com/linkedin/mxe/SystemMetadata.pdl b/metadata-models/src/main/pegasus/com/linkedin/mxe/SystemMetadata.pdl index b9cf7d58d434e..e0f355229c912 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/mxe/SystemMetadata.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/mxe/SystemMetadata.pdl @@ -14,6 +14,11 @@ record SystemMetadata { */ runId: optional string = "no-run-id-provided" + /** + * The ingestion pipeline id that produced the metadata. Populated in case of batch ingestion. + */ + pipelineName: optional string + /** * The model registry name that was used to process this event */