diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/TestUtils.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/TestUtils.java index 272a93fa1989c..606123cac926d 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/TestUtils.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/TestUtils.java @@ -8,6 +8,7 @@ import com.datahub.plugins.auth.authorization.Authorizer; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.models.registry.ConfigEntityRegistry; @@ -21,6 +22,8 @@ public class TestUtils { public static EntityService getMockEntityService() { + PathSpecBasedSchemaAnnotationVisitor.class.getClassLoader() + .setClassAssertionStatus(PathSpecBasedSchemaAnnotationVisitor.class.getName(), false); EntityRegistry registry = new ConfigEntityRegistry(TestUtils.class.getResourceAsStream("/test-entity-registry.yaml")); EntityService mockEntityService = Mockito.mock(EntityService.class); Mockito.when(mockEntityService.getEntityRegistry()).thenReturn(registry); diff --git a/datahub-graphql-core/src/test/resources/test-entity-registry.yaml b/datahub-graphql-core/src/test/resources/test-entity-registry.yaml index d694ae53ac42f..efd75a7fb07f5 100644 --- a/datahub-graphql-core/src/test/resources/test-entity-registry.yaml +++ b/datahub-graphql-core/src/test/resources/test-entity-registry.yaml @@ -181,6 +181,7 @@ entities: - assertionInfo - dataPlatformInstance - assertionRunEvent + - assertionActions - status - name: dataHubRetention category: internal @@ -292,4 +293,11 @@ entities: aspects: - ownershipTypeInfo - status +- name: dataContract + category: core + keyAspect: dataContractKey + aspects: + - dataContractProperties + - dataContractStatus + - status events: diff --git a/metadata-ingestion/examples/data_contract/pet_of_the_week.dhub.dc.yaml b/metadata-ingestion/examples/data_contract/pet_of_the_week.dhub.dc.yaml new file mode 100644 index 0000000000000..c73904403f678 --- /dev/null +++ b/metadata-ingestion/examples/data_contract/pet_of_the_week.dhub.dc.yaml @@ -0,0 +1,21 @@ +# id: pet_details_dc # Optional: This is the unique identifier for the data contract +display_name: Data Contract for SampleHiveDataset +entity: urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD) +freshness: + time: 0700 + granularity: DAILY +schema: + properties: + field_foo: + type: string + native_type: VARCHAR(100) + field_bar: + type: boolean + required: + - field_bar +data_quality: + - type: column_range + config: + column: field_foo + min: 0 + max: 100 diff --git a/metadata-ingestion/src/datahub/api/entities/datacontract/__init__.py b/metadata-ingestion/src/datahub/api/entities/datacontract/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/src/datahub/api/entities/datacontract/data_quality_assertion.py b/metadata-ingestion/src/datahub/api/entities/datacontract/data_quality_assertion.py new file mode 100644 index 0000000000000..a665e95e93c43 --- /dev/null +++ b/metadata-ingestion/src/datahub/api/entities/datacontract/data_quality_assertion.py @@ -0,0 +1,107 @@ +from typing import List, Optional, Union + +import pydantic +from typing_extensions import Literal + +import datahub.emitter.mce_builder as builder 
+from datahub.configuration.common import ConfigModel +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.metadata.schema_classes import ( + AssertionInfoClass, + AssertionStdAggregationClass, + AssertionStdOperatorClass, + AssertionStdParameterClass, + AssertionStdParametersClass, + AssertionStdParameterTypeClass, + AssertionTypeClass, + DatasetAssertionInfoClass, + DatasetAssertionScopeClass, +) + + +class IdConfigMixin(ConfigModel): + id_raw: Optional[str] = pydantic.Field( + default=None, + alias="id", + description="The id of the assertion. If not provided, one will be generated using the type.", + ) + + def generate_default_id(self) -> str: + raise NotImplementedError + + +class CustomSQLAssertion(IdConfigMixin, ConfigModel): + type: Literal["custom_sql"] + + sql: str + + def generate_dataset_assertion_info( + self, entity_urn: str + ) -> DatasetAssertionInfoClass: + return DatasetAssertionInfoClass( + dataset=entity_urn, + scope=DatasetAssertionScopeClass.UNKNOWN, + fields=[], + operator=AssertionStdOperatorClass._NATIVE_, + aggregation=AssertionStdAggregationClass._NATIVE_, + logic=self.sql, + ) + + +class ColumnUniqueAssertion(IdConfigMixin, ConfigModel): + type: Literal["unique"] + + # TODO: support multiple columns? + column: str + + def generate_default_id(self) -> str: + return f"{self.type}-{self.column}" + + def generate_dataset_assertion_info( + self, entity_urn: str + ) -> DatasetAssertionInfoClass: + return DatasetAssertionInfoClass( + dataset=entity_urn, + scope=DatasetAssertionScopeClass.DATASET_COLUMN, + fields=[builder.make_schema_field_urn(entity_urn, self.column)], + operator=AssertionStdOperatorClass.EQUAL_TO, + aggregation=AssertionStdAggregationClass.UNIQUE_PROPOTION, # purposely using the misspelled version to work with gql + parameters=AssertionStdParametersClass( + value=AssertionStdParameterClass( + value="1", type=AssertionStdParameterTypeClass.NUMBER + ) + ), + ) + + +class DataQualityAssertion(ConfigModel): + __root__: Union[ + CustomSQLAssertion, + ColumnUniqueAssertion, + ] = pydantic.Field(discriminator="type") + + @property + def id(self) -> str: + if self.__root__.id_raw: + return self.__root__.id_raw + try: + return self.__root__.generate_default_id() + except NotImplementedError: + return self.__root__.type + + def generate_mcp( + self, assertion_urn: str, entity_urn: str + ) -> List[MetadataChangeProposalWrapper]: + dataset_assertion_info = self.__root__.generate_dataset_assertion_info( + entity_urn + ) + + return [ + MetadataChangeProposalWrapper( + entityUrn=assertion_urn, + aspect=AssertionInfoClass( + type=AssertionTypeClass.DATASET, + datasetAssertion=dataset_assertion_info, + ), + ) + ] diff --git a/metadata-ingestion/src/datahub/api/entities/datacontract/datacontract.py b/metadata-ingestion/src/datahub/api/entities/datacontract/datacontract.py new file mode 100644 index 0000000000000..2df446623a9d6 --- /dev/null +++ b/metadata-ingestion/src/datahub/api/entities/datacontract/datacontract.py @@ -0,0 +1,213 @@ +import collections +from typing import Iterable, List, Optional, Tuple + +import pydantic +from ruamel.yaml import YAML +from typing_extensions import Literal + +import datahub.emitter.mce_builder as builder +from datahub.api.entities.datacontract.data_quality_assertion import ( + DataQualityAssertion, +) +from datahub.api.entities.datacontract.freshness_assertion import FreshnessAssertion +from datahub.api.entities.datacontract.schema_assertion import SchemaAssertion +from datahub.configuration.common import 
ConfigModel +from datahub.emitter.mce_builder import datahub_guid, make_assertion_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.metadata.schema_classes import ( + DataContractPropertiesClass, + DataContractStateClass, + DataContractStatusClass, + DataQualityContractClass, + FreshnessContractClass, + SchemaContractClass, + StatusClass, +) +from datahub.utilities.urns.urn import guess_entity_type + + +class DataContract(ConfigModel): + """A yml representation of a Data Contract. + + This model is used as a simpler, Python-native representation of a DataHub data contract. + It can be easily parsed from a YAML file, and can be easily converted into series of MCPs + that can be emitted to DataHub. + """ + + version: Literal[1] + + id: Optional[str] = pydantic.Field( + default=None, + alias="urn", + description="The data contract urn. If not provided, one will be generated.", + ) + entity: str = pydantic.Field( + description="The entity urn that the Data Contract is associated with" + ) + # TODO: add support for properties + # properties: Optional[Dict[str, str]] = None + + schema_field: Optional[SchemaAssertion] = pydantic.Field( + default=None, alias="schema" + ) + + freshness: Optional[FreshnessAssertion] = pydantic.Field(default=None) + + # TODO: Add a validator to ensure that ids are unique + data_quality: Optional[List[DataQualityAssertion]] = None + + _original_yaml_dict: Optional[dict] = None + + @pydantic.validator("data_quality") + def validate_data_quality( + cls, data_quality: Optional[List[DataQualityAssertion]] + ) -> Optional[List[DataQualityAssertion]]: + if data_quality: + # Raise an error if there are duplicate ids. + id_counts = collections.Counter(dq_check.id for dq_check in data_quality) + duplicates = [id for id, count in id_counts.items() if count > 1] + + if duplicates: + raise ValueError( + f"Got multiple data quality tests with the same type or ID: {duplicates}. Set a unique ID for each data quality test." 
+ ) + + return data_quality + + @property + def urn(self) -> str: + if self.id: + assert guess_entity_type(self.id) == "dataContract" + return self.id + + # Data contract urns are stable + guid_obj = {"entity": self.entity} + urn = f"urn:li:dataContract:{datahub_guid(guid_obj)}" + return urn + + def _generate_freshness_assertion( + self, freshness: FreshnessAssertion + ) -> Tuple[str, List[MetadataChangeProposalWrapper]]: + guid_dict = { + "contract": self.urn, + "entity": self.entity, + "freshness": freshness.id, + } + assertion_urn = builder.make_assertion_urn(builder.datahub_guid(guid_dict)) + + return ( + assertion_urn, + freshness.generate_mcp(assertion_urn, self.entity), + ) + + def _generate_schema_assertion( + self, schema_metadata: SchemaAssertion + ) -> Tuple[str, List[MetadataChangeProposalWrapper]]: + # ingredients for guid -> the contract id, the fact that this is a schema assertion and the entity on which the assertion is made + guid_dict = { + "contract": self.urn, + "entity": self.entity, + "schema": schema_metadata.id, + } + assertion_urn = make_assertion_urn(datahub_guid(guid_dict)) + + return ( + assertion_urn, + schema_metadata.generate_mcp(assertion_urn, self.entity), + ) + + def _generate_data_quality_assertion( + self, data_quality: DataQualityAssertion + ) -> Tuple[str, List[MetadataChangeProposalWrapper]]: + guid_dict = { + "contract": self.urn, + "entity": self.entity, + "data_quality": data_quality.id, + } + assertion_urn = make_assertion_urn(datahub_guid(guid_dict)) + + return ( + assertion_urn, + data_quality.generate_mcp(assertion_urn, self.entity), + ) + + def _generate_dq_assertions( + self, data_quality_spec: List[DataQualityAssertion] + ) -> Tuple[List[str], List[MetadataChangeProposalWrapper]]: + assertion_urns = [] + assertion_mcps = [] + + for dq_check in data_quality_spec: + assertion_urn, assertion_mcp = self._generate_data_quality_assertion( + dq_check + ) + + assertion_urns.append(assertion_urn) + assertion_mcps.extend(assertion_mcp) + + return (assertion_urns, assertion_mcps) + + def generate_mcp( + self, + ) -> Iterable[MetadataChangeProposalWrapper]: + schema_assertion_urn = None + if self.schema_field is not None: + ( + schema_assertion_urn, + schema_assertion_mcps, + ) = self._generate_schema_assertion(self.schema_field) + yield from schema_assertion_mcps + + freshness_assertion_urn = None + if self.freshness: + ( + freshness_assertion_urn, + sla_assertion_mcps, + ) = self._generate_freshness_assertion(self.freshness) + yield from sla_assertion_mcps + + dq_assertions, dq_assertion_mcps = self._generate_dq_assertions( + self.data_quality or [] + ) + yield from dq_assertion_mcps + + # Now that we've generated the assertions, we can generate + # the actual data contract. + yield from MetadataChangeProposalWrapper.construct_many( + entityUrn=self.urn, + aspects=[ + DataContractPropertiesClass( + entity=self.entity, + schema=[SchemaContractClass(assertion=schema_assertion_urn)] + if schema_assertion_urn + else None, + freshness=[ + FreshnessContractClass(assertion=freshness_assertion_urn) + ] + if freshness_assertion_urn + else None, + dataQuality=[ + DataQualityContractClass(assertion=dq_assertion_urn) + for dq_assertion_urn in dq_assertions + ], + ), + # Also emit status. + StatusClass(removed=False), + # Emit the contract state as PENDING. 
+ DataContractStatusClass(state=DataContractStateClass.PENDING) + if True + else None, + ], + ) + + @classmethod + def from_yaml( + cls, + file: str, + ) -> "DataContract": + with open(file) as fp: + yaml = YAML(typ="rt") # default, if not specfied, is 'rt' (round-trip) + orig_dictionary = yaml.load(fp) + parsed_data_contract = DataContract.parse_obj(orig_dictionary) + parsed_data_contract._original_yaml_dict = orig_dictionary + return parsed_data_contract diff --git a/metadata-ingestion/src/datahub/api/entities/datacontract/freshness_assertion.py b/metadata-ingestion/src/datahub/api/entities/datacontract/freshness_assertion.py new file mode 100644 index 0000000000000..ee8fa1181e614 --- /dev/null +++ b/metadata-ingestion/src/datahub/api/entities/datacontract/freshness_assertion.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +from datetime import timedelta +from typing import List, Union + +import pydantic +from typing_extensions import Literal + +from datahub.configuration.common import ConfigModel +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.metadata.schema_classes import ( + AssertionInfoClass, + AssertionTypeClass, + CalendarIntervalClass, + FixedIntervalScheduleClass, + FreshnessAssertionInfoClass, + FreshnessAssertionScheduleClass, + FreshnessAssertionScheduleTypeClass, + FreshnessAssertionTypeClass, + FreshnessCronScheduleClass, +) + + +class CronFreshnessAssertion(ConfigModel): + type: Literal["cron"] + + cron: str = pydantic.Field( + description="The cron expression to use. See https://crontab.guru/ for help." + ) + timezone: str = pydantic.Field( + "UTC", + description="The timezone to use for the cron schedule. Defaults to UTC.", + ) + + +class FixedIntervalFreshnessAssertion(ConfigModel): + type: Literal["interval"] + + interval: timedelta + + +class FreshnessAssertion(ConfigModel): + __root__: Union[ + CronFreshnessAssertion, FixedIntervalFreshnessAssertion + ] = pydantic.Field(discriminator="type") + + @property + def id(self): + return self.__root__.type + + def generate_mcp( + self, assertion_urn: str, entity_urn: str + ) -> List[MetadataChangeProposalWrapper]: + freshness = self.__root__ + + if isinstance(freshness, CronFreshnessAssertion): + schedule = FreshnessAssertionScheduleClass( + type=FreshnessAssertionScheduleTypeClass.CRON, + cron=FreshnessCronScheduleClass( + cron=freshness.cron, + timezone=freshness.timezone, + ), + ) + elif isinstance(freshness, FixedIntervalFreshnessAssertion): + schedule = FreshnessAssertionScheduleClass( + type=FreshnessAssertionScheduleTypeClass.FIXED_INTERVAL, + fixedInterval=FixedIntervalScheduleClass( + unit=CalendarIntervalClass.SECOND, + multiple=int(freshness.interval.total_seconds()), + ), + ) + else: + raise ValueError(f"Unknown freshness type {freshness}") + + assertionInfo = AssertionInfoClass( + type=AssertionTypeClass.FRESHNESS, + freshnessAssertion=FreshnessAssertionInfoClass( + entity=entity_urn, + type=FreshnessAssertionTypeClass.DATASET_CHANGE, + schedule=schedule, + ), + ) + + return [ + MetadataChangeProposalWrapper(entityUrn=assertion_urn, aspect=assertionInfo) + ] diff --git a/metadata-ingestion/src/datahub/api/entities/datacontract/schema_assertion.py b/metadata-ingestion/src/datahub/api/entities/datacontract/schema_assertion.py new file mode 100644 index 0000000000000..b5b592e01f58f --- /dev/null +++ b/metadata-ingestion/src/datahub/api/entities/datacontract/schema_assertion.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +import json +from typing import List, Union 
+ +import pydantic +from typing_extensions import Literal + +from datahub.configuration.common import ConfigModel +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.extractor.json_schema_util import get_schema_metadata +from datahub.metadata.schema_classes import ( + AssertionInfoClass, + AssertionTypeClass, + SchemaAssertionInfoClass, + SchemaFieldClass, + SchemalessClass, + SchemaMetadataClass, +) + + +class JsonSchemaContract(ConfigModel): + type: Literal["json-schema"] + + json_schema: dict = pydantic.Field(alias="json-schema") + + _schema_metadata: SchemaMetadataClass + + def _init_private_attributes(self) -> None: + super()._init_private_attributes() + self._schema_metadata = get_schema_metadata( + platform="urn:li:dataPlatform:datahub", + name="", + json_schema=self.json_schema, + raw_schema_string=json.dumps(self.json_schema), + ) + + +class FieldListSchemaContract(ConfigModel, arbitrary_types_allowed=True): + type: Literal["field-list"] + + fields: List[SchemaFieldClass] + + _schema_metadata: SchemaMetadataClass + + def _init_private_attributes(self) -> None: + super()._init_private_attributes() + self._schema_metadata = SchemaMetadataClass( + schemaName="", + platform="urn:li:dataPlatform:datahub", + version=0, + hash="", + platformSchema=SchemalessClass(), + fields=self.fields, + ) + + +class SchemaAssertion(ConfigModel): + __root__: Union[JsonSchemaContract, FieldListSchemaContract] = pydantic.Field( + discriminator="type" + ) + + @property + def id(self): + return self.__root__.type + + def generate_mcp( + self, assertion_urn: str, entity_urn: str + ) -> List[MetadataChangeProposalWrapper]: + schema_metadata = self.__root__._schema_metadata + + assertionInfo = AssertionInfoClass( + type=AssertionTypeClass.DATA_SCHEMA, + schemaAssertion=SchemaAssertionInfoClass( + entity=entity_urn, schema=schema_metadata + ), + ) + + return [ + MetadataChangeProposalWrapper(entityUrn=assertion_urn, aspect=assertionInfo) + ] diff --git a/metadata-ingestion/src/datahub/cli/specific/datacontract_cli.py b/metadata-ingestion/src/datahub/cli/specific/datacontract_cli.py new file mode 100644 index 0000000000000..3745943c8c96a --- /dev/null +++ b/metadata-ingestion/src/datahub/cli/specific/datacontract_cli.py @@ -0,0 +1,80 @@ +import logging +from typing import Optional + +import click +from click_default_group import DefaultGroup + +from datahub.api.entities.datacontract.datacontract import DataContract +from datahub.ingestion.graph.client import get_default_graph +from datahub.telemetry import telemetry +from datahub.upgrade import upgrade + +logger = logging.getLogger(__name__) + + +@click.group(cls=DefaultGroup, default="upsert") +def datacontract() -> None: + """A group of commands to interact with the DataContract entity in DataHub.""" + pass + + +@datacontract.command() +@click.option("-f", "--file", required=True, type=click.Path(exists=True)) +@upgrade.check_upgrade +@telemetry.with_telemetry() +def upsert(file: str) -> None: + """Upsert (create or update) a Data Contract in DataHub.""" + + data_contract: DataContract = DataContract.from_yaml(file) + urn = data_contract.urn + + with get_default_graph() as graph: + if not graph.exists(data_contract.entity): + raise ValueError( + f"Cannot define a data contract for non-existent entity {data_contract.entity}" + ) + + try: + for mcp in data_contract.generate_mcp(): + graph.emit(mcp) + click.secho(f"Update succeeded for urn {urn}.", fg="green") + except Exception as e: + logger.exception(e) + click.secho( + 
f"Update failed for {urn}: {e}", + fg="red", + ) + + +@datacontract.command() +@click.option( + "--urn", required=False, type=str, help="The urn for the data contract to delete" +) +@click.option( + "-f", + "--file", + required=False, + type=click.Path(exists=True), + help="The file containing the data contract definition", +) +@click.option("--hard/--soft", required=False, is_flag=True, default=False) +@upgrade.check_upgrade +@telemetry.with_telemetry() +def delete(urn: Optional[str], file: Optional[str], hard: bool) -> None: + """Delete a Data Contract in DataHub. Defaults to a soft-delete. Use --hard to completely erase metadata.""" + + if not urn: + if not file: + raise click.UsageError( + "Must provide either an urn or a file to delete a data contract" + ) + + data_contract = DataContract.from_yaml(file) + urn = data_contract.urn + + with get_default_graph() as graph: + if not graph.exists(urn): + raise ValueError(f"Data Contract {urn} does not exist") + + graph.delete_entity(urn, hard=hard) + click.secho(f"Data Contract {urn} deleted") diff --git a/metadata-ingestion/src/datahub/cli/specific/file_loader.py b/metadata-ingestion/src/datahub/cli/specific/file_loader.py index 54f12e024d294..a9787343fdb91 100644 --- a/metadata-ingestion/src/datahub/cli/specific/file_loader.py +++ b/metadata-ingestion/src/datahub/cli/specific/file_loader.py @@ -1,9 +1,7 @@ -import io from pathlib import Path from typing import Union -from datahub.configuration.common import ConfigurationError -from datahub.configuration.yaml import YamlConfigurationMechanism +from datahub.configuration.config_loader import load_config_file def load_file(config_file: Path) -> Union[dict, list]: @@ -17,19 +15,11 @@ def load_file(config_file: Path) -> Union[dict, list]: evolve to becoming a standard function that all the specific. cli variants will use to load up the models from external files """ - if not isinstance(config_file, Path): - config_file = Path(config_file) - if not config_file.is_file(): - raise ConfigurationError(f"Cannot open config file {config_file}") - if config_file.suffix in {".yaml", ".yml"}: - config_mech: YamlConfigurationMechanism = YamlConfigurationMechanism() - else: - raise ConfigurationError( - f"Only .yaml and .yml are supported. 
Cannot process file type {config_file.suffix}" - ) - - raw_config_file = config_file.read_text() - config_fp = io.StringIO(raw_config_file) - raw_config = config_mech.load_config(config_fp) - return raw_config + res = load_config_file( + config_file, + squirrel_original_config=False, + resolve_env_vars=False, + allow_stdin=False, + ) + return res diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index 0928818c7005c..64c9ec1bb5704 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -1,11 +1,11 @@ """Convenience functions for creating MCEs""" +import hashlib import json import logging import os import re import time from enum import Enum -from hashlib import md5 from typing import ( TYPE_CHECKING, Any, @@ -21,7 +21,6 @@ import typing_inspect from datahub.configuration.source_common import DEFAULT_ENV as DEFAULT_ENV_CONFIGURATION -from datahub.emitter.serialization_helper import pre_json_transform from datahub.metadata.schema_classes import ( AssertionKeyClass, AuditStampClass, @@ -159,11 +158,24 @@ def container_urn_to_key(guid: str) -> Optional[ContainerKeyClass]: return None +class _DatahubKeyJSONEncoder(json.JSONEncoder): + # overload method default + def default(self, obj: Any) -> Any: + if hasattr(obj, "guid"): + return obj.guid() + # Call the default method for other types + return json.JSONEncoder.default(self, obj) + + def datahub_guid(obj: dict) -> str: - obj_str = json.dumps( - pre_json_transform(obj), separators=(",", ":"), sort_keys=True - ).encode("utf-8") - return md5(obj_str).hexdigest() + json_key = json.dumps( + obj, + separators=(",", ":"), + sort_keys=True, + cls=_DatahubKeyJSONEncoder, + ) + md5_hash = hashlib.md5(json_key.encode("utf-8")) + return str(md5_hash.hexdigest()) def make_assertion_urn(assertion_id: str) -> str: diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py index 7419577b367aa..06f689dfd317b 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py @@ -1,11 +1,10 @@ -import hashlib -import json -from typing import Any, Dict, Iterable, List, Optional, TypeVar +from typing import Dict, Iterable, List, Optional, TypeVar from pydantic.fields import Field from pydantic.main import BaseModel from datahub.emitter.mce_builder import ( + datahub_guid, make_container_urn, make_data_platform_urn, make_dataplatform_instance_urn, @@ -33,24 +32,13 @@ ) -def _stable_guid_from_dict(d: dict) -> str: - json_key = json.dumps( - d, - separators=(",", ":"), - sort_keys=True, - cls=DatahubKeyJSONEncoder, - ) - md5_hash = hashlib.md5(json_key.encode("utf-8")) - return str(md5_hash.hexdigest()) - - class DatahubKey(BaseModel): def guid_dict(self) -> Dict[str, str]: return self.dict(by_alias=True, exclude_none=True) def guid(self) -> str: bag = self.guid_dict() - return _stable_guid_from_dict(bag) + return datahub_guid(bag) class ContainerKey(DatahubKey): @@ -137,15 +125,6 @@ def as_urn(self) -> str: ) -class DatahubKeyJSONEncoder(json.JSONEncoder): - # overload method default - def default(self, obj: Any) -> Any: - if hasattr(obj, "guid"): - return obj.guid() - # Call the default method for other types - return json.JSONEncoder.default(self, obj) - - KeyType = TypeVar("KeyType", bound=ContainerKey) diff --git a/metadata-ingestion/src/datahub/entrypoints.py 
b/metadata-ingestion/src/datahub/entrypoints.py index 84615fd9a6148..5bfab3b841fa3 100644 --- a/metadata-ingestion/src/datahub/entrypoints.py +++ b/metadata-ingestion/src/datahub/entrypoints.py @@ -21,6 +21,7 @@ from datahub.cli.ingest_cli import ingest from datahub.cli.migrate import migrate from datahub.cli.put_cli import put +from datahub.cli.specific.datacontract_cli import datacontract from datahub.cli.specific.dataproduct_cli import dataproduct from datahub.cli.specific.group_cli import group from datahub.cli.specific.user_cli import user @@ -158,6 +159,7 @@ def init() -> None: datahub.add_command(user) datahub.add_command(group) datahub.add_command(dataproduct) +datahub.add_command(datacontract) try: from datahub.cli.lite_cli import lite diff --git a/metadata-ingestion/src/datahub/ingestion/api/closeable.py b/metadata-ingestion/src/datahub/ingestion/api/closeable.py index 523174b9978b3..80a5008ed6368 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/closeable.py +++ b/metadata-ingestion/src/datahub/ingestion/api/closeable.py @@ -1,7 +1,9 @@ from abc import abstractmethod from contextlib import AbstractContextManager from types import TracebackType -from typing import Optional, Type +from typing import Optional, Type, TypeVar + +_Self = TypeVar("_Self", bound="Closeable") class Closeable(AbstractContextManager): @@ -9,6 +11,10 @@ class Closeable(AbstractContextManager): def close(self) -> None: pass + def __enter__(self: _Self) -> _Self: + # This method is mainly required for type checking. + return self + def __exit__( self, exc_type: Optional[Type[BaseException]], diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 782d94f39e8a5..f6e78ab900443 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -701,18 +701,22 @@ def create_test_entity_mcps( assertion_urn = mce_builder.make_assertion_urn( mce_builder.datahub_guid( { - "platform": DBT_PLATFORM, - "name": node.dbt_name, - "instance": self.config.platform_instance, - **( - # Ideally we'd include the env unconditionally. However, we started out - # not including env in the guid, so we need to maintain backwards compatibility - # with existing PROD assertions. - {"env": self.config.env} - if self.config.env != mce_builder.DEFAULT_ENV - and self.config.include_env_in_assertion_guid - else {} - ), + k: v + for k, v in { + "platform": DBT_PLATFORM, + "name": node.dbt_name, + "instance": self.config.platform_instance, + **( + # Ideally we'd include the env unconditionally. However, we started out + # not including env in the guid, so we need to maintain backwards compatibility + # with existing PROD assertions. 
+ {"env": self.config.env} + if self.config.env != mce_builder.DEFAULT_ENV + and self.config.include_env_in_assertion_guid + else {} + ), + }.items() + if v is not None } ) ) diff --git a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py index f116550328819..8b393a8f6f1c6 100644 --- a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py +++ b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py @@ -35,6 +35,7 @@ from datahub.cli.cli_utils import get_boolean_env_variable from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter +from datahub.emitter.serialization_helper import pre_json_transform from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import ( get_platform_from_sqlalchemy_uri, ) @@ -253,13 +254,15 @@ def get_assertions_with_results( # possibly for each validation run assertionUrn = builder.make_assertion_urn( builder.datahub_guid( - { - "platform": GE_PLATFORM_NAME, - "nativeType": expectation_type, - "nativeParameters": kwargs, - "dataset": assertion_datasets[0], - "fields": assertion_fields, - } + pre_json_transform( + { + "platform": GE_PLATFORM_NAME, + "nativeType": expectation_type, + "nativeParameters": kwargs, + "dataset": assertion_datasets[0], + "fields": assertion_fields, + } + ) ) ) logger.debug( @@ -638,7 +641,7 @@ def get_dataset_partitions(self, batch_identifier, data_asset): ].batch_request.runtime_parameters["query"] partitionSpec = PartitionSpecClass( type=PartitionTypeClass.QUERY, - partition=f"Query_{builder.datahub_guid(query)}", + partition=f"Query_{builder.datahub_guid(pre_json_transform(query))}", ) batchSpec = BatchSpec( diff --git a/metadata-ingestion/tests/unit/test_mcp_builder.py b/metadata-ingestion/tests/unit/test_mcp_builder.py index 23f2bddc2084e..561b782ef9e46 100644 --- a/metadata-ingestion/tests/unit/test_mcp_builder.py +++ b/metadata-ingestion/tests/unit/test_mcp_builder.py @@ -1,5 +1,4 @@ import datahub.emitter.mcp_builder as builder -from datahub.emitter.mce_builder import datahub_guid def test_guid_generator(): @@ -80,7 +79,7 @@ def test_guid_generators(): key = builder.SchemaKey( database="test", schema="Test", platform="mysql", instance="TestInstance" ) - guid_datahub = datahub_guid(key.dict(by_alias=True)) + guid_datahub = key.guid() guid = key.guid() assert guid == guid_datahub diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionAction.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionAction.pdl new file mode 100644 index 0000000000000..df6620b66bfd8 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionAction.pdl @@ -0,0 +1,22 @@ +namespace com.linkedin.assertion + +/** + * The Actions about an Assertion. + * In the future, we'll likely extend this model to support additional + * parameters or options related to the assertion actions. + */ +record AssertionAction { + /** + * The type of the Action + */ + type: enum AssertionActionType { + /** + * Raise an incident. + */ + RAISE_INCIDENT + /** + * Resolve open incidents related to the assertion. 
+ */ + RESOLVE_INCIDENT + } +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionActions.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionActions.pdl new file mode 100644 index 0000000000000..61846c1ba9c12 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionActions.pdl @@ -0,0 +1,18 @@ +namespace com.linkedin.assertion + +/** + * The Actions about an Assertion + */ +@Aspect = { + "name": "assertionActions" +} +record AssertionActions { + /** + * Actions to be executed on successful assertion run. + */ + onSuccess: array[AssertionAction] = [] + /** + * Actions to be executed on failed assertion run. + */ + onFailure: array[AssertionAction] = [] +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionInfo.pdl index 77ee147a781e2..ae2a58028057b 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionInfo.pdl @@ -13,13 +13,58 @@ record AssertionInfo includes CustomProperties, ExternalReference { /** * Type of assertion. Assertion types can evolve to span Datasets, Flows (Pipelines), Models, Features etc. */ + @Searchable = { } type: enum AssertionType { - // A single-dataset assertion. When this is the value, the datasetAssertion field will be populated. + /** + * A single-dataset assertion. When this is the value, the datasetAssertion field will be populated. + */ DATASET + + /** + * A freshness assertion, or an assertion which indicates when a particular operation should occur + * to an asset. + */ + FRESHNESS + + /** + * A volume assertion, or an assertion which indicates how much data should be available for a + * particular asset. + */ + VOLUME + + /** + * A schema or structural assertion. + * + * Would have named this SCHEMA but the codegen for PDL does not allow this (reserved word). + */ + DATA_SCHEMA } /** - * Dataset Assertion information when type is DATASET + * A Dataset Assertion definition. This field is populated when the type is DATASET. */ datasetAssertion: optional DatasetAssertionInfo + + /** + * A Freshness Assertion definition. This field is populated when the type is FRESHNESS. + */ + freshnessAssertion: optional FreshnessAssertionInfo + + /** + * A Volume Assertion definition. This field is populated when the type is VOLUME. + */ + volumeAssertion: optional VolumeAssertionInfo + + /** + * A schema Assertion definition. This field is populated when the type is DATA_SCHEMA. + */ + schemaAssertion: optional SchemaAssertionInfo + + /** + * The source or origin of the Assertion definition. + * + * If the source type of the Assertion is EXTERNAL, it is expected to have a corresponding dataPlatformInstance aspect detailing + * the platform where it was ingested from. + */ + source: optional AssertionSource } \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionResult.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionResult.pdl index decbfc08263de..ded84e1969153 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionResult.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionResult.pdl @@ -5,10 +5,15 @@ namespace com.linkedin.assertion */ record AssertionResult { /** - * The final result, e.g. either SUCCESS or FAILURE.
+ * The final result, e.g. either SUCCESS, FAILURE, or ERROR. */ @TimeseriesField = {} + @Searchable = {} type: enum AssertionResultType { + /** + * The Assertion has not yet been fully evaluated + */ + INIT /** * The Assertion Succeeded */ @@ -17,6 +22,10 @@ record AssertionResult { * The Assertion Failed */ FAILURE + /** + * The Assertion encountered an Error + */ + ERROR } /** @@ -45,8 +54,13 @@ record AssertionResult { nativeResults: optional map[string, string] /** - * URL where full results are available + * External URL where full results are available. Only present when assertion source is not native. */ externalUrl: optional string + /** + * The error object if AssertionResultType is an Error + */ + error: optional AssertionResultError + } \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionResultError.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionResultError.pdl new file mode 100644 index 0000000000000..e768fe8521942 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionResultError.pdl @@ -0,0 +1,45 @@ +namespace com.linkedin.assertion + +/** + * An error encountered when evaluating an AssertionResult + */ +record AssertionResultError { + /** + * The type of error encountered + */ + type: enum AssertionResultErrorType { + /** + * Source is unreachable + */ + SOURCE_CONNECTION_ERROR + /** + * Source query failed to execute + */ + SOURCE_QUERY_FAILED + /** + * Insufficient data to evaluate the assertion + */ + INSUFFICIENT_DATA + /** + * Invalid parameters were detected + */ + INVALID_PARAMETERS + /** + * Event type not supported by the specified source + */ + INVALID_SOURCE_TYPE + /** + * Unsupported platform + */ + UNSUPPORTED_PLATFORM + /** + * Unknown error + */ + UNKNOWN_ERROR + } + + /** + * Additional metadata depending on the type of error + */ + properties: optional map[string, string] +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionRunEvent.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionRunEvent.pdl index 9e75f96fafd06..14f1204232740 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionRunEvent.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionRunEvent.pdl @@ -1,6 +1,7 @@ namespace com.linkedin.assertion -import com.linkedin.timeseries.TimeseriesAspectBase +import com.linkedin.timeseries.PartitionSpec +import com.linkedin.timeseries.TimeWindowSize import com.linkedin.common.ExternalReference import com.linkedin.common.Urn @@ -12,36 +13,31 @@ import com.linkedin.common.Urn "name": "assertionRunEvent", "type": "timeseries", } -record AssertionRunEvent includes TimeseriesAspectBase { +record AssertionRunEvent { + + /** + * The event timestamp field as epoch at UTC in milli seconds. 
+ */ + @Searchable = { + "fieldName": "lastCompletedTime", + "fieldType": "DATETIME" + } + timestampMillis: long /** * Native (platform-specific) identifier for this run */ - //Multiple assertions could occur in same evaluator run runId: string - /* - * Urn of assertion which is evaluated - */ - @TimeseriesField = {} - assertionUrn: Urn - /* * Urn of entity on which the assertion is applicable */ - //example - dataset urn, if dataset is being asserted @TimeseriesField = {} asserteeUrn: Urn - - /** - * Specification of the batch which this run is evaluating - */ - batchSpec: optional BatchSpec /** * The status of the assertion run as per this timeseries event. */ - // Currently just supports COMPLETE, but should evolve to support other statuses like STARTED, RUNNING, etc. @TimeseriesField = {} status: enum AssertionRunStatus { /** @@ -59,4 +55,33 @@ record AssertionRunEvent includes TimeseriesAspectBase { * Runtime parameters of evaluation */ runtimeContext: optional map[string, string] + + /** + * Specification of the batch which this run is evaluating + */ + batchSpec: optional BatchSpec + + /* + * Urn of assertion which is evaluated + */ + @TimeseriesField = {} + assertionUrn: Urn + + /** + * Granularity of the event if applicable + */ + eventGranularity: optional TimeWindowSize + + /** + * The optional partition specification. + */ + partitionSpec: optional PartitionSpec = { + "type":"FULL_TABLE", + "partition":"FULL_TABLE_SNAPSHOT" + } + + /** + * The optional messageId, if provided serves as a custom user-defined unique identifier for an aspect value. + */ + messageId: optional string } \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionSource.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionSource.pdl new file mode 100644 index 0000000000000..d8892c0c71c6f --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionSource.pdl @@ -0,0 +1,27 @@ +namespace com.linkedin.assertion + +/** + * The source of an assertion + */ +record AssertionSource { + /** + * The type of the Assertion Source + */ + @Searchable = { + "fieldName": "sourceType" + } + type: enum AssertionSourceType { + /** + * The assertion was defined natively on DataHub by a user. + */ + NATIVE + /** + * The assertion was defined and managed externally of DataHub. + */ + EXTERNAL + /** + * The assertion was inferred, e.g. from offline AI / ML models. + */ + INFERRED + } +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionStdAggregation.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionStdAggregation.pdl index b79b96f9379b0..968944165a1c8 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionStdAggregation.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionStdAggregation.pdl @@ -4,6 +4,7 @@ namespace com.linkedin.assertion * The function that is applied to the aggregation input (schema, rows, column values) before evaluating an operator. */ enum AssertionStdAggregation { + /** * Assertion is applied on number of rows. */ @@ -20,7 +21,7 @@ enum AssertionStdAggregation { COLUMN_COUNT /** - * Assertion is applied on individual column value. + * Assertion is applied on individual column value. 
(No aggregation) */ IDENTITY @@ -42,6 +43,13 @@ enum AssertionStdAggregation { /** * Assertion is applied on proportion of distinct values in column */ + UNIQUE_PROPORTION + + /** + * Assertion is applied on proportion of distinct values in column + * + * Deprecated! Use UNIQUE_PROPORTION instead. + */ UNIQUE_PROPOTION /** diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionValueChangeType.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionValueChangeType.pdl new file mode 100644 index 0000000000000..5a1ff4fa73ffb --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AssertionValueChangeType.pdl @@ -0,0 +1,16 @@ +namespace com.linkedin.assertion + +/** +* An enum to represent a type of change in an assertion value, metric, or measurement. +*/ +enum AssertionValueChangeType { + /** + * A change that is defined in absolute terms. + */ + ABSOLUTE + /** + * A change that is defined in relative terms using percentage change + * from the original value. + */ + PERCENTAGE +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/AuditLogSpec.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/AuditLogSpec.pdl new file mode 100644 index 0000000000000..4d5bf261cbf89 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/AuditLogSpec.pdl @@ -0,0 +1,18 @@ +namespace com.linkedin.assertion + +import com.linkedin.schema.SchemaFieldDataType + +/** +* Information about the Audit Log operation to use in evaluating an assertion. +**/ +record AuditLogSpec { + /** + * The list of operation types that should be monitored. If not provided, a default set will be used. + */ + operationTypes: optional array [string] + + /** + * Optional: The user name associated with the operation. + */ + userName: optional string +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/DatasetAssertionInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/DatasetAssertionInfo.pdl index c411c7ff8a572..2a8bf28f1ff11 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/assertion/DatasetAssertionInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/DatasetAssertionInfo.pdl @@ -18,9 +18,10 @@ record DatasetAssertionInfo { /** * Scope of the Assertion. What part of the dataset does this assertion apply to? **/ + @Searchable = {} scope: enum DatasetAssertionScope { /** - * This assertion applies to dataset columns + * This assertion applies to dataset column(s) */ DATASET_COLUMN @@ -29,6 +30,11 @@ record DatasetAssertionInfo { */ DATASET_ROWS + /** + * This assertion applies to the storage size of the dataset + */ + DATASET_STORAGE_SIZE + /** * This assertion applies to the schema of the dataset */ @@ -41,7 +47,9 @@ record DatasetAssertionInfo { } /** - * One or more dataset schema fields that are targeted by this assertion + * One or more dataset schema fields that are targeted by this assertion. + * + * This field is expected to be provided if the assertion scope is DATASET_COLUMN. */ @Relationship = { "/*": { @@ -49,11 +57,18 @@ record DatasetAssertionInfo { "entityTypes": [ "schemaField" ] } } + @Searchable = { + "/*": { + "fieldType": "URN" + } + } fields: optional array[Urn] /** * Standardized assertion operator + * This field is left blank if there is no selected aggregation or metric for a particular column. 
*/ + @Searchable = {} aggregation: optional AssertionStdAggregation /** diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/FixedIntervalSchedule.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/FixedIntervalSchedule.pdl new file mode 100644 index 0000000000000..c08c33ffb92d3 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/FixedIntervalSchedule.pdl @@ -0,0 +1,10 @@ +namespace com.linkedin.assertion + +import com.linkedin.common.Urn +import com.linkedin.timeseries.TimeWindowSize + +/** +* Attributes defining a relative fixed interval SLA schedule. +*/ +record FixedIntervalSchedule includes TimeWindowSize { +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessAssertionInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessAssertionInfo.pdl new file mode 100644 index 0000000000000..4445a11ff40a7 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessAssertionInfo.pdl @@ -0,0 +1,53 @@ +namespace com.linkedin.assertion + +import com.linkedin.common.Urn +import com.linkedin.dataset.DatasetFilter + +/** +* Attributes defining a Freshness Assertion. +**/ +record FreshnessAssertionInfo { + /** + * The type of the freshness assertion being monitored. + */ + @Searchable = {} + type: enum FreshnessAssertionType { + /** + * A Freshness assertion based on Operations performed on a particular Dataset (insert, update, delete, etc) and sourced from an audit log, as + * opposed to based on the highest watermark in a timestamp column (e.g. a query). Only valid when entity is of type "dataset". + */ + DATASET_CHANGE + /** + * A Freshness assertion based on a successful execution of a Data Job. + */ + DATA_JOB_RUN + } + + /** + * The entity targeted by this Freshness check. + */ + @Searchable = { + "fieldType": "URN" + } + @Relationship = { + "name": "Asserts", + "entityTypes": [ "dataset", "dataJob" ] + } + entity: Urn + + /** + * Produce FAILURE Assertion Result if the asset is not updated on the cadence and within the time range described by the schedule. + */ + @Searchable = { + "/type": { + "fieldName": "scheduleType" + } + } + schedule: FreshnessAssertionSchedule + + /** + * A definition of the specific filters that should be applied when performing monitoring. + * If not provided, there is no filter, and the full table is under consideration. + */ + filter: optional DatasetFilter +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessAssertionSchedule.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessAssertionSchedule.pdl new file mode 100644 index 0000000000000..a87342ad4f5ed --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessAssertionSchedule.pdl @@ -0,0 +1,66 @@ +namespace com.linkedin.assertion + +import com.linkedin.common.Urn + +/** +* Attributes defining a single Freshness schedule. +*/ +record FreshnessAssertionSchedule { + + /** + * The type of a Freshness Assertion Schedule. + * + * Once we support data-time-relative schedules (e.g. schedules relative to time partitions), + * we will add those schedule types here. + */ + type: enum FreshnessAssertionScheduleType { + /** + * A highly configurable recurring schedule which describes the times of events described + * by a CRON schedule, with the evaluation schedule assumed to match the cron schedule.
* + * In a CRON schedule type, we compute the look-back window to be the time between the last scheduled event + * and the current event (evaluation time). This means that the evaluation schedule must match exactly + * the schedule defined inside the cron schedule. + * + * For example, a CRON schedule defined as "0 8 * * *" would represent a schedule of "every day by 8am". Assuming + * that the assertion evaluation schedule is defined to match this, the freshness assertion would be evaluated in the following way: + * + * 1. Compute the "last scheduled occurrence" of the event using the CRON schedule. For example, yesterday at 8am. + * 2. Compute the bounds of a time window between the "last scheduled occurrence" (yesterday at 8am) and the "current occurrence" (today at 8am) + * 3. Verify that the target event has occurred within the CRON-interval window. + * 4. If the target event has occurred within the time window, then the assertion passes. + * 5. If the target event has not occurred within the time window, then the assertion fails. + * + */ + CRON + /** + * A fixed interval which is used to compute a look-back window for use when evaluating the assertion relative + * to the Evaluation Time of the Assertion. + * + * To compute the valid look-back window, we subtract the fixed interval from the evaluation time. Then, we verify + * that the target event has occurred within that window. + * + * For example, a fixed interval of "24h" would represent a schedule of "in the last 24 hours". + * The 24 hour interval is relative to the evaluation time of the assertion. For example, if we schedule the assertion + * to be evaluated each hour, we'd compute the result as follows: + * + * 1. Subtract the fixed interval from the current time (Evaluation time) to compute the bounds of a fixed look-back window. + * 2. Verify that the target event has occurred within the fixed-interval look-back window. + * 3. If the target event has occurred within the time window, then the assertion passes. + * 4. If the target event has not occurred within the time window, then the assertion fails. + * + */ + FIXED_INTERVAL + } + + /** + * A cron schedule. This field is required when type is CRON. + */ + cron: optional FreshnessCronSchedule + + /** + * A fixed interval schedule. This field is required when type is FIXED_INTERVAL. + */ + fixedInterval: optional FixedIntervalSchedule + +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessCronSchedule.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessCronSchedule.pdl new file mode 100644 index 0000000000000..d48900690c51d --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessCronSchedule.pdl @@ -0,0 +1,25 @@ +namespace com.linkedin.assertion + +/** +* Attributes defining a CRON-formatted schedule used for defining a freshness assertion. +*/ +record FreshnessCronSchedule { + /** + * A cron-formatted execution interval, as a cron string, e.g. 1 * * * * + */ + cron: string + + /** + * Timezone in which the cron interval applies, e.g. America/Los_Angeles + */ + timezone: string + + /** + * An optional offset in milliseconds to SUBTRACT from the timestamp generated by the cron schedule + * to generate the lower bounds of the "freshness window", or the window of time in which an event must have occurred in order for the Freshness check + * to be considered passing. + * + * If left empty, the start of the SLA window will be the _end_ of the previously evaluated Freshness window.
+ */ + windowStartOffsetMs: optional long +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessFieldKind.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessFieldKind.pdl new file mode 100644 index 0000000000000..7b25589e500da --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessFieldKind.pdl @@ -0,0 +1,17 @@ +namespace com.linkedin.assertion + +enum FreshnessFieldKind { + /** + * Determine that a change has occurred by inspecting a last modified field which + * represents the last time at which a row was changed. + */ + LAST_MODIFIED, + /** + * Determine that a change has occurred by inspecting a field which should be tracked as the + * "high watermark" for the table. This should be an ascending number or date field. + * + * If rows with this column have not been added since the previous check + * then the Freshness Assertion will fail. + */ + HIGH_WATERMARK +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessFieldSpec.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessFieldSpec.pdl new file mode 100644 index 0000000000000..04acd1c71352d --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/FreshnessFieldSpec.pdl @@ -0,0 +1,14 @@ +namespace com.linkedin.assertion + +import com.linkedin.schema.SchemaFieldSpec + + +/** +* Lightweight spec used for referencing a particular schema field. +**/ +record FreshnessFieldSpec includes SchemaFieldSpec { + /** + * The type of the field being used to verify the Freshness Assertion. + */ + kind: optional FreshnessFieldKind +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/IncrementingSegmentFieldTransformer.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/IncrementingSegmentFieldTransformer.pdl new file mode 100644 index 0000000000000..d1d3e7b23b666 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/IncrementingSegmentFieldTransformer.pdl @@ -0,0 +1,60 @@ +namespace com.linkedin.assertion + +/** +* The definition of the transformer function that should be applied to a given field / column value in a dataset +* in order to determine the segment or bucket that it belongs to, which in turn is used to evaluate +* volume assertions. +*/ +record IncrementingSegmentFieldTransformer { + /** + * A 'standard' transformer type. Note that not all source systems will support all operators. + */ + type: enum IncrementingSegmentFieldTransformerType { + /** + * Rounds a timestamp (in milliseconds) down to the nearest minute. + */ + TIMESTAMP_MS_TO_MINUTE + + /** + * Rounds a timestamp (in milliseconds) down to the nearest hour. + */ + TIMESTAMP_MS_TO_HOUR + + /** + * Rounds a timestamp (in milliseconds) down to the start of the day. + */ + TIMESTAMP_MS_TO_DATE + + /** + * Rounds a timestamp (in milliseconds) down to the start of the month + */ + TIMESTAMP_MS_TO_MONTH + + /** + * Rounds a timestamp (in milliseconds) down to the start of the year + */ + TIMESTAMP_MS_TO_YEAR + + /** + * Rounds a numeric value down to the nearest integer. + */ + FLOOR + + /** + * Rounds a numeric value up to the nearest integer. + */ + CEILING + + /** + * A backdoor to provide a native operator type specific to a given source system like + * Snowflake, Redshift, BQ, etc. + */ + NATIVE + } + + /** + * The 'native' transformer type, useful as a back door if a custom operator is required.
+ * This field is required if the type is NATIVE. + */ + nativeType: optional string +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/IncrementingSegmentRowCountChange.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/IncrementingSegmentRowCountChange.pdl new file mode 100644 index 0000000000000..7c4c73f2ea887 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/IncrementingSegmentRowCountChange.pdl @@ -0,0 +1,33 @@ +namespace com.linkedin.assertion + + +/** +* Attributes defining an INCREMENTING_SEGMENT_ROW_COUNT_CHANGE volume assertion. +*/ +record IncrementingSegmentRowCountChange { + /** + * A specification of how the 'segment' can be derived using a column and an optional transformer function. + */ + segment: IncrementingSegmentSpec + + /** + * The type of the value used to evaluate the assertion: a fixed absolute value or a relative percentage. + */ + type: AssertionValueChangeType + + /** + * The operator you'd like to apply to the row count value + * + * Note that only numeric operators are valid inputs: + * GREATER_THAN, GREATER_THAN_OR_EQUAL_TO, EQUAL_TO, LESS_THAN, LESS_THAN_OR_EQUAL_TO, + * BETWEEN. + */ + operator: AssertionStdOperator + + /** + * The parameters you'd like to provide as input to the operator. + * + * Note that only numeric parameter types are valid inputs: NUMBER. + */ + parameters: AssertionStdParameters +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/IncrementingSegmentRowCountTotal.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/IncrementingSegmentRowCountTotal.pdl new file mode 100644 index 0000000000000..6b035107aae09 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/IncrementingSegmentRowCountTotal.pdl @@ -0,0 +1,27 @@ +namespace com.linkedin.assertion + +/** +* Attributes defining an INCREMENTING_SEGMENT_ROW_COUNT_TOTAL volume assertion. +*/ +record IncrementingSegmentRowCountTotal { + /** + * A specification of how the 'segment' can be derived using a column and an optional transformer function. + */ + segment: IncrementingSegmentSpec + + /** + * The operator you'd like to apply. + * + * Note that only numeric operators are valid inputs: + * GREATER_THAN, GREATER_THAN_OR_EQUAL_TO, EQUAL_TO, LESS_THAN, LESS_THAN_OR_EQUAL_TO, + * BETWEEN. + */ + operator: AssertionStdOperator + + /** + * The parameters you'd like to provide as input to the operator. + * + * Note that only numeric parameter types are valid inputs: NUMBER. + */ + parameters: AssertionStdParameters +} \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/IncrementingSegmentSpec.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/IncrementingSegmentSpec.pdl new file mode 100644 index 0000000000000..eddd0c3da3df7 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/IncrementingSegmentSpec.pdl @@ -0,0 +1,33 @@ +namespace com.linkedin.assertion + +import com.linkedin.schema.SchemaFieldSpec + +/** +* Core attributes required to identify an incrementing segment in a table. This type is mainly useful +* for tables that constantly increase with new rows being added on a particular cadence (e.g. 
+* fact or event tables)
+*
+* An incrementing segment represents a logical chunk of data which is INSERTED
+* into a dataset on a regular interval, along with the presence of a constantly-incrementing column
+* value such as an event time, date partition, or last modified column.
+*
+* An incrementing segment is principally identified by 2 key attributes combined:
+*
+* 1. A field or column that represents the incrementing value. New rows that are inserted will be identified using this column.
+*    Note that the value of this column may not by itself represent the "bucket" or the "segment" in which the row falls.
+*
+* 2. [Optional] A transformer function that may be applied to the selected column value in order
+*    to obtain the final "segment identifier" or "bucket identifier". Rows that have the same value after applying the transformation
+*    will be grouped into the same segment, using which the final value (e.g. row count) will be determined.
+*/
+record IncrementingSegmentSpec {
+  /**
+   * The field to use to generate segments. It must be constantly incrementing as new rows are inserted.
+   */
+  field: SchemaFieldSpec
+
+  /**
+   * Optional transformer function to apply to the field in order to obtain the final segment or bucket identifier.
+   * If not provided, then no transformer will be applied to the field (identity function).
+   */
+  transformer: optional IncrementingSegmentFieldTransformer
+}
\ No newline at end of file
diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/RowCountChange.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/RowCountChange.pdl
new file mode 100644
index 0000000000000..85a915066f584
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/RowCountChange.pdl
@@ -0,0 +1,27 @@
+namespace com.linkedin.assertion
+
+/**
+* Attributes defining a ROW_COUNT_CHANGE volume assertion.
+*/
+record RowCountChange {
+  /**
+   * The type of the value used to evaluate the assertion: a fixed absolute value or a relative percentage.
+   */
+  type: AssertionValueChangeType
+
+  /**
+   * The operator you'd like to apply.
+   *
+   * Note that only numeric operators are valid inputs:
+   * GREATER_THAN, GREATER_THAN_OR_EQUAL_TO, EQUAL_TO, LESS_THAN, LESS_THAN_OR_EQUAL_TO,
+   * BETWEEN.
+   */
+  operator: AssertionStdOperator
+
+  /**
+   * The parameters you'd like to provide as input to the operator.
+   *
+   * Note that only numeric parameter types are valid inputs: NUMBER.
+   */
+  parameters: AssertionStdParameters
+}
\ No newline at end of file
diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/RowCountTotal.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/RowCountTotal.pdl
new file mode 100644
index 0000000000000..f691f15f62e04
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/RowCountTotal.pdl
@@ -0,0 +1,22 @@
+namespace com.linkedin.assertion
+
+/**
+* Attributes defining a ROW_COUNT_TOTAL volume assertion.
+*/
+record RowCountTotal {
+  /**
+   * The operator you'd like to apply.
+   *
+   * Note that only numeric operators are valid inputs:
+   * GREATER_THAN, GREATER_THAN_OR_EQUAL_TO, EQUAL_TO, LESS_THAN, LESS_THAN_OR_EQUAL_TO,
+   * BETWEEN.
+   */
+  operator: AssertionStdOperator
+
+  /**
+   * The parameters you'd like to provide as input to the operator.
+   *
+   * Note that only numeric parameter types are valid inputs: NUMBER.
+   */
+  parameters: AssertionStdParameters
+}
\ No newline at end of file
diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/SchemaAssertionInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/SchemaAssertionInfo.pdl
new file mode 100644
index 0000000000000..fd246e0c7cfc4
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/SchemaAssertionInfo.pdl
@@ -0,0 +1,29 @@
+namespace com.linkedin.assertion
+
+import com.linkedin.common.Urn
+import com.linkedin.schema.SchemaMetadata
+
+/**
+* Attributes that are applicable to schema assertions
+**/
+record SchemaAssertionInfo {
+  /**
+   * The entity targeted by the assertion
+   */
+  @Searchable = {
+    "fieldType": "URN"
+  }
+  @Relationship = {
+    "name": "Asserts",
+    "entityTypes": [ "dataset", "dataJob" ]
+  }
+  entity: Urn
+
+  /**
+   * A definition of the expected structure for the asset
+   *
+   * Note that many of the fields of this model, especially those related to metadata (tags, terms)
+   * will go unused in this context.
+   */
+  schema: SchemaMetadata
+}
\ No newline at end of file
diff --git a/metadata-models/src/main/pegasus/com/linkedin/assertion/VolumeAssertionInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/assertion/VolumeAssertionInfo.pdl
new file mode 100644
index 0000000000000..327b76f95762e
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/assertion/VolumeAssertionInfo.pdl
@@ -0,0 +1,82 @@
+namespace com.linkedin.assertion
+
+import com.linkedin.common.Urn
+import com.linkedin.dataset.DatasetFilter
+
+/**
+* Attributes defining a dataset Volume Assertion
+*/
+record VolumeAssertionInfo {
+  /**
+   * The type of the volume assertion being monitored.
+   */
+  @Searchable = {}
+  type: enum VolumeAssertionType {
+    /**
+     * A volume assertion that is evaluated against the total row count of a dataset.
+     */
+    ROW_COUNT_TOTAL
+    /**
+     * A volume assertion that is evaluated against an incremental row count of a dataset,
+     * or a row count change.
+     */
+    ROW_COUNT_CHANGE
+    /**
+     * A volume assertion that checks the latest "segment" in a table based on an incrementing
+     * column to check whether its row count falls into a particular range.
+     *
+     * This can be used to monitor the row count of an incrementing date-partition column segment.
+     */
+    INCREMENTING_SEGMENT_ROW_COUNT_TOTAL
+    /**
+     * A volume assertion that compares the row counts in neighboring "segments" or "partitions"
+     * of an incrementing column.
+     * This can be used to track changes between subsequent date partitions
+     * in a table, for example.
+     */
+    INCREMENTING_SEGMENT_ROW_COUNT_CHANGE
+  }
+
+  /**
+   * The entity targeted by this Volume check.
+   */
+  @Searchable = {
+    "fieldType": "URN"
+  }
+  @Relationship = {
+    "name": "Asserts",
+    "entityTypes": [ "dataset" ]
+  }
+  entity: Urn
+
+  /**
+   * Produce FAILURE Assertion Result if the row count of the asset does not meet specific requirements.
+   * Required if type is 'ROW_COUNT_TOTAL'
+   */
+  rowCountTotal: optional RowCountTotal
+
+  /**
+   * Produce FAILURE Assertion Result if the delta row count of the asset does not meet specific requirements
+   * within a given period of time.
+   * Required if type is 'ROW_COUNT_CHANGE'
+   */
+  rowCountChange: optional RowCountChange
+
+  /**
+   * Produce FAILURE Assertion Result if the asset's latest incrementing segment row count total
+   * does not meet specific requirements.
+   * Required if type is 'INCREMENTING_SEGMENT_ROW_COUNT_TOTAL'
+   */
+  incrementingSegmentRowCountTotal: optional IncrementingSegmentRowCountTotal
+
+  /**
+   * Produce FAILURE Assertion Result if the asset's incrementing segment row count delta
+   * does not meet specific requirements. Required if type is 'INCREMENTING_SEGMENT_ROW_COUNT_CHANGE'
+   */
+  incrementingSegmentRowCountChange: optional IncrementingSegmentRowCountChange
+
+  /**
+   * A definition of the specific filters that should be applied when performing monitoring.
+   * If not provided, there is no filter, and the full table is under consideration.
+   */
+  filter: optional DatasetFilter
+}
\ No newline at end of file
diff --git a/metadata-models/src/main/pegasus/com/linkedin/datacontract/DataContractProperties.pdl b/metadata-models/src/main/pegasus/com/linkedin/datacontract/DataContractProperties.pdl
new file mode 100644
index 0000000000000..a623f585df30c
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/datacontract/DataContractProperties.pdl
@@ -0,0 +1,59 @@
+namespace com.linkedin.datacontract
+
+import com.linkedin.common.Urn
+
+/**
+ * Information about a data contract
+ */
+@Aspect = {
+  "name": "dataContractProperties"
+}
+record DataContractProperties {
+  /**
+   * The entity that this contract is associated with. Currently, we only support Dataset contracts, but
+   * in the future we may also support Data Product level contracts.
+   */
+  @Relationship = {
+    "name": "ContractFor",
+    "entityTypes": [ "dataset" ]
+  }
+  entity: Urn
+
+  /**
+   * An optional set of schema contracts. If this is a dataset contract, there will only be one.
+   */
+  @Relationship = {
+    "/*/assertion": {
+      "name": "IncludesSchemaAssertion",
+      "entityTypes": [ "assertion" ]
+    }
+  }
+  schema: optional array[SchemaContract]
+
+  /**
+   * An optional set of FRESHNESS contracts. If this is a dataset contract, there will only be one.
+   */
+  @Relationship = {
+    "/*/assertion": {
+      "name": "IncludesFreshnessAssertion",
+      "entityTypes": [ "assertion" ]
+    }
+  }
+  freshness: optional array[FreshnessContract]
+
+  /**
+   * An optional set of Data Quality contracts, e.g. table and column level contract constraints.
+   */
+  @Relationship = {
+    "/*/assertion": {
+      "name": "IncludesDataQualityAssertion",
+      "entityTypes": [ "assertion" ]
+    }
+  }
+  dataQuality: optional array[DataQualityContract]
+
+  /**
+   * YAML-formatted contract definition
+   */
+  rawContract: optional string
+}
diff --git a/metadata-models/src/main/pegasus/com/linkedin/datacontract/DataContractStatus.pdl b/metadata-models/src/main/pegasus/com/linkedin/datacontract/DataContractStatus.pdl
new file mode 100644
index 0000000000000..d61fb191ae53d
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/datacontract/DataContractStatus.pdl
@@ -0,0 +1,27 @@
+namespace com.linkedin.datacontract
+
+import com.linkedin.common.Urn
+import com.linkedin.common.CustomProperties
+
+/**
+ * Information about the status of a data contract
+ */
+@Aspect = {
+  "name": "dataContractStatus"
+}
+record DataContractStatus includes CustomProperties {
+  /**
+   * The latest state of the data contract
+   */
+  @Searchable = {}
+  state: enum DataContractState {
+    /**
+     * The data contract is active.
+     */
+    ACTIVE
+    /**
+     * The data contract is pending implementation.
+     */
+    PENDING
+  }
+}
diff --git a/metadata-models/src/main/pegasus/com/linkedin/datacontract/DataQualityContract.pdl b/metadata-models/src/main/pegasus/com/linkedin/datacontract/DataQualityContract.pdl
new file mode 100644
index 0000000000000..273d2c2a56f95
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/datacontract/DataQualityContract.pdl
@@ -0,0 +1,16 @@
+namespace com.linkedin.datacontract
+
+import com.linkedin.common.Urn
+
+
+/**
+ * A data quality contract pertaining to a physical data asset.
+ * Data Quality contracts are used to make assertions about data quality metrics for a physical data asset
+ */
+record DataQualityContract {
+  /**
+   * The assertion representing the Data Quality contract.
+   * E.g. a table or column-level assertion.
+   */
+  assertion: Urn
+}
\ No newline at end of file
diff --git a/metadata-models/src/main/pegasus/com/linkedin/datacontract/FreshnessContract.pdl b/metadata-models/src/main/pegasus/com/linkedin/datacontract/FreshnessContract.pdl
new file mode 100644
index 0000000000000..8cfa66846d505
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/datacontract/FreshnessContract.pdl
@@ -0,0 +1,13 @@
+namespace com.linkedin.datacontract
+
+import com.linkedin.common.Urn
+
+/**
+ * A contract pertaining to the operational SLAs of a physical data asset
+ */
+record FreshnessContract {
+  /**
+   * The assertion representing the SLA contract.
+   */
+  assertion: Urn
+}
diff --git a/metadata-models/src/main/pegasus/com/linkedin/datacontract/SchemaContract.pdl b/metadata-models/src/main/pegasus/com/linkedin/datacontract/SchemaContract.pdl
new file mode 100644
index 0000000000000..6c11e0da5b128
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/datacontract/SchemaContract.pdl
@@ -0,0 +1,13 @@
+namespace com.linkedin.datacontract
+
+import com.linkedin.common.Urn
+
+/**
+ * Expectations for a logical schema
+ */
+record SchemaContract {
+  /**
+   * The assertion representing the schema contract.
+   */
+  assertion: Urn
+}
diff --git a/metadata-models/src/main/pegasus/com/linkedin/dataset/DatasetFilter.pdl b/metadata-models/src/main/pegasus/com/linkedin/dataset/DatasetFilter.pdl
new file mode 100644
index 0000000000000..6823398f79f3d
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/dataset/DatasetFilter.pdl
@@ -0,0 +1,30 @@
+namespace com.linkedin.dataset
+
+/**
+ * A definition of filters that should be used when
+ * querying an external Dataset or Table.
+ *
+ * Note that this model should NOT be used for working with
+ * search / filter on DataHub Platform itself.
+ */
+record DatasetFilter {
+  /**
+   * How the partition will be represented in this model.
+   *
+   * In the future, we'll likely add support for more structured
+   * predicates.
+   */
+  type: enum DatasetFilterType {
+    /**
+     * The partition is represented as an opaque, raw SQL
+     * clause.
+     */
+    SQL
+  }
+
+  /**
+   * The raw where clause string which will be used for monitoring.
+   * Required if the type is SQL.
+   */
+  sql: optional string
+}
\ No newline at end of file
diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/key/DataContractKey.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/key/DataContractKey.pdl
new file mode 100644
index 0000000000000..f1d4a709cd6bf
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/key/DataContractKey.pdl
@@ -0,0 +1,14 @@
+namespace com.linkedin.metadata.key
+
+/**
+ * Key for a Data Contract
+ */
+@Aspect = {
+  "name": "dataContractKey"
+}
+record DataContractKey {
+  /**
+   * Unique id for the contract
+   */
+  id: string
+}
\ No newline at end of file
diff --git a/metadata-models/src/main/pegasus/com/linkedin/schema/SchemaFieldSpec.pdl b/metadata-models/src/main/pegasus/com/linkedin/schema/SchemaFieldSpec.pdl
new file mode 100644
index 0000000000000..e875ff7a84403
--- /dev/null
+++ b/metadata-models/src/main/pegasus/com/linkedin/schema/SchemaFieldSpec.pdl
@@ -0,0 +1,21 @@
+namespace com.linkedin.schema
+
+/**
+* Lightweight spec used for referencing a particular schema field.
+**/
+record SchemaFieldSpec {
+  /**
+   * The field path
+   */
+  path: string
+
+  /**
+   * The DataHub standard schema field type.
+   */
+  type: string
+
+  /**
+   * The native field type
+   */
+  nativeType: string
+}
\ No newline at end of file
diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml
index 56fc5f6568eb7..11d0f74305d7b 100644
--- a/metadata-models/src/main/resources/entity-registry.yml
+++ b/metadata-models/src/main/resources/entity-registry.yml
@@ -262,6 +262,7 @@ entities:
       - assertionInfo
       - dataPlatformInstance
       - assertionRunEvent
+      - assertionActions
       - status
   - name: dataHubRetention
     category: internal
@@ -457,4 +458,12 @@ entities:
     aspects:
      - ownershipTypeInfo
      - status
+  - name: dataContract
+    category: core
+    keyAspect: dataContractKey
+    aspects:
+      - dataContractProperties
+      - dataContractStatus
+      - status
+
 events:
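
For readers wiring these models up from Python, the sketch below shows how the volume-assertion records above compose: a ROW_COUNT_TOTAL check built from an operator, standard parameters, and an optional DatasetFilter. This is a minimal sketch, assuming the usual PDL-to-Python codegen produces `<RecordName>Class` wrappers in `datahub.metadata.schema_classes` for the new records (`VolumeAssertionInfoClass`, `RowCountTotalClass`, `DatasetFilterClass`, and so on); the `AssertionStd*` classes already exist, but the new class names, the dataset, and the filter SQL are illustrative, not confirmed.

```python
# Sketch: composing a ROW_COUNT_TOTAL volume assertion from the new records.
# The *Class names for records added in this change are assumed from the usual
# PDL -> Python codegen convention and may differ in the generated module.
import datahub.emitter.mce_builder as builder
from datahub.metadata.schema_classes import (
    AssertionStdOperatorClass,
    AssertionStdParameterClass,
    AssertionStdParametersClass,
    AssertionStdParameterTypeClass,
    DatasetFilterClass,        # assumed wrapper for DatasetFilter.pdl
    DatasetFilterTypeClass,
    RowCountTotalClass,        # assumed wrapper for RowCountTotal.pdl
    VolumeAssertionInfoClass,  # assumed wrapper for VolumeAssertionInfo.pdl
    VolumeAssertionTypeClass,
)

dataset_urn = builder.make_dataset_urn("hive", "SampleHiveDataset", env="PROD")

# Fail the assertion unless the (filtered) table holds at least 1000 rows.
volume_assertion = VolumeAssertionInfoClass(
    type=VolumeAssertionTypeClass.ROW_COUNT_TOTAL,
    entity=dataset_urn,
    rowCountTotal=RowCountTotalClass(
        operator=AssertionStdOperatorClass.GREATER_THAN_OR_EQUAL_TO,
        parameters=AssertionStdParametersClass(
            value=AssertionStdParameterClass(
                value="1000", type=AssertionStdParameterTypeClass.NUMBER
            )
        ),
    ),
    # Optional raw-SQL filter, per DatasetFilter; the sql field applies when type is SQL.
    filter=DatasetFilterClass(
        type=DatasetFilterTypeClass.SQL,
        sql="WHERE event_date >= CURRENT_DATE - INTERVAL '7' DAY",
    ),
)
```

The INCREMENTING_SEGMENT_* variants follow the same pattern, except that their payloads also carry an `IncrementingSegmentSpec` (a `SchemaFieldSpec` plus an optional `IncrementingSegmentFieldTransformer`) describing how rows are bucketed into segments.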
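
Similarly, the new dataContract entity declared at the end of the registry can be populated by emitting its two aspects. A minimal sketch, assuming the `urn:li:dataContract:<id>` URN shape implied by DataContractKey, `*Class` wrappers generated for the datacontract records, and placeholder assertion URNs; `MetadataChangeProposalWrapper` and `DatahubRestEmitter` are the standard emitter APIs.

```python
# Sketch: emitting dataContractProperties + dataContractStatus for a contract.
# Class names for the new datacontract records are assumed from codegen
# conventions; the contract and assertion URNs below are placeholders.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DataContractPropertiesClass,
    DataContractStateClass,
    DataContractStatusClass,
    FreshnessContractClass,
    SchemaContractClass,
)

# URN shape inferred from DataContractKey's single `id` field.
contract_urn = "urn:li:dataContract:pet_details_dc"
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"

mcps = [
    MetadataChangeProposalWrapper(
        entityUrn=contract_urn,
        aspect=DataContractPropertiesClass(
            entity=dataset_urn,
            # Each facet of the contract points at an assertion entity.
            schema=[SchemaContractClass(assertion="urn:li:assertion:schema-check")],
            freshness=[
                FreshnessContractClass(assertion="urn:li:assertion:freshness-check")
            ],
        ),
    ),
    MetadataChangeProposalWrapper(
        entityUrn=contract_urn,
        aspect=DataContractStatusClass(state=DataContractStateClass.ACTIVE),
    ),
]

emitter = DatahubRestEmitter("http://localhost:8080")
for mcp in mcps:
    emitter.emit(mcp)
```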