From b7d09cbe05d4968f8c749d2544f4817f568a8118 Mon Sep 17 00:00:00 2001 From: subhankarb Date: Sat, 19 Oct 2024 23:48:22 +0530 Subject: [PATCH] add delta validation --- README.md | 45 +++++++------ dcs_core/cli/cli.py | 2 +- dcs_core/core/common/models/configuration.py | 66 ++++++++++++++++--- dcs_core/core/common/models/validation.py | 15 ++++- .../configuration/configuration_parser.py | 1 + dcs_core/core/validation/base.py | 57 ++++++++++++++++ dcs_core/core/validation/manager.py | 55 +++++++++++----- .../core/validation/reliability_validation.py | 28 +++++++- .../postgres/example_postgres_config.yaml | 44 ++----------- examples/docker-compose.yaml | 2 +- .../configuration/test_configuration_v1.py | 41 ++++++++++++ 11 files changed, 267 insertions(+), 89 deletions(-) diff --git a/README.md b/README.md index 1baee79e..4530078f 100644 --- a/README.md +++ b/README.md @@ -73,11 +73,8 @@ pip install dcs-core -U ### Create the config file With a simple config file, you can generate data quality reports for your data sources. Below is the sample config example. -For more details, please visit the [config guide](https://docs.datachecks.io/configuration/metric_configuration/) +For more details, please visit the [config guide](https://docs.datachecks.io/dcs-oss/config/validation-config) -

- why_data_observability -

