Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli/datacontract): Add data quality assertion support #8968

Merged
merged 12 commits into from
Oct 13, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from typing import Optional

from datahub.configuration import ConfigModel


# Shared base model for assertion configs in the data contract package.
# (A comment is used instead of a class docstring so the pydantic schema
# description of the model is left untouched.)
class BaseAssertion(ConfigModel):
    # Optional free-text description of the assertion; defaults to None.
    description: Optional[str] = None
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
from typing import Optional, Union

from typing_extensions import Literal, Protocol

from datahub.configuration import ConfigModel
from datahub.metadata.schema_classes import (
AssertionStdOperatorClass,
AssertionStdParameterClass,
AssertionStdParametersClass,
AssertionStdParameterTypeClass,
)


class Operator(Protocol):
    """Structural specification of an assertion operator.

    This class exists only for documentation (it is not used in type
    checking). The concrete operator models in this module each provide
    the members listed here.
    """

    # Operator constant associated with the config model.
    operator: str

    def id(self) -> str:
        """Return a string identifier for this operator configuration."""

    def generate_parameters(self) -> AssertionStdParametersClass:
        """Return the operator's operands as an AssertionStdParametersClass."""


def _generate_assertion_std_parameter(
    value: Union[str, int, float]
) -> AssertionStdParameterClass:
    """Wrap a single scalar operand in an AssertionStdParameterClass.

    Strings are passed through with the STRING parameter type; ints and
    floats are stringified and tagged with the NUMBER parameter type.

    Raises:
        ValueError: if ``value`` is not a str, int or float.
    """
    # Strings are checked first and passed through unchanged.
    if isinstance(value, str):
        return AssertionStdParameterClass(
            value=value, type=AssertionStdParameterTypeClass.STRING
        )

    # Anything that is not numeric at this point is unsupported.
    if not isinstance(value, (int, float)):
        raise ValueError(
            f"Unsupported assertion parameter {value} of type {type(value)}"
        )

    numeric_text = str(value)
    return AssertionStdParameterClass(
        value=numeric_text, type=AssertionStdParameterTypeClass.NUMBER
    )


# Scalar types accepted as an assertion operand (used below as Optional[Param]).
Param = Union[str, int, float]


def _generate_assertion_std_parameters(
    value: Optional[Param] = None,
    min_value: Optional[Param] = None,
    max_value: Optional[Param] = None,
) -> AssertionStdParametersClass:
    """Build an AssertionStdParametersClass from optional operands.

    Each operand that is provided is converted with
    ``_generate_assertion_std_parameter``; operands left as ``None`` are
    emitted as ``None``.

    Args:
        value: Single operand, or None when the operator has none.
        min_value: Lower operand, or None.
        max_value: Upper operand, or None.

    Returns:
        The parameters object with ``value``, ``minValue`` and ``maxValue``
        populated for every operand that was supplied.

    Raises:
        ValueError: propagated from ``_generate_assertion_std_parameter``
            when an operand has an unsupported type.
    """
    # Compare against None explicitly rather than relying on truthiness:
    # falsy-but-valid operands such as 0, 0.0 or "" must still be emitted
    # (e.g. a `between` operator with min=0 would otherwise lose its bound).
    return AssertionStdParametersClass(
        value=(
            _generate_assertion_std_parameter(value) if value is not None else None
        ),
        minValue=(
            _generate_assertion_std_parameter(min_value)
            if min_value is not None
            else None
        ),
        maxValue=(
            _generate_assertion_std_parameter(max_value)
            if max_value is not None
            else None
        ),
    )


# Config model for the "equal_to" operator; holds a single str/int/float
# operand in `value`.
class EqualToOperator(ConfigModel):
    type: Literal["equal_to"]
    value: Union[str, int, float]

    # Operator constant paired with this config.
    operator: str = AssertionStdOperatorClass.EQUAL_TO

    def id(self) -> str:
        """Return the identifier "<type>-<value>"."""
        kind, operand = self.type, self.value
        return f"{kind}-{operand}"

    def generate_parameters(self) -> AssertionStdParametersClass:
        """Return parameters built from ``value`` only."""
        operand = self.value
        return _generate_assertion_std_parameters(value=operand)


