From 17613f3ff64f0f2ffebbf123a448ce0ba1370e36 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Wed, 12 Feb 2025 12:19:10 +0100 Subject: [PATCH] Fixed parsing error when loading checks from a file (#165) ## Changes Fixing SQL Expression Parsing Error. Refactored tests to avoid code duplication and increase maintenance. ### Linked issues Resolves #162 ### Tests - [x] manually tested - [x] added unit tests - [x] added integration tests --- docs/dqx/docs/guide.mdx | 13 + pyproject.toml | 2 +- src/databricks/labs/dqx/base.py | 8 +- src/databricks/labs/dqx/engine.py | 114 +++++++- src/databricks/labs/dqx/utils.py | 23 +- tests/__init__.py | 0 tests/conftest.py | 273 ++++++++++++++++++ tests/integration/conftest.py | 29 +- tests/integration/test_apply_checks.py | 58 +++- tests/integration/test_cli.py | 5 +- tests/integration/test_dlt_rules_generator.py | 2 +- tests/integration/test_installation.py | 4 +- .../integration/test_load_checks_from_file.py | 55 +++- tests/test_data/checks.json | 5 - tests/test_data/checks.yml | 23 -- tests/unit/conftest.py | 11 - .../unit/test_load_checks_from_local_file.py | 45 ++- tests/unit/test_save_checks_in_local_file.py | 6 +- 18 files changed, 530 insertions(+), 146 deletions(-) create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py delete mode 100644 tests/test_data/checks.json delete mode 100644 tests/test_data/checks.yml diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index 732e8d6a..0b194507 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -60,6 +60,10 @@ print(dlt_expectations) You can optionally install DQX in the workspace, see the [Installation Guide](/docs/installation#dqx-installation-in-a-databricks-workspace). As part of the installation, a config, dashboards and profiler workflow is installed. The workflow can be run manually in the workspace UI or using the CLI as below. +DQX operates at the moment exclusively at the pySpark dataframe level and does not interact directly with databases or storage systems. +DQX does not persist data after performing quality checks, meaning users must handle data storage themselves. +Since DQX does not manage the input location, output table, or quarantine table, it is the user's responsibility to store or persist the processed data as needed. + Open the config to check available run configs and adjust the settings if needed: ```commandline databricks labs dqx open-remote-config @@ -164,6 +168,11 @@ Fields: ### Loading and execution methods +Checks can be loaded from a file in the installation folder, workspace, or local file system. If the checks file contains invalid json or yaml syntax, the engine will raise an error. +The checks can be applied using `apply_checks_by_metadata_and_split` or `apply_checks_by_metadata` methods. The checks are validated automatically as part of these methods. +If you want to split the checked data into valid and invalid (quarantined) dataframes, use `apply_checks_by_metadata_and_split`. +If you want to report issues as additional columns, use `apply_checks_by_metadata`. + #### Method 1: Loading checks from a workspace file in the installation folder If DQX is installed in the workspace, you can load checks based on the run configuration: @@ -229,6 +238,10 @@ valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks) ### Quality rules defined as code +Check can be defined in the code and applied using `apply_checks_and_split` or `apply_checks` methods. 
+If you want to split the checked data into valid and invalid (quarantined) dataframes, use `apply_checks_and_split`. +If you want to report issues as additional columns, use `apply_checks`. + #### Method 1: Using DQX classes ```python diff --git a/pyproject.toml b/pyproject.toml index 2df80299..aeb562f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -243,7 +243,7 @@ argument-naming-style = "snake_case" # Regular expression matching correct argument names. Overrides argument-naming- # style. If left empty, argument names will be checked with the set naming style. -argument-rgx = "[a-z_][a-z0-9_]{2,30}$" +argument-rgx = "[a-z_][a-z0-9_]{2,40}$" # Naming style matching correct attribute names. attr-naming-style = "snake_case" diff --git a/src/databricks/labs/dqx/base.py b/src/databricks/labs/dqx/base.py index 651e6962..e706534e 100644 --- a/src/databricks/labs/dqx/base.py +++ b/src/databricks/labs/dqx/base.py @@ -131,22 +131,22 @@ def get_valid(self, df: DataFrame) -> DataFrame: @staticmethod @abc.abstractmethod - def load_checks_from_local_file(path: str) -> list[dict]: + def load_checks_from_local_file(filepath: str) -> list[dict]: """ Load checks (dq rules) from a file (json or yml) in the local file system. This does not require installation of DQX in the workspace. The returning checks can be used as input for `apply_checks_by_metadata` function. - :param path: path to a file containing the checks. + :param filepath: path to a file containing the checks. :return: list of dq rules """ @staticmethod @abc.abstractmethod - def save_checks_in_local_file(checks: list[dict], path: str): + def save_checks_in_local_file(checks: list[dict], filepath: str): """ Save checks (dq rules) to yml file in the local file system. :param checks: list of dq rules to save - :param path: path to a file containing the checks. + :param filepath: path to a file containing the checks. 
""" diff --git a/src/databricks/labs/dqx/engine.py b/src/databricks/labs/dqx/engine.py index ce2a6658..f6c9fc20 100644 --- a/src/databricks/labs/dqx/engine.py +++ b/src/databricks/labs/dqx/engine.py @@ -116,27 +116,27 @@ def get_valid(self, df: DataFrame) -> DataFrame: ) @staticmethod - def load_checks_from_local_file(path: str) -> list[dict]: - if not path: - raise ValueError("filename must be provided") + def load_checks_from_local_file(filepath: str) -> list[dict]: + if not filepath: + raise ValueError("filepath must be provided") try: - checks = Installation.load_local(list[dict[str, str]], Path(path)) + checks = Installation.load_local(list[dict[str, str]], Path(filepath)) return deserialize_dicts(checks) except FileNotFoundError: - msg = f"Checks file {path} missing" + msg = f"Checks file {filepath} missing" raise FileNotFoundError(msg) from None @staticmethod - def save_checks_in_local_file(checks: list[dict], path: str): - if not path: - raise ValueError("filename must be provided") + def save_checks_in_local_file(checks: list[dict], filepath: str): + if not filepath: + raise ValueError("filepath must be provided") try: - with open(path, 'w', encoding="utf-8") as file: + with open(filepath, 'w', encoding="utf-8") as file: yaml.safe_dump(checks, file) except FileNotFoundError: - msg = f"Checks file {path} missing" + msg = f"Checks file {filepath} missing" raise FileNotFoundError(msg) from None @staticmethod @@ -391,34 +391,110 @@ def __init__( self._engine = engine or DQEngineCore(workspace_client, extra_params) def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame: + """Applies data quality checks to a given dataframe. + + :param df: dataframe to check + :param checks: list of checks to apply to the dataframe. Each check is an instance of DQRule class. + :return: dataframe with errors and warning reporting columns + """ return self._engine.apply_checks(df, checks) def apply_checks_and_split(self, df: DataFrame, checks: list[DQRule]) -> tuple[DataFrame, DataFrame]: + """Applies data quality checks to a given dataframe and split it into two ("good" and "bad"), + according to the data quality checks. + + :param df: dataframe to check + :param checks: list of checks to apply to the dataframe. Each check is an instance of DQRule class. + :return: two dataframes - "good" which includes warning rows but no reporting columns, and "data" having + error and warning rows and corresponding reporting columns + """ return self._engine.apply_checks_and_split(df, checks) def apply_checks_by_metadata_and_split( self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None ) -> tuple[DataFrame, DataFrame]: + """Wrapper around `apply_checks_and_split` for use in the metadata-driven pipelines. The main difference + is how the checks are specified - instead of using functions directly, they are described as function name plus + arguments. + + :param df: dataframe to check + :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields: + * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true - + it will be used as an error/warning message, or `null` if it's evaluated to `false` + * `name` - name that will be given to a resulting column. 
Autogenerated if not provided + * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe), + and `warn` (data is going into both dataframes) + :param glbs: dictionary with functions mapping (eg. ``globals()`` of the calling module). + If not specified, then only built-in functions are used for the checks. + :return: two dataframes - "good" which includes warning rows but no reporting columns, and "bad" having + error and warning rows and corresponding reporting columns + """ return self._engine.apply_checks_by_metadata_and_split(df, checks, glbs) def apply_checks_by_metadata( self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None ) -> DataFrame: + """Wrapper around `apply_checks` for use in the metadata-driven pipelines. The main difference + is how the checks are specified - instead of using functions directly, they are described as function name plus + arguments. + + :param df: dataframe to check + :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields: + * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true - + it will be used as an error/warning message, or `null` if it's evaluated to `false` + * `name` - name that will be given to a resulting column. Autogenerated if not provided + * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe), + and `warn` (data is going into both dataframes) + :param glbs: dictionary with functions mapping (eg. ``globals()`` of calling module). + If not specified, then only built-in functions are used for the checks. + :return: dataframe with errors and warning reporting columns + """ return self._engine.apply_checks_by_metadata(df, checks, glbs) @staticmethod def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus: + """ + Validate the input dict to ensure they conform to expected structure and types. + + Each check can be a dictionary. The function validates + the presence of required keys, the existence and callability of functions, and the types + of arguments passed to these functions. + + :param checks: List of checks to apply to the dataframe. Each check should be a dictionary. + :param glbs: Optional dictionary of global functions that can be used in checks. + + :return ValidationStatus: The validation status. + """ return DQEngineCore.validate_checks(checks, glbs) def get_invalid(self, df: DataFrame) -> DataFrame: + """ + Get records that violate data quality checks (records with warnings and errors). + @param df: input DataFrame. + @return: dataframe with error and warning rows and corresponding reporting columns. + """ return self._engine.get_invalid(df) def get_valid(self, df: DataFrame) -> DataFrame: + """ + Get records that don't violate data quality checks (records with warnings but no errors). + @param df: input DataFrame. + @return: dataframe with warning rows but no reporting columns. + """ return self._engine.get_valid(df) @staticmethod - def load_checks_from_local_file(path: str) -> list[dict]: - return DQEngineCore.load_checks_from_local_file(path) + def load_checks_from_local_file(filepath: str) -> list[dict]: + """ + Load checks (dq rules) from a file (json or yml) in the local filesystem. + + :param filepath: path to the file containing the checks. + :return: list of dq rules or raise an error if checks file is missing or is invalid. 
+ """ + parsed_checks = DQEngineCore.load_checks_from_local_file(filepath) + if not parsed_checks: + raise ValueError(f"Invalid or no checks in file: {filepath}") + return parsed_checks def load_checks_from_workspace_file(self, workspace_path: str) -> list[dict]: """Load checks (dq rules) from a file (json or yml) in the workspace. @@ -426,14 +502,17 @@ def load_checks_from_workspace_file(self, workspace_path: str) -> list[dict]: The returning checks can be used as input for `apply_checks_by_metadata` function. :param workspace_path: path to the file in the workspace. - :return: list of dq rules. + :return: list of dq rules or raise an error if checks file is missing or is invalid. """ workspace_dir = os.path.dirname(workspace_path) filename = os.path.basename(workspace_path) installation = Installation(self.ws, "dqx", install_folder=workspace_dir) logger.info(f"Loading quality rules (checks) from {workspace_path} in the workspace.") - return self._load_checks_from_file(installation, filename) + parsed_checks = self._load_checks_from_file(installation, filename) + if not parsed_checks: + raise ValueError(f"Invalid or no checks in workspace file: {workspace_path}") + return parsed_checks def load_checks_from_installation( self, run_config_name: str | None = "default", product_name: str = "dqx", assume_user: bool = True @@ -445,14 +524,17 @@ def load_checks_from_installation( :param run_config_name: name of the run (config) to use :param product_name: name of the product/installation directory :param assume_user: if True, assume user installation - :return: list of dq rules + :return: list of dq rules or raise an error if checks file is missing or is invalid. """ installation = self._get_installation(assume_user, product_name) run_config = self._load_run_config(installation, run_config_name) filename = run_config.checks_file or "checks.yml" logger.info(f"Loading quality rules (checks) from {installation.install_folder()}/{filename} in the workspace.") - return self._load_checks_from_file(installation, filename) + parsed_checks = self._load_checks_from_file(installation, filename) + if not parsed_checks: + raise ValueError(f"Invalid or no checks in workspace file: {installation.install_folder()}/{filename}") + return parsed_checks @staticmethod def save_checks_in_local_file(checks: list[dict], path: str): diff --git a/src/databricks/labs/dqx/utils.py b/src/databricks/labs/dqx/utils.py index 885906ec..aa366bf9 100644 --- a/src/databricks/labs/dqx/utils.py +++ b/src/databricks/labs/dqx/utils.py @@ -1,5 +1,5 @@ import re -import yaml +import ast from pyspark.sql import Column from pyspark.sql import SparkSession @@ -49,12 +49,21 @@ def read_input_data(spark: SparkSession, input_location: str | None, input_forma def deserialize_dicts(checks: list[dict[str, str]]) -> list[dict]: """ - deserialize string fields instances containing dictionaries + Deserialize string fields instances containing dictionaries. + This is needed as nested dictionaries from installation files are loaded as strings. 
@param checks: list of checks @return: """ - for item in checks: - for key, value in item.items(): - if value.startswith("{") and value.endswith("}"): - item[key] = yaml.safe_load(value.replace("'", '"')) - return checks + + def parse_nested_fields(obj): + """Recursively parse all string representations of dictionaries.""" + if isinstance(obj, str): + if obj.startswith("{") and obj.endswith("}"): + parsed_obj = ast.literal_eval(obj) + return parse_nested_fields(parsed_obj) + return obj + if isinstance(obj, dict): + return {k: parse_nested_fields(v) for k, v in obj.items()} + return obj + + return [parse_nested_fields(check) for check in checks] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..f8dcda26 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,273 @@ +import os +import pytest +from databricks.labs.pytester.fixtures.baseline import factory +from databricks.sdk.service.workspace import ImportFormat + + +@pytest.fixture +def checks_yml_content(): + return """- criticality: error + check: + function: is_not_null + arguments: + col_names: + - col1 + - col2 +- name: col_col3_is_null_or_empty + criticality: error + check: + function: is_not_null_and_not_empty + arguments: + col_name: col3 + trim_strings: true +- criticality: warn + check: + function: value_is_in_list + arguments: + col_name: col4 + allowed: + - 1 + - 2 +- criticality: error + check: + function: sql_expression + arguments: + expression: col1 like "Team %" +- criticality: error + check: + function: sql_expression + arguments: + expression: col2 like 'Team %' + """ + + +@pytest.fixture +def checks_json_content(): + return """[ + { + "criticality": "error", + "check": { + "function": "is_not_null", + "arguments": { + "col_names": ["col1", "col2"] + } + } + }, + { + "name": "col_col3_is_null_or_empty", + "criticality": "error", + "check": { + "function": "is_not_null_and_not_empty", + "arguments": { + "col_name": "col3", + "trim_strings": true + } + } + }, + { + "criticality": "warn", + "check": { + "function": "value_is_in_list", + "arguments": { + "col_name": "col4", + "allowed": [1, 2] + } + } + }, + { + "criticality": "error", + "check": {"function": "sql_expression", "arguments": {"expression": "col1 like \\"Team %\\""}} + }, + { + "criticality": "error", + "check": {"function": "sql_expression", "arguments": {"expression": "col2 like 'Team %'"}} + } +] + """ + + +@pytest.fixture +def checks_yml_invalid_content(): + """This YML has wrong indentation for the function field.""" + return """- criticality: error + check: +function: is_not_null_and_not_empty + arguments: + col_names: + col1 + - col2 + """ + + +@pytest.fixture +def checks_json_invalid_content(): + """This JSON is missing a comma after criticality field.""" + return """[ + { + "criticality": "error" + "function": "is_not_null_and_not_empty", + "check": { + "arguments": { + "col_names": ["col1", "col2"] + } + } + } +] + """ + + +@pytest.fixture +def expected_checks(): + return [ + { + "criticality": "error", + "check": {"function": "is_not_null", "arguments": {"col_names": ["col1", "col2"]}}, + }, + { + "name": "col_col3_is_null_or_empty", + "criticality": "error", + "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "col3", "trim_strings": True}}, + }, + { + "criticality": "warn", + "check": {"function": "value_is_in_list", "arguments": {"col_name": "col4", "allowed": [1, 2]}}, + }, + { + "criticality": 
"error", + "check": {"function": "sql_expression", "arguments": {"expression": "col1 like \"Team %\""}}, + }, + { + "criticality": "error", + "check": {"function": "sql_expression", "arguments": {"expression": "col2 like 'Team %'"}}, + }, + ] + + +@pytest.fixture +def make_local_check_file_as_yml(checks_yml_content): + file_path = 'checks.yml' + with open(file_path, 'w', encoding="utf-8") as f: + f.write(checks_yml_content) + yield file_path + if os.path.exists(file_path): + os.remove(file_path) + + +@pytest.fixture +def make_local_check_file_as_json(checks_json_content): + file_path = 'checks.json' + with open(file_path, 'w', encoding="utf-8") as f: + f.write(checks_json_content) + yield file_path + if os.path.exists(file_path): + os.remove(file_path) + + +@pytest.fixture +def make_invalid_local_check_file_as_yml(checks_yml_invalid_content): + file_path = 'invalid_checks.yml' + with open(file_path, 'w', encoding="utf-8") as f: + f.write(checks_yml_invalid_content) + yield file_path + if os.path.exists(file_path): + os.remove(file_path) + + +@pytest.fixture +def make_invalid_local_check_file_as_json(checks_json_invalid_content): + file_path = 'invalid_checks.json' + with open(file_path, 'w', encoding="utf-8") as f: + f.write(checks_json_invalid_content) + yield file_path + if os.path.exists(file_path): + os.remove(file_path) + + +@pytest.fixture +def make_check_file_as_yml(ws, make_directory, checks_yml_content): + def create(**kwargs): + if kwargs["install_dir"]: + workspace_file_path = kwargs["install_dir"] + "/checks.yml" + else: + folder = make_directory() + workspace_file_path = str(folder.absolute()) + "/checks.yml" + + ws.workspace.upload( + path=workspace_file_path, format=ImportFormat.AUTO, content=checks_yml_content.encode(), overwrite=True + ) + + return workspace_file_path + + def delete(workspace_file_path: str) -> None: + ws.workspace.delete(workspace_file_path) + + yield from factory("file", create, delete) + + +@pytest.fixture +def make_check_file_as_json(ws, make_directory, checks_json_content): + def create(**kwargs): + if kwargs["install_dir"]: + workspace_file_path = kwargs["install_dir"] + "/checks.json" + else: + folder = make_directory() + workspace_file_path = str(folder.absolute()) + "/checks.json" + + ws.workspace.upload( + path=workspace_file_path, format=ImportFormat.AUTO, content=checks_json_content.encode(), overwrite=True + ) + + return workspace_file_path + + def delete(workspace_file_path: str) -> None: + ws.workspace.delete(workspace_file_path) + + yield from factory("file", create, delete) + + +@pytest.fixture +def make_invalid_check_file_as_yml(ws, make_directory, checks_yml_invalid_content): + def create(**kwargs): + if kwargs["install_dir"]: + workspace_file_path = kwargs["install_dir"] + "/checks.yml" + else: + folder = make_directory() + workspace_file_path = str(folder.absolute()) + "/checks.yml" + + ws.workspace.upload( + path=workspace_file_path, + format=ImportFormat.AUTO, + content=checks_yml_invalid_content.encode(), + overwrite=True, + ) + + return workspace_file_path + + def delete(workspace_file_path: str) -> None: + ws.workspace.delete(workspace_file_path) + + yield from factory("file", create, delete) + + +@pytest.fixture +def make_invalid_check_file_as_json(ws, make_directory, checks_json_invalid_content): + def create(**kwargs): + if kwargs["install_dir"]: + workspace_file_path = kwargs["install_dir"] + "/checks.json" + else: + folder = make_directory() + workspace_file_path = str(folder.absolute()) + "/checks.json" + + 
ws.workspace.upload( + path=workspace_file_path, + format=ImportFormat.AUTO, + content=checks_json_invalid_content.encode(), + overwrite=True, + ) + + return workspace_file_path + + def delete(workspace_file_path: str) -> None: + ws.workspace.delete(workspace_file_path) + + yield from factory("file", create, delete) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c2244492..87989009 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,16 +1,15 @@ import os import logging import threading -from pathlib import Path + from collections.abc import Callable, Generator from functools import cached_property from dataclasses import replace from unittest.mock import patch import pytest -from databricks.labs.pytester.fixtures.baseline import factory + from databricks.labs.dqx.contexts.workflows import RuntimeContext from databricks.labs.dqx.__about__ import __version__ -from databricks.sdk.service.workspace import ImportFormat from databricks.sdk import WorkspaceClient from databricks.labs.blueprint.wheels import ProductInfo from databricks.labs.dqx.config import WorkspaceConfig, RunConfig @@ -50,28 +49,6 @@ def set_utc_timezone(): os.environ.pop("TZ") -@pytest.fixture -def make_check_file_as_yaml(ws, make_random, make_directory): - def create(**kwargs): - base_path = str(Path(__file__).resolve().parent.parent) - local_file_path = base_path + "/test_data/checks.yml" - if kwargs["install_dir"]: - workspace_file_path = kwargs["install_dir"] + "/checks.yml" - else: - folder = make_directory() - workspace_file_path = str(folder.absolute()) + "/checks.yml" - - with open(local_file_path, "rb") as f: - ws.workspace.upload(path=workspace_file_path, format=ImportFormat.AUTO, content=f.read(), overwrite=True) - - return workspace_file_path - - def delete(workspace_file_path: str) -> None: - ws.workspace.delete(workspace_file_path) - - yield from factory("file", create, delete) - - class CommonUtils: def __init__(self, env_or_skip_fixture, ws): self._env_or_skip = env_or_skip_fixture @@ -218,7 +195,7 @@ def webbrowser_open(): @pytest.fixture def setup_workflows(installation_ctx: MockInstallationContext, make_schema, make_table): """ - Setup the workflows for the tests + Set up the workflows for the tests Existing cluster can be used by adding: run_config.override_clusters = {Task.job_cluster: installation_ctx.workspace_client.config.cluster_id} diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 0ec48a41..040e0733 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -1,4 +1,3 @@ -from pathlib import Path import pyspark.sql.functions as F import pytest from pyspark.sql import Column @@ -10,8 +9,10 @@ ) from databricks.labs.dqx.rule import DQRule, DQRuleColSet, ColumnArguments + SCHEMA = "a: int, b: int, c: int" -EXPECTED_SCHEMA = SCHEMA + ", _errors: map, _warnings: map" +REPORTING_COLUMNS = ", _errors: map, _warnings: map" +EXPECTED_SCHEMA = SCHEMA + REPORTING_COLUMNS EXPECTED_SCHEMA_WITH_CUSTOM_NAMES = SCHEMA + ", dq_errors: map, dq_warnings: map" @@ -450,37 +451,37 @@ def test_apply_checks_by_metadata_with_filter(ws, spark): assert_df_equality(checked, expected, ignore_nullable=True) -def test_apply_checks_from_json_file_by_metadata(ws, spark): +def test_apply_checks_from_json_file_by_metadata(ws, spark, make_local_check_file_as_json): dq_engine = DQEngine(ws) schema = "col1: int, col2: int, col3: int, col4 int" test_df = spark.createDataFrame([[1, 3, 3, 
1], [2, None, 4, 1]], schema) - base_path = str(Path(__file__).resolve().parent.parent) - checks = DQEngine.load_checks_from_local_file(base_path + "/test_data/checks.json") + check_file = make_local_check_file_as_json + checks = DQEngine.load_checks_from_local_file(check_file) actual = dq_engine.apply_checks_by_metadata(test_df, checks) expected = spark.createDataFrame( [[1, 3, 3, 1, None, None], [2, None, 4, 1, {"col_col2_is_null": "Column col2 is null"}, None]], - schema + ", _errors: map, _warnings: map", + schema + REPORTING_COLUMNS, ) assert_df_equality(actual, expected, ignore_nullable=True) -def test_apply_checks_from_yml_file_by_metadata(ws, spark): +def test_apply_checks_from_yml_file_by_metadata(ws, spark, make_local_check_file_as_yml): dq_engine = DQEngine(ws) schema = "col1: int, col2: int, col3: int, col4 int" test_df = spark.createDataFrame([[1, 3, 3, 1], [2, None, 4, 1]], schema) - base_path = str(Path(__file__).resolve().parent.parent) - checks = DQEngine.load_checks_from_local_file(base_path + "/test_data/checks.yml") + check_file = make_local_check_file_as_yml + checks = DQEngine.load_checks_from_local_file(check_file) actual = dq_engine.apply_checks_by_metadata(test_df, checks) expected = spark.createDataFrame( [[1, 3, 3, 1, None, None], [2, None, 4, 1, {"col_col2_is_null": "Column col2 is null"}, None]], - schema + ", _errors: map, _warnings: map", + schema + REPORTING_COLUMNS, ) assert_df_equality(actual, expected, ignore_nullable=True) @@ -658,3 +659,40 @@ def test_apply_checks_by_metadata_with_custom_column_naming_fallback_to_default( EXPECTED_SCHEMA, ), ) + + +def test_apply_checks_with_sql_expression(ws, spark): + dq_engine = DQEngine(ws) + schema = "col1: string, col2: string" + test_df = spark.createDataFrame([["str1", "str2"], ["val1", "val2"]], schema) + + checks = [ + { + "criticality": "error", + "check": {"function": "sql_expression", "arguments": {"expression": "col1 like \"val%\""}}, + }, + { + "criticality": "error", + "check": {"function": "sql_expression", "arguments": {"expression": "col2 like 'val%'"}}, + }, + ] + + checked = dq_engine.apply_checks_by_metadata(test_df, checks) + + expected_schema = schema + REPORTING_COLUMNS + expected = spark.createDataFrame( + [ + ["str1", "str2", None, None], + [ + "val1", + "val2", + { + "col_col1_like_val_": "Value matches expression: col1 like \"val%\"", + "col_col2_like_val_": "Value matches expression: col2 like 'val%'", + }, + None, + ], + ], + expected_schema, + ) + assert_df_equality(checked, expected, ignore_nullable=True) diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py index 5758cd0a..63e8bdf0 100644 --- a/tests/integration/test_cli.py +++ b/tests/integration/test_cli.py @@ -1,9 +1,9 @@ import logging from dataclasses import dataclass - import yaml -from integration.conftest import contains_expected_workflows import pytest + +from tests.integration.conftest import contains_expected_workflows from databricks.labs.dqx.cli import ( open_remote_config, installations, @@ -15,7 +15,6 @@ ) from databricks.labs.dqx.config import WorkspaceConfig from databricks.sdk.errors import NotFound - from databricks.labs.dqx.engine import DQEngine logger = logging.getLogger(__name__) diff --git a/tests/integration/test_dlt_rules_generator.py b/tests/integration/test_dlt_rules_generator.py index 10a32c76..dc7c6ba2 100644 --- a/tests/integration/test_dlt_rules_generator.py +++ b/tests/integration/test_dlt_rules_generator.py @@ -1,5 +1,5 @@ import pytest -from integration.test_rules_generator import 
test_rules +from tests.integration.test_rules_generator import test_rules from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator from databricks.labs.dqx.profiler.profiler import DQProfile diff --git a/tests/integration/test_installation.py b/tests/integration/test_installation.py index b51de6ec..4ea04b42 100644 --- a/tests/integration/test_installation.py +++ b/tests/integration/test_installation.py @@ -2,8 +2,7 @@ from unittest import skip from unittest.mock import patch, create_autospec import pytest - -from integration.conftest import contains_expected_workflows +from tests.integration.conftest import contains_expected_workflows import databricks from databricks.labs.dqx.installer.mixins import InstallationMixin from databricks.labs.dqx.installer.workflows_installer import WorkflowsDeployment @@ -20,6 +19,7 @@ from databricks.sdk import WorkspaceClient from databricks.sdk.service.dashboards import LifecycleState + logger = logging.getLogger(__name__) diff --git a/tests/integration/test_load_checks_from_file.py b/tests/integration/test_load_checks_from_file.py index 53c92c38..2f65e984 100644 --- a/tests/integration/test_load_checks_from_file.py +++ b/tests/integration/test_load_checks_from_file.py @@ -23,36 +23,75 @@ def test_load_checks_from_installation_when_checks_file_does_not_exist_in_worksp ) -def test_load_checks_from_file(ws, installation_ctx, make_check_file_as_yaml): +def test_load_checks_from_yml_file(ws, installation_ctx, make_check_file_as_yml, expected_checks): installation_ctx.installation.save(installation_ctx.config) install_dir = installation_ctx.installation.install_folder() - make_check_file_as_yaml(install_dir=install_dir) + make_check_file_as_yml(install_dir=install_dir) checks = DQEngine(ws).load_checks_from_workspace_file( workspace_path=f"{install_dir}/{installation_ctx.config.get_run_config().checks_file}" ) - assert checks, "Checks were not loaded correctly" + assert checks == expected_checks, "Checks were not loaded correctly" -def test_load_checks_from_user_installation(ws, installation_ctx, make_check_file_as_yaml): +def test_load_checks_from_json_file(ws, installation_ctx, make_check_file_as_json, expected_checks): installation_ctx.installation.save(installation_ctx.config) - make_check_file_as_yaml(install_dir=installation_ctx.installation.install_folder()) + install_dir = installation_ctx.installation.install_folder() + make_check_file_as_json(install_dir=install_dir) + + checks = DQEngine(ws).load_checks_from_workspace_file(workspace_path=f"{install_dir}/checks.json") + + assert checks == expected_checks, "Checks were not loaded correctly" + + +def test_load_invalid_checks_from_yml_file(ws, installation_ctx, make_invalid_check_file_as_yml, expected_checks): + installation_ctx.installation.save(installation_ctx.config) + install_dir = installation_ctx.installation.install_folder() + workspace_file_path = make_invalid_check_file_as_yml(install_dir=install_dir) + with pytest.raises(ValueError, match=f"Invalid or no checks in workspace file: {workspace_file_path}"): + DQEngine(ws).load_checks_from_workspace_file( + workspace_path=f"{install_dir}/{installation_ctx.config.get_run_config().checks_file}" + ) + + +def test_load_invalid_checks_from_json_file(ws, installation_ctx, make_invalid_check_file_as_json, expected_checks): + installation_ctx.installation.save(installation_ctx.config) + install_dir = installation_ctx.installation.install_folder() + workspace_file_path = make_invalid_check_file_as_json(install_dir=install_dir) + with 
pytest.raises(ValueError, match=f"Invalid or no checks in workspace file: {workspace_file_path}"): + DQEngine(ws).load_checks_from_workspace_file(workspace_path=f"{install_dir}/checks.json") + + +def test_load_checks_from_user_installation(ws, installation_ctx, make_check_file_as_yml, expected_checks): + installation_ctx.installation.save(installation_ctx.config) + make_check_file_as_yml(install_dir=installation_ctx.installation.install_folder()) checks = DQEngine(ws).load_checks_from_installation( run_config_name="default", assume_user=True, product_name=installation_ctx.installation.product() ) - assert checks, "Checks were not loaded correctly" + assert checks == expected_checks, "Checks were not loaded correctly" + + +def test_load_invalid_checks_from_user_installation( + ws, installation_ctx, make_invalid_check_file_as_yml, expected_checks +): + installation_ctx.installation.save(installation_ctx.config) + workspace_file_path = make_invalid_check_file_as_yml(install_dir=installation_ctx.installation.install_folder()) + with pytest.raises(ValueError, match=f"Invalid or no checks in workspace file: {workspace_file_path}"): + DQEngine(ws).load_checks_from_installation( + run_config_name="default", assume_user=True, product_name=installation_ctx.installation.product() + ) -def test_load_checks_from_global_installation(ws, installation_ctx, make_check_file_as_yaml): +def test_load_checks_from_global_installation(ws, installation_ctx, make_check_file_as_yml): product_name = installation_ctx.product_info.product_name() install_dir = f"/Shared/{product_name}" # patch the global installation to existing folder to avoid access permission issues in the workspace with patch.object(Installation, '_global_installation', return_value=install_dir): installation_ctx.installation = Installation.assume_global(ws, product_name) installation_ctx.installation.save(installation_ctx.config) - make_check_file_as_yaml(install_dir=install_dir) + make_check_file_as_yml(install_dir=install_dir) checks = DQEngine(ws).load_checks_from_installation( run_config_name="default", assume_user=False, product_name=product_name ) diff --git a/tests/test_data/checks.json b/tests/test_data/checks.json deleted file mode 100644 index 0300fb9c..00000000 --- a/tests/test_data/checks.json +++ /dev/null @@ -1,5 +0,0 @@ -[ - {"criticality":"error","check":{"function":"is_not_null","arguments":{"col_names":["col1","col2"]}}}, - {"name":"col_col3_is_null_or_empty","criticality":"error","check":{"function":"is_not_null_and_not_empty","arguments":{"col_name":"col3", "trim_strings": true}}}, - {"criticality":"warn","check":{"function":"value_is_in_list","arguments":{"col_name":"col4","allowed":[1,2]}}} -] \ No newline at end of file diff --git a/tests/test_data/checks.yml b/tests/test_data/checks.yml deleted file mode 100644 index d9e0fc4d..00000000 --- a/tests/test_data/checks.yml +++ /dev/null @@ -1,23 +0,0 @@ ---- -- criticality: error - check: - function: is_not_null - arguments: - col_names: - - col1 - - col2 -- name: col_col3_is_null_or_empty - criticality: error - check: - function: is_not_null_and_not_empty - arguments: - col_name: col3 - trim_strings: true -- criticality: warn - check: - function: value_is_in_list - arguments: - col_name: col4 - allowed: - - 1 - - 2 diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index a4c5499d..c88780aa 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -1,5 +1,3 @@ -import os -from pathlib import Path from pyspark.sql import SparkSession import pytest @@ -7,12 
+5,3 @@ @pytest.fixture def spark_local(): return SparkSession.builder.appName("DQX Test").remote("sc://localhost").getOrCreate() - - -@pytest.fixture -def temp_yml_file(): - base_path = str(Path(__file__).resolve().parent.parent) - file_path = base_path + "/test_data/checks-local-test.yml" - yield file_path - if os.path.exists(file_path): - os.remove(file_path) diff --git a/tests/unit/test_load_checks_from_local_file.py b/tests/unit/test_load_checks_from_local_file.py index 9c052591..81fca144 100644 --- a/tests/unit/test_load_checks_from_local_file.py +++ b/tests/unit/test_load_checks_from_local_file.py @@ -1,40 +1,33 @@ -from pathlib import Path import pytest from databricks.labs.dqx.engine import DQEngine -EXPECTED_CHECKS = [ - { - "criticality": "error", - "check": {"function": "is_not_null", "arguments": {"col_names": ["col1", "col2"]}}, - }, - { - "name": "col_col3_is_null_or_empty", - "criticality": "error", - "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "col3", "trim_strings": True}}, - }, - { - "criticality": "warn", - "check": {"function": "value_is_in_list", "arguments": {"col_name": "col4", "allowed": [1, 2]}}, - }, -] -BASE_PATH = str(Path(__file__).resolve().parent.parent) - - -def test_load_checks_from_local_file_json(): - file = BASE_PATH + "/test_data/checks.json" +def test_load_checks_from_local_file_json(make_local_check_file_as_json, expected_checks): + file = make_local_check_file_as_json checks = DQEngine.load_checks_from_local_file(file) - assert checks == EXPECTED_CHECKS, "The loaded checks do not match the expected checks." + assert checks == expected_checks, "The loaded checks do not match the expected checks." -def test_load_checks_from_local_file_yml(temp_yml_file): - file = BASE_PATH + "/test_data/checks.yml" +def test_load_checks_from_local_file_yml(make_local_check_file_as_yml, expected_checks): + file = make_local_check_file_as_yml checks = DQEngine.load_checks_from_local_file(file) - assert checks == EXPECTED_CHECKS, "The loaded checks do not match the expected checks." + assert checks == expected_checks, "The loaded checks do not match the expected checks." 
+ + +def test_load_invalid_checks_from_local_file_json(make_invalid_local_check_file_as_json, expected_checks): + file = make_invalid_local_check_file_as_json + with pytest.raises(ValueError, match=f"Invalid or no checks in file: {file}"): + DQEngine.load_checks_from_local_file(file) + + +def test_load_invalid_checks_from_local_file_yml(make_invalid_local_check_file_as_yml, expected_checks): + file = make_invalid_local_check_file_as_yml + with pytest.raises(ValueError, match=f"Invalid or no checks in file: {file}"): + DQEngine.load_checks_from_local_file(file) def test_load_checks_from_local_file_when_filename_is_empty(): - with pytest.raises(ValueError, match="filename must be provided"): + with pytest.raises(ValueError, match="filepath must be provided"): DQEngine.load_checks_from_local_file("") diff --git a/tests/unit/test_save_checks_in_local_file.py b/tests/unit/test_save_checks_in_local_file.py index afcccb35..38eafd76 100644 --- a/tests/unit/test_save_checks_in_local_file.py +++ b/tests/unit/test_save_checks_in_local_file.py @@ -7,13 +7,13 @@ ] -def test_save_checks_to_local_file(temp_yml_file): - file = temp_yml_file +def test_save_checks_to_local_file(make_local_check_file_as_yml): + file = make_local_check_file_as_yml DQEngine.save_checks_in_local_file(TEST_CHECKS, file) checks = DQEngine.load_checks_from_local_file(file) assert checks == TEST_CHECKS, "The loaded checks do not match the expected checks." def test_save_checks_to_local_file_when_filename_is_empty(): - with pytest.raises(ValueError, match="filename must be provided"): + with pytest.raises(ValueError, match="filepath must be provided"): DQEngine.save_checks_in_local_file(TEST_CHECKS, "")
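
For reference, a minimal standalone sketch of the quote-safe deserialization this patch switches `utils.deserialize_dicts` to: nested dict fields that arrive as strings are parsed with `ast.literal_eval` (recursively, per the new `parse_nested_fields`) instead of `yaml.safe_load` with single-to-double-quote substitution, so SQL expressions containing quotes no longer break parsing. The `raw_checks` sample input below is hypothetical, not taken from the repository's test data.

```python
import ast


def deserialize_dicts(checks: list[dict]) -> list[dict]:
    """Sketch of the patched deserialization: parse stringified nested dicts."""

    def parse_nested_fields(obj):
        # Strings that look like dict literals are parsed with ast.literal_eval,
        # which tolerates quotes embedded in values (e.g. SQL expressions),
        # unlike the previous yaml.safe_load(value.replace("'", '"')) approach.
        if isinstance(obj, str):
            if obj.startswith("{") and obj.endswith("}"):
                return parse_nested_fields(ast.literal_eval(obj))
            return obj
        if isinstance(obj, dict):
            return {k: parse_nested_fields(v) for k, v in obj.items()}
        return obj

    return [parse_nested_fields(check) for check in checks]


# Hypothetical input: a check whose nested dict was loaded as a string and whose
# expression itself contains double quotes.
raw_checks = [
    {
        "criticality": "error",
        "check": "{'function': 'sql_expression', 'arguments': {'expression': 'col1 like \"Team %\"'}}",
    }
]

print(deserialize_dicts(raw_checks))
# [{'criticality': 'error', 'check': {'function': 'sql_expression',
#   'arguments': {'expression': 'col1 like "Team %"'}}}]
```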