### Run from CLI @@ -93,33 +90,35 @@ dcs-core inspect -C config.yaml dcs-core inspect -C config.yaml --html-report ``` -Please visit the [Quick Start Guide](https://docs.datachecks.io/getting_started/) +Please visit the [Quick Start Guide](https://docs.datachecks.io/dcs-oss/introduction/getting-started) ## Supported Data Sources Datachecks supports sql and search data sources. Below are the list of supported data sources. -| Data Source | Type | Supported | -| ----------------------------------------------------------------------- | ---------------------- | ---------- | -| [Postgres](https://docs.datachecks.io/integrations/postgres/) | Transactional Database | :thumbsup: | -| [MySql](https://docs.datachecks.io/integrations/mysql/) | Transactional Database | :thumbsup: | -| [MS SQL Server](https://docs.datachecks.io/integrations/mssql/) | Transactional Database | :thumbsup: | -| [OpenSearch](https://docs.datachecks.io/integrations/opensearch/) | Search Engine | :thumbsup: | -| [Elasticsearch](https://docs.datachecks.io/integrations/elasticsearch/) | Search Engine | :thumbsup: | -| [GCP BigQuery](https://docs.datachecks.io/integrations/bigquery/) | Data Warehouse | :thumbsup: | -| [DataBricks](https://docs.datachecks.io/integrations/databricks/) | Data Warehouse | :thumbsup: | -| [Snowflake](https://docs.datachecks.io/integrations/snowflake/) | Data Warehouse | :thumbsup: | -| AWS RedShift | Data Warehouse | :x: | +| Data Source | Type | Supported | +|---------------------------------------------------------------------------------------|------------------------|------------| +| [Postgres](https://docs.datachecks.io/dcs-oss/integrations/transactional/postgres) | Transactional Database | :thumbsup: | +| [MySql](https://docs.datachecks.io/dcs-oss/integrations/transactional/mysql) | Transactional Database | :thumbsup: | +| [MS SQL Server](https://docs.datachecks.io/dcs-oss/integrations/transactional/mssql) | Transactional Database | :thumbsup: | +| 
[Oracle](https://docs.datachecks.io/dcs-oss/integrations/transactional/oracle) | Transactional Database | :thumbsup: | +| [DB2](https://docs.datachecks.io/dcs-oss/integrations/transactional/db2) | Transactional Database | :thumbsup: | +| [OpenSearch](https://docs.datachecks.io/dcs-oss/integrations/search/opensearch) | Search Engine | :thumbsup: | +| [Elasticsearch](https://docs.datachecks.io/dcs-oss/integrations/search/elasticsearch) | Search Engine | :thumbsup: | +| [GCP BigQuery](https://docs.datachecks.io/dcs-oss/integrations/warehouse/bigquery) | Data Warehouse | :thumbsup: | +| [DataBricks](https://docs.datachecks.io/dcs-oss/integrations/warehouse/databricks) | Data Warehouse | :thumbsup: | +| [Snowflake](https://docs.datachecks.io/dcs-oss/integrations/warehouse/snowflake) | Data Warehouse | :thumbsup: | +| [AWS RedShift](https://docs.datachecks.io/dcs-oss/integrations/warehouse/redshift) | Data Warehouse | :thumbsup: | ## Metric Types -| Validation Funtions | Description | -|-----------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------| -| **[Reliability](https://docs.datachecks.io/validations/reliability/)** | Reliability functions detect whether tables/indices/collections are updating with timely data | -| **[Numeric Distribution](https://docs.datachecks.io/validations/numeric_distribution/)** | Numeric Distribution functions detect changes in the numeric distributions i.e. of values, variance, skew and more | -| **[Uniqueness](https://docs.datachecks.io/validations/uniqueness/)** | Uniqueness functions detect when data constraints are breached like duplicates, number of distinct values etc | -| **[Completeness](https://docs.datachecks.io/validations/completeness/)** | Completeness functions detect when there are missing values in datasets i.e. 
Null, empty value | -| **[Validity](https://docs.datachecks.io/validations/validity/)** | Validity functions detect whether data is formatted correctly and represents a valid value | +| Validation Functions | Description | +|------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------| +| **[Reliability](https://docs.datachecks.io/dcs-oss/validations/reliability)** | Reliability functions detect whether tables/indices/collections are updating with timely data | +| **[Numeric Distribution](https://docs.datachecks.io/dcs-oss/validations/distribution)** | Numeric Distribution functions detect changes in the numeric distributions i.e. of values, variance, skew and more | +| **[Uniqueness](https://docs.datachecks.io/dcs-oss/validations/uniqueness)** | Uniqueness functions detect when data constraints are breached like duplicates, number of distinct values etc | +| **[Completeness](https://docs.datachecks.io/dcs-oss/validations/completeness)** | Completeness functions detect when there are missing values in datasets i.e. Null, empty value | +| **[Validity](https://docs.datachecks.io/dcs-oss/validations/pattern-matching)** | Validity functions detect whether data is formatted correctly and represents a valid value | ## Overview diff --git a/dcs_core/cli/cli.py b/dcs_core/cli/cli.py index 8b12e2e8..6261f25d 100644 --- a/dcs_core/cli/cli.py +++ b/dcs_core/cli/cli.py @@ -80,7 +80,7 @@ def inspect( Starts the datachecks inspection """ try: - is_file_exists = os.path.isfile(config_path) + is_file_exists = os.path.exists(config_path) if not is_file_exists: raise Exception( f"Invalid value for '-C' / '--config-path': File '{config_path}' does not exist." 
diff --git a/dcs_core/core/common/models/configuration.py b/dcs_core/core/common/models/configuration.py index 020990b6..87378cc8 100644 --- a/dcs_core/core/common/models/configuration.py +++ b/dcs_core/core/common/models/configuration.py @@ -16,6 +16,8 @@ from enum import Enum from typing import Any, Dict, List, Optional, Union +from markdown_it.rules_block import reference + from dcs_core.core.common.models.data_source_resource import Field, Index, Table from dcs_core.core.common.models.metric import MetricsType from dcs_core.core.common.models.validation import ( @@ -99,6 +101,21 @@ class ValidationConfig: query: Optional[str] = None regex: Optional[str] = None values: Optional[List] = None + ref: Optional[str] = None + + def _ref_field_validation(self): + if self.ref is not None: + reference_resources = self.ref.strip().split(".") + if len(reference_resources) < 2 or len(reference_resources) > 3: + raise ValueError( + "ref field should be in the format of .." + ) + self._ref_data_source_name = reference_resources[0] + self._ref_dataset_name = reference_resources[1] + self._ref_field_name = None + + if len(reference_resources) == 3: + self._ref_field_name = reference_resources[2] def _on_field_validation(self): if self.on is None: @@ -108,15 +125,25 @@ def _on_field_validation(self): ValidationFunction.COUNT_ROWS, ValidationFunction.COUNT_DOCUMENTS, ValidationFunction.CUSTOM_SQL, - ValidationFunction.COMPARE_COUNT_ROWS, + ValidationFunction.DELTA_COUNT_ROWS, ] - if self.on.strip() not in dataset_validation_functions: + + if self.on.strip().startswith("delta"): + self._is_delta_validation = True + on_statement = re.search(r"^delta\s+(.+)", self.on.strip()).group(1) + else: + self._is_delta_validation = False + on_statement = self.on.strip() + + if on_statement not in dataset_validation_functions: self._validation_function_type = ValidationFunctionType.FIELD - if not re.match(r"^(\w+)\(([ \w-]+)\)$", self.on.strip()): - raise ValueError(f"on field must be a valid 
function, was {self.on}") + if not re.match(r"^(\w+)\(([ \w-]+)\)$", on_statement): + raise ValueError( + f"on field must be a valid function, was {on_statement}" + ) else: column_validation_function = re.search( - r"^(\w+)\(([ \w-]+)\)$", self.on.strip() + r"^(\w+)\(([ \w-]+)\)$", on_statement ).group(1) if column_validation_function not in [v for v in ValidationFunction]: @@ -130,23 +157,46 @@ def _on_field_validation(self): ) self._validation_function = ValidationFunction( - column_validation_function + on_statement + if not self._is_delta_validation + else f"delta_{on_statement}" ) self._validation_field_name = re.search( - r"^(\w+)\(([ \w-]+)\)$", self.on.strip() + r"^(\w+)\(([ \w-]+)\)$", on_statement ).group(2) else: self._validation_function_type = ValidationFunctionType.DATASET - self._validation_function = ValidationFunction(self.on) + self._validation_function = ValidationFunction( + on_statement + if not self._is_delta_validation + else f"delta_{on_statement}" + ) self._validation_field_name = None def __post_init__(self): self._on_field_validation() + self._ref_field_validation() @property def get_validation_function(self) -> ValidationFunction: return ValidationFunction(self._validation_function) + @property + def get_is_delta_validation(self): + return self._is_delta_validation + + @property + def get_ref_data_source_name(self): + return self._ref_data_source_name if self.ref is not None else None + + @property + def get_ref_dataset_name(self): + return self._ref_dataset_name if self.ref is not None else None + + @property + def get_ref_field_name(self): + return self._ref_field_name if self.ref is not None else None + @property def get_validation_function_type(self) -> ValidationFunctionType: return self._validation_function_type diff --git a/dcs_core/core/common/models/validation.py b/dcs_core/core/common/models/validation.py index 94e4f4a5..4e706b9e 100644 --- a/dcs_core/core/common/models/validation.py +++ 
b/dcs_core/core/common/models/validation.py @@ -182,7 +182,7 @@ class ValidationFunction(str, Enum): PERCENT_LONGITUDE = "percent_longitude" # CROSS Validation - COMPARE_COUNT_ROWS = "compare_count_rows" + DELTA_COUNT_ROWS = "delta_count_rows" # Failed rows FAILED_ROWS = "failed_rows" @@ -201,3 +201,16 @@ class ValidationInfo: is_valid: Optional[bool] = None reason: Optional[str] = None tags: Dict[str, str] = None + + +@dataclass +class DeltaValidationInfo(ValidationInfo): + """ + DeltaValidationInfo is a dataclass that represents the difference between two validation info. + """ + + source_value: Union[int, float] = None + reference_value: Union[int, float] = None + reference_datasource_name: str = None + reference_dataset: str = None + reference_field: Optional[str] = None diff --git a/dcs_core/core/configuration/configuration_parser.py b/dcs_core/core/configuration/configuration_parser.py index a705fdc0..4a24b798 100644 --- a/dcs_core/core/configuration/configuration_parser.py +++ b/dcs_core/core/configuration/configuration_parser.py @@ -145,6 +145,7 @@ def parse(self, config: Dict) -> Dict[str, ValidationConfigByDataset]: query=value.get("query"), regex=value.get("regex"), values=value.get("values"), + ref=value.get("ref"), ) validation_dict[validation_name] = validation_config diff --git a/dcs_core/core/validation/base.py b/dcs_core/core/validation/base.py index 288a01d4..4e42e767 100644 --- a/dcs_core/core/validation/base.py +++ b/dcs_core/core/validation/base.py @@ -27,6 +27,7 @@ ) from dcs_core.core.common.models.validation import ( ConditionType, + DeltaValidationInfo, ValidationFunction, ValidationInfo, ) @@ -171,3 +172,59 @@ def get_validation_info(self, **kwargs) -> Union[ValidationInfo, None]: traceback.print_exc(file=sys.stdout) logger.error(f"Failed to generate metric {self.name}: {str(e)}") return None + + +class DeltaValidation(Validation, ABC): + def __init__( + self, + name: str, + validation_config: ValidationConfig, + data_source: DataSource, + 
dataset_name: str, + reference_data_source: DataSource, + reference_dataset_name: str, + reference_field_name: str = None, + **kwargs, + ): + super().__init__(name, validation_config, data_source, dataset_name, **kwargs) + self.reference_data_source = reference_data_source + self.reference_dataset_name = reference_dataset_name + self.reference_field_name = reference_field_name + + @abstractmethod + def _generate_reference_metric_value(self, **kwargs) -> Union[float, int]: + pass + + def get_validation_info(self, **kwargs) -> Union[ValidationInfo, None]: + try: + metric_value = self._generate_metric_value(**kwargs) + reference_metric_value = self._generate_reference_metric_value(**kwargs) + delta_value = abs(metric_value - reference_metric_value) + + tags = { + "name": self.name, + } + + value = DeltaValidationInfo( + name=self.name, + identity=self.get_validation_identity(), + data_source_name=self.data_source.data_source_name, + dataset=self.dataset_name, + validation_function=self.validation_config.get_validation_function, + field=self.field_name, + value=delta_value, + source_value=metric_value, + reference_value=reference_metric_value, + reference_datasource_name=self.reference_data_source.data_source_name, + reference_dataset=self.reference_dataset_name, + timestamp=datetime.datetime.utcnow(), + tags=tags, + ) + if self.threshold is not None: + value.is_valid, value.reason = self._validate_threshold(delta_value) + + return value + except Exception as e: + traceback.print_exc(file=sys.stdout) + logger.error(f"Failed to generate metric {self.name}: {str(e)}") + return None diff --git a/dcs_core/core/validation/manager.py b/dcs_core/core/validation/manager.py index d4d85e63..172c0b99 100644 --- a/dcs_core/core/validation/manager.py +++ b/dcs_core/core/validation/manager.py @@ -20,7 +20,7 @@ ) from dcs_core.core.common.models.validation import ValidationFunction from dcs_core.core.datasource.manager import DataSourceManager -from dcs_core.core.validation.base 
import Validation +from dcs_core.core.validation.base import DeltaValidation, Validation from dcs_core.core.validation.completeness_validation import ( # noqa F401 this is used in globals CountAllSpaceValidation, CountEmptyStringValidation, @@ -54,6 +54,7 @@ from dcs_core.core.validation.reliability_validation import ( # noqa F401 this is used in globals CountDocumentsValidation, CountRowValidation, + DeltaCountRowValidation, FreshnessValueMetric, ) from dcs_core.core.validation.uniqueness_validation import ( # noqa F401 this is used in globals @@ -126,6 +127,7 @@ class ValidationManager: ValidationFunction.CUSTOM_SQL.value: "CustomSqlValidation", ValidationFunction.COUNT_DOCUMENTS.value: "CountDocumentsValidation", ValidationFunction.COUNT_ROWS.value: "CountRowValidation", + ValidationFunction.DELTA_COUNT_ROWS.value: "DeltaCountRowValidation", ValidationFunction.FRESHNESS.value: "FreshnessValueMetric", ValidationFunction.COUNT_UUID.value: "CountUUIDValidation", ValidationFunction.PERCENT_UUID.value: "PercentUUIDValidation", @@ -230,22 +232,45 @@ def build_validations(self): ) in validation_by_dataset.validations.items(): data_source = self.data_source_manager.get_data_source(data_source_name) params = {} - validation: Validation = globals()[ - self.VALIDATION_CLASS_MAPPING[ + if validation_config.get_is_delta_validation: + reference_data_source = self.data_source_manager.get_data_source( + validation_config.get_ref_data_source_name + ) + base_class_name = self.VALIDATION_CLASS_MAPPING[ validation_config.get_validation_function ] - ]( - name=validation_name, - data_source=data_source, - dataset_name=dataset_name, - validation_name=validation_name, - validation_config=validation_config, - field_name=validation_config.get_validation_field_name, - **params, - ) - self.validations[data_source_name][dataset_name][ - validation_name - ] = validation + validation: DeltaValidation = globals()[base_class_name]( + name=validation_name, + data_source=data_source, + 
dataset_name=dataset_name, + validation_name=validation_name, + validation_config=validation_config, + field_name=validation_config.get_validation_field_name, + reference_data_source=reference_data_source, + reference_dataset_name=validation_config.get_ref_dataset_name, + reference_field_name=validation_config.get_ref_field_name, + **params, + ) + self.validations[data_source_name][dataset_name][ + validation_name + ] = validation + else: + validation: Validation = globals()[ + self.VALIDATION_CLASS_MAPPING[ + validation_config.get_validation_function + ] + ]( + name=validation_name, + data_source=data_source, + dataset_name=dataset_name, + validation_name=validation_name, + validation_config=validation_config, + field_name=validation_config.get_validation_field_name, + **params, + ) + self.validations[data_source_name][dataset_name][ + validation_name + ] = validation def add_validation(self, validation: Validation): data_source_name = validation.data_source.data_source_name diff --git a/dcs_core/core/validation/reliability_validation.py b/dcs_core/core/validation/reliability_validation.py index b53dbfee..d8889839 100644 --- a/dcs_core/core/validation/reliability_validation.py +++ b/dcs_core/core/validation/reliability_validation.py @@ -11,9 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+from typing import Union + from dcs_core.core.datasource.search_datasource import SearchIndexDataSource from dcs_core.core.datasource.sql_datasource import SQLDataSource -from dcs_core.core.validation.base import Validation +from dcs_core.core.validation.base import DeltaValidation, Validation class CountDocumentsValidation(Validation): @@ -47,6 +49,30 @@ def _generate_metric_value(self): raise ValueError("Invalid data source type") +class DeltaCountRowValidation(DeltaValidation): + """ + RowCountMetrics is a class that represents a metric that is generated by a data source. + """ + + def _generate_reference_metric_value(self, **kwargs) -> Union[float, int]: + if isinstance(self.reference_data_source, SQLDataSource): + return self.reference_data_source.query_get_row_count( + table=self.reference_dataset_name, + filters=self.where_filter if self.where_filter else None, + ) + else: + raise ValueError("Invalid data source type") + + def _generate_metric_value(self): + if isinstance(self.data_source, SQLDataSource): + return self.data_source.query_get_row_count( + table=self.dataset_name, + filters=self.where_filter if self.where_filter else None, + ) + else: + raise ValueError("Invalid data source type") + + class FreshnessValueMetric(Validation): """ FreshnessMetric is a class that represents a metric that is generated by a data source. 
diff --git a/examples/configurations/postgres/example_postgres_config.yaml b/examples/configurations/postgres/example_postgres_config.yaml index bc27090b..c9dba19d 100644 --- a/examples/configurations/postgres/example_postgres_config.yaml +++ b/examples/configurations/postgres/example_postgres_config.yaml @@ -137,42 +137,8 @@ validations for iris_pgsql.dcs_iris: on: percent_valid_regex(species) regex: "^(setosa|virginica)$" -validations for product_db.products: - - percentage uuid for product_id: - on: percent_uuid(product_id) - threshold: "> 90" - - count uuid for product_id: - on: count_uuid(product_id) - threshold: "> 100" - - # String Length Validations - - species string length max: - on: string_length_max(species) - threshold: "<= 20" - - - species string length min: - on: string_length_min(species) - threshold: ">= 5" - - - species string length average: - on: string_length_average(species) - threshold: ">= 10" - - -# Geolocation Validations - - location latitude count: - on: count_latitude(latitude) - threshold: "> 100" - - - location latitude percentage: - on: percent_latitude(latitude) - threshold: "> 80" - - - location longitude count: - on: count_longitude(longitude) - threshold: "> 100" - - - location longitude percentage: - on: percent_longitude(longitude) - threshold: "> 80" - + # delta validation + - reference count difference count rows: + on: delta count_rows + threshold: < 10 + ref: iris_pgsql.dcs_diamond \ No newline at end of file diff --git a/examples/docker-compose.yaml b/examples/docker-compose.yaml index 3cf9321f..5f89a11d 100644 --- a/examples/docker-compose.yaml +++ b/examples/docker-compose.yaml @@ -28,7 +28,7 @@ services: dcs-test-postgres: container_name: dcs-test-postgres - image: postgres + image: postgres:16 environment: POSTGRES_DB: dcs_db POSTGRES_USER: postgres diff --git a/tests/core/configuration/test_configuration_v1.py b/tests/core/configuration/test_configuration_v1.py index 4cd96e00..8d806ba4 100644 --- 
a/tests/core/configuration/test_configuration_v1.py +++ b/tests/core/configuration/test_configuration_v1.py @@ -1177,3 +1177,44 @@ def test_should_parse_percent_date_not_in_future(): .get_validation_function == ValidationFunction.PERCENT_DATE_NOT_IN_FUTURE ) + + +def test_should_read_delta_validation_config(): + yaml_string = """ + validations for source.table: + - test: + on: delta count_rows + threshold: "<10" + ref: source1.table1 + """ + configuration = load_configuration_from_yaml_str(yaml_string) + assert ( + configuration.validations["source.table"] + .validations["test"] + .get_validation_function + == ValidationFunction.DELTA_COUNT_ROWS + ) + assert ( + configuration.validations["source.table"] + .validations["test"] + .get_ref_data_source_name + == "source1" + ) + assert ( + configuration.validations["source.table"] + .validations["test"] + .get_ref_dataset_name + == "table1" + ) + + +def test_should_throw_error_on_delta_function(): + yaml_string = """ + validations for source.table: + - test: + on: delta count_documents + threshold: "<10" + ref: source1.table1 + """ + with pytest.raises(Exception): + load_configuration_from_yaml_str(yaml_string)