add delta validation
subhankarb committed Oct 20, 2024
1 parent ba9e850 commit b7d09cb
Showing 11 changed files with 267 additions and 89 deletions.
45 changes: 22 additions & 23 deletions README.md
@@ -73,11 +73,8 @@ pip install dcs-core -U
### Create the config file

With a simple config file, you can generate data quality reports for your data sources. Below is a sample config example.
For more details, please visit the [config guide](https://docs.datachecks.io/configuration/metric_configuration/)
For more details, please visit the [config guide](https://docs.datachecks.io/dcs-oss/config/validation-config)

<p align="center">
<img alt="why_data_observability" src="docs/assets/datachecks_config.png" width="800">
</p>
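As a sketch of the delta validation this commit adds, a config entry might look like the following (the data source names, dataset name, and threshold syntax here are illustrative assumptions, not taken from this commit):

```yaml
# Compare the row count of pgsql.orders against snowflake_dw.orders
# and fail if the absolute difference exceeds the threshold.
validations for pgsql.orders:
  orders_row_count_delta:
    on: delta count_rows        # "delta"-prefixed dataset function (new in this commit)
    ref: snowflake_dw.orders    # <datasource_name>.<dataset_name>[.<field_name>]
    threshold: "<= 100"         # illustrative threshold syntax
```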

### Run from CLI

@@ -93,33 +90,35 @@ dcs-core inspect -C config.yaml
dcs-core inspect -C config.yaml --html-report
```

Please visit the [Quick Start Guide](https://docs.datachecks.io/getting_started/)
Please visit the [Quick Start Guide](https://docs.datachecks.io/dcs-oss/introduction/getting-started)

## Supported Data Sources

Datachecks supports SQL and search data sources. Below is the list of supported data sources.

| Data Source | Type | Supported |
| ----------------------------------------------------------------------- | ---------------------- | ---------- |
| [Postgres](https://docs.datachecks.io/integrations/postgres/) | Transactional Database | :thumbsup: |
| [MySql](https://docs.datachecks.io/integrations/mysql/) | Transactional Database | :thumbsup: |
| [MS SQL Server](https://docs.datachecks.io/integrations/mssql/) | Transactional Database | :thumbsup: |
| [OpenSearch](https://docs.datachecks.io/integrations/opensearch/) | Search Engine | :thumbsup: |
| [Elasticsearch](https://docs.datachecks.io/integrations/elasticsearch/) | Search Engine | :thumbsup: |
| [GCP BigQuery](https://docs.datachecks.io/integrations/bigquery/) | Data Warehouse | :thumbsup: |
| [DataBricks](https://docs.datachecks.io/integrations/databricks/) | Data Warehouse | :thumbsup: |
| [Snowflake](https://docs.datachecks.io/integrations/snowflake/) | Data Warehouse | :thumbsup: |
| AWS RedShift | Data Warehouse | :x: |
| Data Source | Type | Supported |
|---------------------------------------------------------------------------------------|------------------------|------------|
| [Postgres](https://docs.datachecks.io/dcs-oss/integrations/transactional/postgres) | Transactional Database | :thumbsup: |
| [MySql](https://docs.datachecks.io/dcs-oss/integrations/transactional/mysql) | Transactional Database | :thumbsup: |
| [MS SQL Server](https://docs.datachecks.io/dcs-oss/integrations/transactional/mssql) | Transactional Database | :thumbsup: |
| [Oracle](https://docs.datachecks.io/dcs-oss/integrations/transactional/oracle) | Transactional Database | :thumbsup: |
| [DB2](https://docs.datachecks.io/dcs-oss/integrations/transactional/db2) | Transactional Database | :thumbsup: |
| [OpenSearch](https://docs.datachecks.io/dcs-oss/integrations/search/opensearch) | Search Engine | :thumbsup: |
| [Elasticsearch](https://docs.datachecks.io/dcs-oss/integrations/search/elasticsearch) | Search Engine | :thumbsup: |
| [GCP BigQuery](https://docs.datachecks.io/dcs-oss/integrations/warehouse/bigquery) | Data Warehouse | :thumbsup: |
| [DataBricks](https://docs.datachecks.io/dcs-oss/integrations/warehouse/databricks) | Data Warehouse | :thumbsup: |
| [Snowflake](https://docs.datachecks.io/dcs-oss/integrations/warehouse/snowflake) | Data Warehouse | :thumbsup: |
| [AWS RedShift](https://docs.datachecks.io/dcs-oss/integrations/warehouse/redshift) | Data Warehouse | :thumbsup: |

## Metric Types

| Validation Functions                                                                    | Description                                                                                                        |
|-----------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------|
| **[Reliability](https://docs.datachecks.io/validations/reliability/)** | Reliability functions detect whether tables/indices/collections are updating with timely data |
| **[Numeric Distribution](https://docs.datachecks.io/validations/numeric_distribution/)** | Numeric Distribution functions detect changes in the numeric distributions i.e. of values, variance, skew and more |
| **[Uniqueness](https://docs.datachecks.io/validations/uniqueness/)** | Uniqueness functions detect when data constraints are breached like duplicates, number of distinct values etc |
| **[Completeness](https://docs.datachecks.io/validations/completeness/)** | Completeness functions detect when there are missing values in datasets i.e. Null, empty value |
| **[Validity](https://docs.datachecks.io/validations/validity/)** | Validity functions detect whether data is formatted correctly and represents a valid value |
| Validation Functions                                                                     | Description                                                                                                        |
|------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------|
| **[Reliability](https://docs.datachecks.io/dcs-oss/validations/reliability)** | Reliability functions detect whether tables/indices/collections are updating with timely data |
| **[Numeric Distribution](https://docs.datachecks.io/dcs-oss/validations/distribution)** | Numeric Distribution functions detect changes in the numeric distributions i.e. of values, variance, skew and more |
| **[Uniqueness](https://docs.datachecks.io/dcs-oss/validations/uniqueness)** | Uniqueness functions detect when data constraints are breached like duplicates, number of distinct values etc |
| **[Completeness](https://docs.datachecks.io/dcs-oss/validations/completeness)** | Completeness functions detect when there are missing values in datasets i.e. Null, empty value |
| **[Validity](https://docs.datachecks.io/dcs-oss/validations/pattern-matching)** | Validity functions detect whether data is formatted correctly and represents a valid value |

## Overview

2 changes: 1 addition & 1 deletion dcs_core/cli/cli.py
@@ -80,7 +80,7 @@ def inspect(
Starts the datachecks inspection
"""
try:
is_file_exists = os.path.isfile(config_path)
is_file_exists = os.path.exists(config_path)
if not is_file_exists:
raise Exception(
f"Invalid value for '-C' / '--config-path': File '{config_path}' does not exist."
66 changes: 58 additions & 8 deletions dcs_core/core/common/models/configuration.py
@@ -16,6 +16,8 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from dcs_core.core.common.models.data_source_resource import Field, Index, Table
from dcs_core.core.common.models.metric import MetricsType
from dcs_core.core.common.models.validation import (
@@ -99,6 +101,21 @@ class ValidationConfig:
query: Optional[str] = None
regex: Optional[str] = None
values: Optional[List] = None
ref: Optional[str] = None

def _ref_field_validation(self):
if self.ref is not None:
reference_resources = self.ref.strip().split(".")
if len(reference_resources) < 2 or len(reference_resources) > 3:
raise ValueError(
"ref must be in the format <datasource_name>.<dataset_name> or <datasource_name>.<dataset_name>.<field_name>"
)
self._ref_data_source_name = reference_resources[0]
self._ref_dataset_name = reference_resources[1]
self._ref_field_name = None

if len(reference_resources) == 3:
self._ref_field_name = reference_resources[2]
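The `ref` parsing above accepts either a two- or three-part reference. As a standalone sketch (a hypothetical helper, not part of the commit):

```python
def parse_ref(ref: str):
    # Split "<datasource>.<dataset>[.<field>]" into its components;
    # the field component is optional, mirroring _ref_field_validation.
    parts = ref.strip().split(".")
    if len(parts) < 2 or len(parts) > 3:
        raise ValueError(
            "ref must be <datasource_name>.<dataset_name>"
            " or <datasource_name>.<dataset_name>.<field_name>"
        )
    field = parts[2] if len(parts) == 3 else None
    return parts[0], parts[1], field
```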

def _on_field_validation(self):
if self.on is None:
@@ -108,15 +125,25 @@ def _on_field_validation(self):
ValidationFunction.COUNT_ROWS,
ValidationFunction.COUNT_DOCUMENTS,
ValidationFunction.CUSTOM_SQL,
ValidationFunction.COMPARE_COUNT_ROWS,
ValidationFunction.DELTA_COUNT_ROWS,
]
if self.on.strip() not in dataset_validation_functions:

if self.on.strip().startswith("delta"):
self._is_delta_validation = True
on_statement = re.search(r"^delta\s+(.+)", self.on.strip()).group(1)
else:
self._is_delta_validation = False
on_statement = self.on.strip()

if on_statement not in dataset_validation_functions:
self._validation_function_type = ValidationFunctionType.FIELD
if not re.match(r"^(\w+)\(([ \w-]+)\)$", self.on.strip()):
raise ValueError(f"on field must be a valid function, was {self.on}")
if not re.match(r"^(\w+)\(([ \w-]+)\)$", on_statement):
raise ValueError(
f"on field must be a valid function, was {on_statement}"
)
else:
column_validation_function = re.search(
r"^(\w+)\(([ \w-]+)\)$", self.on.strip()
r"^(\w+)\(([ \w-]+)\)$", on_statement
).group(1)

if column_validation_function not in [v for v in ValidationFunction]:
@@ -130,23 +157,46 @@
)

self._validation_function = ValidationFunction(
column_validation_function
on_statement
if not self._is_delta_validation
else f"delta_{on_statement}"
)
self._validation_field_name = re.search(
r"^(\w+)\(([ \w-]+)\)$", self.on.strip()
r"^(\w+)\(([ \w-]+)\)$", on_statement
).group(2)
else:
self._validation_function_type = ValidationFunctionType.DATASET
self._validation_function = ValidationFunction(self.on)
self._validation_function = ValidationFunction(
on_statement
if not self._is_delta_validation
else f"delta_{on_statement}"
)
self._validation_field_name = None

def __post_init__(self):
self._on_field_validation()
self._ref_field_validation()
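The `on` parsing implemented above can be summarized in a standalone sketch (a hypothetical helper with a reduced function list; the real code maps results onto the `ValidationFunction` enum):

```python
import re

# Reduced set of dataset-level functions, for illustration only.
DATASET_FUNCTIONS = {"count_rows", "count_documents", "custom_sql"}

def parse_on(on: str):
    """Return (validation_function, field_name) for an `on` statement."""
    on = on.strip()
    delta_match = re.match(r"^delta\s+(.+)", on)
    is_delta = delta_match is not None
    if is_delta:
        on = delta_match.group(1)  # strip the "delta " prefix
    if on in DATASET_FUNCTIONS:
        func, field = on, None  # dataset-level validation, no field
    else:
        m = re.match(r"^(\w+)\(([ \w-]+)\)$", on)
        if not m:
            raise ValueError(f"on field must be a valid function, was {on}")
        func, field = m.group(1), m.group(2)
    return (f"delta_{func}" if is_delta else func), field
```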

@property
def get_validation_function(self) -> ValidationFunction:
return ValidationFunction(self._validation_function)

@property
def get_is_delta_validation(self):
return self._is_delta_validation

@property
def get_ref_data_source_name(self):
return self._ref_data_source_name if self.ref is not None else None

@property
def get_ref_dataset_name(self):
return self._ref_dataset_name if self.ref is not None else None

@property
def get_ref_field_name(self):
return self._ref_field_name if self.ref is not None else None

@property
def get_validation_function_type(self) -> ValidationFunctionType:
return self._validation_function_type
15 changes: 14 additions & 1 deletion dcs_core/core/common/models/validation.py
@@ -182,7 +182,7 @@ class ValidationFunction(str, Enum):
PERCENT_LONGITUDE = "percent_longitude"

# CROSS Validation
COMPARE_COUNT_ROWS = "compare_count_rows"
DELTA_COUNT_ROWS = "delta_count_rows"

# Failed rows
FAILED_ROWS = "failed_rows"
@@ -201,3 +201,16 @@ class ValidationInfo:
is_valid: Optional[bool] = None
reason: Optional[str] = None
tags: Dict[str, str] = None


@dataclass
class DeltaValidationInfo(ValidationInfo):
"""
DeltaValidationInfo is a dataclass that represents the difference between a source and a reference validation result.
"""

source_value: Union[int, float] = None
reference_value: Union[int, float] = None
reference_datasource_name: str = None
reference_dataset: str = None
reference_field: Optional[str] = None
1 change: 1 addition & 0 deletions dcs_core/core/configuration/configuration_parser.py
@@ -145,6 +145,7 @@ def parse(self, config: Dict) -> Dict[str, ValidationConfigByDataset]:
query=value.get("query"),
regex=value.get("regex"),
values=value.get("values"),
ref=value.get("ref"),
)
validation_dict[validation_name] = validation_config

57 changes: 57 additions & 0 deletions dcs_core/core/validation/base.py
@@ -27,6 +27,7 @@
)
from dcs_core.core.common.models.validation import (
ConditionType,
DeltaValidationInfo,
ValidationFunction,
ValidationInfo,
)
@@ -171,3 +172,59 @@ def get_validation_info(self, **kwargs) -> Union[ValidationInfo, None]:
traceback.print_exc(file=sys.stdout)
logger.error(f"Failed to generate metric {self.name}: {str(e)}")
return None


class DeltaValidation(Validation, ABC):
def __init__(
self,
name: str,
validation_config: ValidationConfig,
data_source: DataSource,
dataset_name: str,
reference_data_source: DataSource,
reference_dataset_name: str,
reference_field_name: str = None,
**kwargs,
):
super().__init__(name, validation_config, data_source, dataset_name, **kwargs)
self.reference_data_source = reference_data_source
self.reference_dataset_name = reference_dataset_name
self.reference_field_name = reference_field_name

@abstractmethod
def _generate_reference_metric_value(self, **kwargs) -> Union[float, int]:
pass

def get_validation_info(self, **kwargs) -> Union[ValidationInfo, None]:
try:
metric_value = self._generate_metric_value(**kwargs)
reference_metric_value = self._generate_reference_metric_value(**kwargs)
delta_value = abs(metric_value - reference_metric_value)

tags = {
"name": self.name,
}

value = DeltaValidationInfo(
name=self.name,
identity=self.get_validation_identity(),
data_source_name=self.data_source.data_source_name,
dataset=self.dataset_name,
validation_function=self.validation_config.get_validation_function,
field=self.field_name,
value=delta_value,
source_value=metric_value,
reference_value=reference_metric_value,
reference_datasource_name=self.reference_data_source.data_source_name,
reference_dataset=self.reference_dataset_name,
timestamp=datetime.datetime.utcnow(),
tags=tags,
)
if self.threshold is not None:
value.is_valid, value.reason = self._validate_threshold(delta_value)

return value
except Exception as e:
traceback.print_exc(file=sys.stdout)
logger.error(f"Failed to generate metric {self.name}: {str(e)}")
return None
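At its core, `get_validation_info` above reduces to an absolute-difference check between the two metric values. A minimal sketch (using a simple upper bound in place of the richer `_validate_threshold` conditions):

```python
def delta_check(source_value: float, reference_value: float, max_delta: float):
    # Delta validations compare a metric computed on the source dataset
    # against the same metric computed on the reference dataset.
    delta = abs(source_value - reference_value)
    return delta, delta <= max_delta
```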
55 changes: 40 additions & 15 deletions dcs_core/core/validation/manager.py
@@ -20,7 +20,7 @@
)
from dcs_core.core.common.models.validation import ValidationFunction
from dcs_core.core.datasource.manager import DataSourceManager
from dcs_core.core.validation.base import Validation
from dcs_core.core.validation.base import DeltaValidation, Validation
from dcs_core.core.validation.completeness_validation import ( # noqa F401 this is used in globals
CountAllSpaceValidation,
CountEmptyStringValidation,
@@ -54,6 +54,7 @@
from dcs_core.core.validation.reliability_validation import ( # noqa F401 this is used in globals
CountDocumentsValidation,
CountRowValidation,
DeltaCountRowValidation,
FreshnessValueMetric,
)
from dcs_core.core.validation.uniqueness_validation import ( # noqa F401 this is used in globals
@@ -126,6 +127,7 @@ class ValidationManager:
ValidationFunction.CUSTOM_SQL.value: "CustomSqlValidation",
ValidationFunction.COUNT_DOCUMENTS.value: "CountDocumentsValidation",
ValidationFunction.COUNT_ROWS.value: "CountRowValidation",
ValidationFunction.DELTA_COUNT_ROWS.value: "DeltaCountRowValidation",
ValidationFunction.FRESHNESS.value: "FreshnessValueMetric",
ValidationFunction.COUNT_UUID.value: "CountUUIDValidation",
ValidationFunction.PERCENT_UUID.value: "PercentUUIDValidation",
@@ -230,22 +232,45 @@
) in validation_by_dataset.validations.items():
data_source = self.data_source_manager.get_data_source(data_source_name)
params = {}
validation: Validation = globals()[
self.VALIDATION_CLASS_MAPPING[
if validation_config.get_is_delta_validation:
reference_data_source = self.data_source_manager.get_data_source(
validation_config.get_ref_data_source_name
)
base_class_name = self.VALIDATION_CLASS_MAPPING[
validation_config.get_validation_function
]
](
name=validation_name,
data_source=data_source,
dataset_name=dataset_name,
validation_name=validation_name,
validation_config=validation_config,
field_name=validation_config.get_validation_field_name,
**params,
)
self.validations[data_source_name][dataset_name][
validation_name
] = validation
validation: DeltaValidation = globals()[base_class_name](
name=validation_name,
data_source=data_source,
dataset_name=dataset_name,
validation_name=validation_name,
validation_config=validation_config,
field_name=validation_config.get_validation_field_name,
reference_data_source=reference_data_source,
reference_dataset_name=validation_config.get_ref_dataset_name,
reference_field_name=validation_config.get_ref_field_name,
**params,
)
self.validations[data_source_name][dataset_name][
validation_name
] = validation
else:
validation: Validation = globals()[
self.VALIDATION_CLASS_MAPPING[
validation_config.get_validation_function
]
](
name=validation_name,
data_source=data_source,
dataset_name=dataset_name,
validation_name=validation_name,
validation_config=validation_config,
field_name=validation_config.get_validation_field_name,
**params,
)
self.validations[data_source_name][dataset_name][
validation_name
] = validation

def add_validation(self, validation: Validation):
data_source_name = validation.data_source.data_source_name