feat(datajob/flow): add environment filter using info aspects (#10814)
anshbansal authored Jul 22, 2024
1 parent aa97cba commit 9f570a7
Showing 12 changed files with 172 additions and 4 deletions.
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
@@ -22,6 +22,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- Protobuf CLI will no longer create binary encoded protoc custom properties. A `-protocProp` flag was added in case this
  behavior is required.
- #10814 - Data flow info and data job info aspects will produce an additional field that requires a corresponding server upgrade. Otherwise, the server may reject the aspects.
- #10868 - OpenAPI V3 - Creation of aspects will need to be wrapped within a `value` key and the API is now symmetric with respect to input and outputs.

Example Global Tags Aspect:
17 changes: 17 additions & 0 deletions metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py
@@ -3,6 +3,7 @@
from typing import Callable, Dict, Iterable, List, Optional, Set, cast

import datahub.emitter.mce_builder as builder
from datahub.configuration.source_common import ALL_ENV_TYPES
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
@@ -110,7 +111,20 @@ def generate_tags_aspect(self) -> List[GlobalTagsClass]:
)
return [tags]

def _get_env(self) -> Optional[str]:
env: Optional[str] = None
if self.cluster in ALL_ENV_TYPES:
env = self.cluster
elif self.env in ALL_ENV_TYPES:
env = self.env
else:
logger.warning(
f"cluster {self.cluster} and env {self.env} are not valid environment types, so the Environment filter won't work."
)
return env

def generate_mce(self) -> MetadataChangeEventClass:
env = self._get_env()
flow_mce = MetadataChangeEventClass(
proposedSnapshot=DataFlowSnapshotClass(
urn=str(self.urn),
@@ -120,6 +134,7 @@ def generate_mce(self) -> MetadataChangeEventClass:
description=self.description,
customProperties=self.properties,
externalUrl=self.url,
env=env,
),
*self.generate_ownership_aspect(),
*self.generate_tags_aspect(),
@@ -130,13 +145,15 @@ def generate_mce(self) -> MetadataChangeEventClass:
return flow_mce

def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
env = self._get_env()
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=DataFlowInfoClass(
name=self.name if self.name is not None else self.id,
description=self.description,
customProperties=self.properties,
externalUrl=self.url,
env=env,
),
)
yield mcp
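For illustration, a minimal usage sketch of the new behavior in dataflow.py (an editorial example, not part of the commit; it assumes the DataFlow dataclass accepts the orchestrator, id, and cluster keyword arguments implied by the fields referenced above, and the identifiers are placeholders):

from datahub.api.entities.datajob.dataflow import DataFlow
from datahub.configuration.source_common import ALL_ENV_TYPES

# "PROD" is one of the standard FabricType names in ALL_ENV_TYPES, so _get_env()
# resolves it and the emitted DataFlowInfo aspect carries env="PROD".
flow = DataFlow(orchestrator="airflow", id="daily_etl", cluster="PROD")
mcps = list(flow.generate_mcp())
print(mcps[0].aspect)  # DataFlowInfoClass(..., env="PROD")

# A site-specific cluster name is not in ALL_ENV_TYPES: env stays None, a warning
# is logged, and the Environment search filter will not match this flow.
flow_unknown = DataFlow(orchestrator="airflow", id="daily_etl", cluster="emr-cluster-1")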
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/api/entities/datajob/datajob.py
@@ -1,7 +1,9 @@
import logging
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable, List, Optional, Set

import datahub.emitter.mce_builder as builder
from datahub.configuration.source_common import ALL_ENV_TYPES
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
@@ -22,6 +24,8 @@
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn

logger = logging.getLogger(__name__)


@dataclass
class DataJob:
@@ -103,6 +107,13 @@ def generate_tags_aspect(self) -> Iterable[GlobalTagsClass]:
def generate_mcp(
self, materialize_iolets: bool = True
) -> Iterable[MetadataChangeProposalWrapper]:
env: Optional[str] = None
if self.flow_urn.cluster in ALL_ENV_TYPES:
env = self.flow_urn.cluster
else:
logger.warning(
f"cluster {self.flow_urn.cluster} is not a valid environment type, so the Environment filter won't work."
)
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=DataJobInfoClass(
@@ -111,6 +122,7 @@ def generate_mcp(
description=self.description,
customProperties=self.properties,
externalUrl=self.url,
env=env,
),
)
yield mcp
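Similarly for jobs, a sketch of how the environment is derived from the cluster component of the parent flow URN (again an editorial example; DataFlowUrn.create_from_ids and the DataJob keyword arguments are assumed from the code above, and the identifiers are placeholders):

from datahub.api.entities.datajob.datajob import DataJob
from datahub.utilities.urns.data_flow_urn import DataFlowUrn

# The third component of the data flow URN is the cluster; "PROD" is a valid
# FabricType, so the emitted DataJobInfo aspect will carry env="PROD".
flow_urn = DataFlowUrn.create_from_ids(
    orchestrator="airflow", flow_id="daily_etl", env="PROD"
)
job = DataJob(id="transform_task", flow_urn=flow_urn)

for mcp in job.generate_mcp():
    print(mcp.aspect)  # the first aspect is DataJobInfoClass with env="PROD"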
Original file line number Diff line number Diff line change
@@ -7,7 +7,8 @@
"aspect": {
"json": {
"customProperties": {},
"name": "postgres"
"name": "postgres",
"env": "PROD"
}
},
"systemMetadata": {
@@ -68,7 +69,8 @@
"name": "postgres",
"type": {
"string": "COMMAND"
}
},
"env": "PROD"
}
},
"systemMetadata": {
Original file line number Diff line number Diff line change
@@ -7,7 +7,8 @@
"aspect": {
"json": {
"customProperties": {},
"name": "postgres"
"name": "postgres",
"env": "PROD"
}
},
"systemMetadata": {
@@ -68,7 +69,8 @@
"name": "postgres",
"type": {
"string": "COMMAND"
}
},
"env": "PROD"
}
},
"systemMetadata": {
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import com.linkedin.common.CustomProperties
import com.linkedin.common.ExternalReference
import com.linkedin.common.Urn
import com.linkedin.common.TimeStamp
import com.linkedin.common.FabricType

/**
* Information about a Data processing flow
@@ -63,4 +64,15 @@ record DataFlowInfo includes CustomProperties, ExternalReference {
}
}
lastModified: optional TimeStamp

/**
* Environment for this flow
*/
@Searchable = {
"fieldType": "KEYWORD",
"addToFilters": true,
"filterNameOverride": "Environment",
"queryByDefault": false
}
env: optional FabricType
}
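Since the Python classes are generated from this schema, the new optional field can also be set when emitting the aspect directly. A minimal sketch (editorial example; the GMS URL and the URN are placeholders):

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataFlowInfoClass

# env is optional; as noted in updating-datahub.md above, a server without this
# change may reject an aspect that carries the extra field.
mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataFlow:(airflow,daily_etl,prod)",
    aspect=DataFlowInfoClass(name="daily_etl", env="PROD"),
)
DatahubRestEmitter("http://localhost:8080").emit_mcp(mcp)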
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import com.linkedin.common.CustomProperties
import com.linkedin.common.ExternalReference
import com.linkedin.common.DataFlowUrn
import com.linkedin.common.TimeStamp
import com.linkedin.common.FabricType

/**
* Information about a Data processing job
@@ -72,4 +73,15 @@ record DataJobInfo includes CustomProperties, ExternalReference {
*/
@deprecated = "Use Data Process Instance model, instead"
status: optional JobStatus

/**
* Environment for this job
*/
@Searchable = {
"fieldType": "KEYWORD",
"addToFilters": true,
"filterNameOverride": "Environment",
"queryByDefault": false
}
env: optional FabricType
}
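Once documents are reindexed, the @Searchable annotation above surfaces env as an "Environment" facet. A hedged sketch of filtering on it through the GraphQL API from Python (editorial example; the endpoint is a placeholder and the searchAcrossEntities filter shape follows the commonly documented DataHub search input rather than anything in this commit):

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Filter data jobs on the new "env" search field.
query = """
query prodDataJobs {
  searchAcrossEntities(
    input: {
      types: [DATA_JOB]
      query: "*"
      start: 0
      count: 10
      orFilters: [{ and: [{ field: "env", values: ["PROD"] }] }]
    }
  ) {
    total
    searchResults { entity { urn } }
  }
}
"""
print(graph.execute_graphql(query))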
Original file line number Diff line number Diff line change
@@ -1491,6 +1491,17 @@
"fieldType" : "DATETIME"
}
}
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this flow",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataFlowInfo"
@@ -1587,6 +1598,17 @@
"doc" : "Status of the job - Deprecated for Data Process Instance model.",
"optional" : true,
"deprecated" : "Use Data Process Instance model, instead"
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this job",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataJobInfo"
Original file line number Diff line number Diff line change
@@ -1541,6 +1541,17 @@
"fieldType" : "DATETIME"
}
}
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this flow",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataFlowInfo"
@@ -1637,6 +1648,17 @@
"doc" : "Status of the job - Deprecated for Data Process Instance model.",
"optional" : true,
"deprecated" : "Use Data Process Instance model, instead"
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this job",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataJobInfo"
Original file line number Diff line number Diff line change
@@ -1228,6 +1228,17 @@
"fieldType" : "DATETIME"
}
}
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this flow",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataFlowInfo"
@@ -1324,6 +1335,17 @@
"doc" : "Status of the job - Deprecated for Data Process Instance model.",
"optional" : true,
"deprecated" : "Use Data Process Instance model, instead"
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this job",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataJobInfo"
Original file line number Diff line number Diff line change
@@ -1228,6 +1228,17 @@
"fieldType" : "DATETIME"
}
}
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this flow",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataFlowInfo"
@@ -1324,6 +1335,17 @@
"doc" : "Status of the job - Deprecated for Data Process Instance model.",
"optional" : true,
"deprecated" : "Use Data Process Instance model, instead"
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this job",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataJobInfo"
Original file line number Diff line number Diff line change
@@ -1541,6 +1541,17 @@
"fieldType" : "DATETIME"
}
}
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this flow",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataFlowInfo"
@@ -1637,6 +1648,17 @@
"doc" : "Status of the job - Deprecated for Data Process Instance model.",
"optional" : true,
"deprecated" : "Use Data Process Instance model, instead"
}, {
"name" : "env",
"type" : "com.linkedin.common.FabricType",
"doc" : "Environment for this job",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldType" : "KEYWORD",
"filterNameOverride" : "Environment",
"queryByDefault" : false
}
} ],
"Aspect" : {
"name" : "dataJobInfo"