# Config model for the "between" operator; holds numeric `min` and `max`
# operands.
class BetweenOperator(ConfigModel):
    type: Literal["between"]
    min: Union[int, float]
    max: Union[int, float]

    # Operator constant paired with this config.
    operator: str = AssertionStdOperatorClass.BETWEEN

    def id(self) -> str:
        """Return the identifier "<type>-<min>-<max>"."""
        low, high = self.min, self.max
        return f"{self.type}-{low}-{high}"

    def generate_parameters(self) -> AssertionStdParametersClass:
        """Return parameters built from ``min`` and ``max``."""
        low, high = self.min, self.max
        return _generate_assertion_std_parameters(min_value=low, max_value=high)


# Config model for the "less_than" operator; holds a single numeric
# operand in `value`.
class LessThanOperator(ConfigModel):
    type: Literal["less_than"]
    value: Union[int, float]

    # Operator constant paired with this config.
    operator: str = AssertionStdOperatorClass.LESS_THAN

    def id(self) -> str:
        """Return the identifier "<type>-<value>"."""
        kind, operand = self.type, self.value
        return f"{kind}-{operand}"

    def generate_parameters(self) -> AssertionStdParametersClass:
        """Return parameters built from ``value`` only."""
        operand = self.value
        return _generate_assertion_std_parameters(value=operand)


# Config model for the "greater_than" operator; holds a single numeric
# operand in `value`.
class GreaterThanOperator(ConfigModel):
    type: Literal["greater_than"]
    value: Union[int, float]

    # Operator constant paired with this config.
    operator: str = AssertionStdOperatorClass.GREATER_THAN

    def id(self) -> str:
        """Return the identifier "<type>-<value>"."""
        kind, operand = self.type, self.value
        return f"{kind}-{operand}"

    def generate_parameters(self) -> AssertionStdParametersClass:
        """Return parameters built from ``value`` only."""
        operand = self.value
        return _generate_assertion_std_parameters(value=operand)


# Config model for the "less_than_or_equal_to" operator; holds a single
# numeric operand in `value`.
class LessThanOrEqualToOperator(ConfigModel):
    type: Literal["less_than_or_equal_to"]
    value: Union[int, float]

    # Operator constant paired with this config.
    operator: str = AssertionStdOperatorClass.LESS_THAN_OR_EQUAL_TO

    def id(self) -> str:
        """Return the identifier "<type>-<value>"."""
        kind, operand = self.type, self.value
        return f"{kind}-{operand}"

    def generate_parameters(self) -> AssertionStdParametersClass:
        """Return parameters built from ``value`` only."""
        operand = self.value
        return _generate_assertion_std_parameters(value=operand)


# Config model for the "greater_than_or_equal_to" operator; holds a single
# numeric operand in `value`.
class GreaterThanOrEqualToOperator(ConfigModel):
    type: Literal["greater_than_or_equal_to"]
    value: Union[int, float]

    # Operator constant paired with this config.
    operator: str = AssertionStdOperatorClass.GREATER_THAN_OR_EQUAL_TO

    def id(self) -> str:
        """Return the identifier "<type>-<value>"."""
        kind, operand = self.type, self.value
        return f"{kind}-{operand}"

    def generate_parameters(self) -> AssertionStdParametersClass:
        """Return parameters built from ``value`` only."""
        operand = self.value
        return _generate_assertion_std_parameters(value=operand)


# Config model for the "not_null" operator; it takes no operands.
class NotNullOperator(ConfigModel):
    type: Literal["not_null"]

    # Operator constant paired with this config.
    operator: str = AssertionStdOperatorClass.NOT_NULL

    def id(self) -> str:
        """Return the identifier, which is just the operator type."""
        kind = self.type
        return f"{kind}"

    def generate_parameters(self) -> AssertionStdParametersClass:
        """Return parameters with no operands supplied."""
        empty_params = _generate_assertion_std_parameters()
        return empty_params


# Union of every operator config model defined in this module; each member
# declares a distinct `type` literal.
Operators = Union[
EqualToOperator,
BetweenOperator,
LessThanOperator,
LessThanOrEqualToOperator,
GreaterThanOperator,
GreaterThanOrEqualToOperator,
NotNullOperator,
]
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing_extensions import Literal

import datahub.emitter.mce_builder as builder
from datahub.api.entities.datacontract.assertion import BaseAssertion
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also support Volume Assertions + Freshness Assertions?

Or are these already supported?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Freshness is supported, I believe, or at least a subset of it. Volume is not supported yet.

from datahub.api.entities.datacontract.assertion_operator import Operators
from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
Expand All @@ -14,12 +16,15 @@
AssertionStdParametersClass,
AssertionStdParameterTypeClass,
AssertionTypeClass,
AssertionValueChangeTypeClass,
DatasetAssertionInfoClass,
DatasetAssertionScopeClass,
SqlAssertionInfoClass,
SqlAssertionTypeClass,
)


class IdConfigMixin(ConfigModel):
class IdConfigMixin(BaseAssertion):
id_raw: Optional[str] = pydantic.Field(
default=None,
alias="id",
Expand All @@ -30,25 +35,32 @@ def generate_default_id(self) -> str:
raise NotImplementedError


class CustomSQLAssertion(IdConfigMixin, ConfigModel):
class CustomSQLAssertion(IdConfigMixin, BaseAssertion):
type: Literal["custom_sql"]

sql: str
operator: Operators = pydantic.Field(discriminator="type")

def generate_dataset_assertion_info(
self, entity_urn: str
) -> DatasetAssertionInfoClass:
return DatasetAssertionInfoClass(
dataset=entity_urn,
scope=DatasetAssertionScopeClass.UNKNOWN,
fields=[],
operator=AssertionStdOperatorClass._NATIVE_,
aggregation=AssertionStdAggregationClass._NATIVE_,
logic=self.sql,
def generate_default_id(self) -> str:
return f"{self.type}-{self.sql}-{self.operator.id()}"

def generate_assertion_info(self, entity_urn: str) -> AssertionInfoClass:
sql_assertion_info = SqlAssertionInfoClass(
entity=entity_urn,
statement=self.sql,
operator=self.operator.operator,
parameters=self.operator.generate_parameters(),
# TODO: Support other types of assertions
type=SqlAssertionTypeClass.METRIC,
changeType=AssertionValueChangeTypeClass.ABSOLUTE,
)
return AssertionInfoClass(
type=AssertionTypeClass.SQL,
sqlAssertion=sql_assertion_info,
description=self.description,
)


class ColumnUniqueAssertion(IdConfigMixin, ConfigModel):
class ColumnUniqueAssertion(IdConfigMixin, BaseAssertion):
type: Literal["unique"]

# TODO: support multiple columns?
Expand All @@ -57,10 +69,8 @@ class ColumnUniqueAssertion(IdConfigMixin, ConfigModel):
def generate_default_id(self) -> str:
return f"{self.type}-{self.column}"

def generate_dataset_assertion_info(
self, entity_urn: str
) -> DatasetAssertionInfoClass:
return DatasetAssertionInfoClass(
def generate_assertion_info(self, entity_urn: str) -> AssertionInfoClass:
dataset_assertion_info = DatasetAssertionInfoClass(
dataset=entity_urn,
scope=DatasetAssertionScopeClass.DATASET_COLUMN,
fields=[builder.make_schema_field_urn(entity_urn, self.column)],
Expand All @@ -72,6 +82,11 @@ def generate_dataset_assertion_info(
)
),
)
return AssertionInfoClass(
type=AssertionTypeClass.DATASET,
datasetAssertion=dataset_assertion_info,
description=self.description,
)


class DataQualityAssertion(ConfigModel):
Expand All @@ -92,16 +107,9 @@ def id(self) -> str:
def generate_mcp(
self, assertion_urn: str, entity_urn: str
) -> List[MetadataChangeProposalWrapper]:
dataset_assertion_info = self.__root__.generate_dataset_assertion_info(
entity_urn
)

return [
MetadataChangeProposalWrapper(
entityUrn=assertion_urn,
aspect=AssertionInfoClass(
type=AssertionTypeClass.DATASET,
datasetAssertion=dataset_assertion_info,
),
aspect=self.__root__.generate_assertion_info(entity_urn),
)
]
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class DataContract(ConfigModel):
freshness: Optional[FreshnessAssertion] = pydantic.Field(default=None)

# TODO: Add a validator to ensure that ids are unique
data_quality: Optional[List[DataQualityAssertion]] = None
data_quality: Optional[List[DataQualityAssertion]] = pydantic.Field(default=None)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting - these two are equivalent, right?


_original_yaml_dict: Optional[dict] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pydantic
from typing_extensions import Literal

from datahub.api.entities.datacontract.assertion import BaseAssertion
from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
Expand All @@ -21,7 +22,7 @@
)


class CronFreshnessAssertion(ConfigModel):
class CronFreshnessAssertion(BaseAssertion):
type: Literal["cron"]

cron: str = pydantic.Field(
Expand All @@ -32,12 +33,30 @@ class CronFreshnessAssertion(ConfigModel):
description="The timezone to use for the cron schedule. Defaults to UTC.",
)

def generate_freshness_assertion_schedule(self) -> FreshnessAssertionScheduleClass:
return FreshnessAssertionScheduleClass(
type=FreshnessAssertionScheduleTypeClass.CRON,
cron=FreshnessCronScheduleClass(
cron=self.cron,
timezone=self.timezone,
),
)


class FixedIntervalFreshnessAssertion(ConfigModel):
class FixedIntervalFreshnessAssertion(BaseAssertion):
type: Literal["interval"]

interval: timedelta

def generate_freshness_assertion_schedule(self) -> FreshnessAssertionScheduleClass:
return FreshnessAssertionScheduleClass(
type=FreshnessAssertionScheduleTypeClass.FIXED_INTERVAL,
fixedInterval=FixedIntervalScheduleClass(
unit=CalendarIntervalClass.SECOND,
multiple=int(self.interval.total_seconds()),
),
)


class FreshnessAssertion(ConfigModel):
__root__: Union[
Expand All @@ -51,36 +70,13 @@ def id(self):
def generate_mcp(
self, assertion_urn: str, entity_urn: str
) -> List[MetadataChangeProposalWrapper]:
freshness = self.__root__

if isinstance(freshness, CronFreshnessAssertion):
schedule = FreshnessAssertionScheduleClass(
type=FreshnessAssertionScheduleTypeClass.CRON,
cron=FreshnessCronScheduleClass(
cron=freshness.cron,
timezone=freshness.timezone,
),
)
elif isinstance(freshness, FixedIntervalFreshnessAssertion):
schedule = FreshnessAssertionScheduleClass(
type=FreshnessAssertionScheduleTypeClass.FIXED_INTERVAL,
fixedInterval=FixedIntervalScheduleClass(
unit=CalendarIntervalClass.SECOND,
multiple=int(freshness.interval.total_seconds()),
),
)
else:
raise ValueError(f"Unknown freshness type {freshness}")

assertionInfo = AssertionInfoClass(
aspect = AssertionInfoClass(
type=AssertionTypeClass.FRESHNESS,
freshnessAssertion=FreshnessAssertionInfoClass(
entity=entity_urn,
type=FreshnessAssertionTypeClass.DATASET_CHANGE,
schedule=schedule,
schedule=self.__root__.generate_freshness_assertion_schedule(),
),
description=self.__root__.description,
)

return [
MetadataChangeProposalWrapper(entityUrn=assertion_urn, aspect=assertionInfo)
]
return [MetadataChangeProposalWrapper(entityUrn=assertion_urn, aspect=aspect)]
Loading
Loading