diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index ef9990ca3804e..ffceb7a5d1b02 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -22,6 +22,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - Protobuf CLI will no longer create binary encoded protoc custom properties. Flag added `-protocProp` in case this behavior is required. +- #10814 Data flow info and data job info aspects now include an additional `env` field, which requires a corresponding upgrade of the server. Otherwise, the server may reject the aspects. - #10868 - OpenAPI V3 - Creation of aspects will need to be wrapped within a `value` key and the API is now symmetric with respect to input and outputs. Example Global Tags Aspect: diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py index cb2c536bbab20..3870e6978ee64 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py @@ -3,6 +3,7 @@ from typing import Callable, Dict, Iterable, List, Optional, Set, cast import datahub.emitter.mce_builder as builder +from datahub.configuration.source_common import ALL_ENV_TYPES from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.metadata.schema_classes import ( @@ -110,7 +111,20 @@ def generate_tags_aspect(self) -> List[GlobalTagsClass]: ) return [tags] + def _get_env(self) -> Optional[str]: + env: Optional[str] = None + if self.cluster in ALL_ENV_TYPES: + env = self.cluster + elif self.env in ALL_ENV_TYPES: + env = self.env + else: + logger.warning( + f"cluster {self.cluster} and {self.env} is not a valid environment type so Environment filter won't work." 
+ ) + return env + def generate_mce(self) -> MetadataChangeEventClass: + env = self._get_env() flow_mce = MetadataChangeEventClass( proposedSnapshot=DataFlowSnapshotClass( urn=str(self.urn), @@ -120,6 +134,7 @@ def generate_mce(self) -> MetadataChangeEventClass: description=self.description, customProperties=self.properties, externalUrl=self.url, + env=env, ), *self.generate_ownership_aspect(), *self.generate_tags_aspect(), @@ -130,6 +145,7 @@ def generate_mce(self) -> MetadataChangeEventClass: return flow_mce def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: + env = self._get_env() mcp = MetadataChangeProposalWrapper( entityUrn=str(self.urn), aspect=DataFlowInfoClass( @@ -137,6 +153,7 @@ def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: description=self.description, customProperties=self.properties, externalUrl=self.url, + env=env, ), ) yield mcp diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py index e56e9f059d724..514f0a5093aa5 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py @@ -1,7 +1,9 @@ +import logging from dataclasses import dataclass, field from typing import Callable, Dict, Iterable, List, Optional, Set import datahub.emitter.mce_builder as builder +from datahub.configuration.source_common import ALL_ENV_TYPES from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.metadata.schema_classes import ( @@ -22,6 +24,8 @@ from datahub.utilities.urns.data_job_urn import DataJobUrn from datahub.utilities.urns.dataset_urn import DatasetUrn +logger = logging.getLogger(__name__) + @dataclass class DataJob: @@ -103,6 +107,13 @@ def generate_tags_aspect(self) -> Iterable[GlobalTagsClass]: def generate_mcp( self, materialize_iolets: bool = True ) -> 
Iterable[MetadataChangeProposalWrapper]: + env: Optional[str] = None + if self.flow_urn.cluster in ALL_ENV_TYPES: + env = self.flow_urn.cluster + else: + logger.warning( + f"cluster {self.flow_urn.cluster} is not a valid environment type so Environment filter won't work." + ) mcp = MetadataChangeProposalWrapper( entityUrn=str(self.urn), aspect=DataJobInfoClass( @@ -111,6 +122,7 @@ def generate_mcp( description=self.description, customProperties=self.properties, externalUrl=self.url, + env=env, ), ) yield mcp diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json index d2ae437605644..29b186978a76a 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json @@ -7,7 +7,8 @@ "aspect": { "json": { "customProperties": {}, - "name": "postgres" + "name": "postgres", + "env": "PROD" } }, "systemMetadata": { @@ -68,7 +69,8 @@ "name": "postgres", "type": { "string": "COMMAND" - } + }, + "env": "PROD" } }, "systemMetadata": { diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json index 59e545183a4ec..0cd3bb83f90f5 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json @@ -7,7 +7,8 @@ "aspect": { "json": { "customProperties": {}, - "name": "postgres" + "name": "postgres", + "env": "PROD" } }, "systemMetadata": { @@ -68,7 +69,8 @@ "name": "postgres", "type": { "string": "COMMAND" - } + }, + "env": "PROD" } }, "systemMetadata": { diff --git a/metadata-models/src/main/pegasus/com/linkedin/datajob/DataFlowInfo.pdl 
b/metadata-models/src/main/pegasus/com/linkedin/datajob/DataFlowInfo.pdl index 2ff3e8cd930af..766181df01809 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/datajob/DataFlowInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/datajob/DataFlowInfo.pdl @@ -4,6 +4,7 @@ import com.linkedin.common.CustomProperties import com.linkedin.common.ExternalReference import com.linkedin.common.Urn import com.linkedin.common.TimeStamp +import com.linkedin.common.FabricType /** * Information about a Data processing flow @@ -63,4 +64,15 @@ record DataFlowInfo includes CustomProperties, ExternalReference { } } lastModified: optional TimeStamp + + /** + * Environment for this flow + */ + @Searchable = { + "fieldType": "KEYWORD", + "addToFilters": true, + "filterNameOverride": "Environment", + "queryByDefault": false + } + env: optional FabricType } diff --git a/metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl index 250fb76003777..46879e359e014 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl @@ -5,6 +5,7 @@ import com.linkedin.common.CustomProperties import com.linkedin.common.ExternalReference import com.linkedin.common.DataFlowUrn import com.linkedin.common.TimeStamp +import com.linkedin.common.FabricType /** * Information about a Data processing job @@ -72,4 +73,15 @@ record DataJobInfo includes CustomProperties, ExternalReference { */ @deprecated = "Use Data Process Instance model, instead" status: optional JobStatus + + /** + * Environment for this job + */ + @Searchable = { + "fieldType": "KEYWORD", + "addToFilters": true, + "filterNameOverride": "Environment", + "queryByDefault": false + } + env: optional FabricType } diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json 
b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json index 72578be8c54d0..eb92cf75a4d4e 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json @@ -1491,6 +1491,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1587,6 +1598,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index 9b93f1184cd59..0c983a021d4e7 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -1541,6 +1541,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1637,6 +1648,17 @@ "doc" : 
"Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json index 18ef55011ed5a..4af65cdb48b50 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json @@ -1228,6 +1228,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1324,6 +1335,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json index cf05978820911..e788c5d28ce71 100644 --- 
a/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json @@ -1228,6 +1228,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1324,6 +1335,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json index 15f16dd2ea6cd..dbdba0040d443 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json @@ -1541,6 +1541,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1637,6 +1648,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process 
Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo"