Added Databricks CLI validation checks command (#70)
## Changes
* Added validation checks CLI command
* Updated demos
* Corrected profiler

### Linked issues

Resolves #33 

### Tests

- [x] manually tested
- [x] added unit tests
- [x] added integration tests
mwojtyczka authored Dec 11, 2024
1 parent fd69f8f commit 2e48fe3
Showing 15 changed files with 335 additions and 107 deletions.
10 changes: 7 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,19 @@ Configure auth to Databricks workspace for integration testing by configuring cr
If you want to run the tests from an IDE, you must set up a `.env` or `~/.databricks/debug-env.json` file
(see [instructions](https://github.com/databrickslabs/pytester?tab=readme-ov-file#debug_env_name-fixture)).

To run the integration tests from a command line you need to setup environment variables first:
Set up the required environment variables for running the integration tests and code coverage:
```shell
export DATABRICKS_CLIENT_ID=<client-id>
export DATABRICKS_HOST=https://<workspace-url>
export DATABRICKS_CLUSTER_ID=<cluster-id>
# set either service principal credentials
export DATABRICKS_CLIENT_ID=<client-id>
export DATABRICKS_CLIENT_SECRET=<client-secret>
export DATABRICKS_HOST=https://<workspace-url>
# or PAT token
export DATABRICKS_TOKEN=<pat-token>
```

Run integration tests with the following command:
```shell
make integration
```

Expand Down
18 changes: 12 additions & 6 deletions README.md
Expand Up @@ -262,22 +262,26 @@ The following DQX configuration from 'config.yml' will be used by default:

## Validating quality rules (checks)

If you manually adjust the generated rules or create your own configuration, you can validate them before using in your application
(see [Adding quality checks to the application](#adding-quality-checks-to-the-application)).
If you manually adjust the generated rules or create your own configuration, you can validate them before use:

### In Python

```python
from databricks.labs.dqx.utils import validate_rules
from databricks.labs.dqx.engine import DQEngine
valid = validate_rules(checks) # returns boolean
status = DQEngine.validate_checks(checks)
print(status)
```

The checks are validated automatically when applied as part of the
`apply_checks_by_metadata_and_split` and `apply_checks_by_metadata` methods
(see [Quality rules defined as config](#quality-rules-defined-as-config)).

### Using CLI

Validate checks stored in the installation folder:
```commandline
databricks labs dqx validate --run-config "default"
databricks labs dqx validate-checks --run-config "default"
```

The following DQX configuration from 'config.yml' will be used by default:
Expand Down Expand Up @@ -339,6 +343,8 @@ valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
```

Checks are validated automatically as part of the `apply_checks_by_metadata_and_split` and `apply_checks_by_metadata` methods.

**Method 2: load checks from a workspace file**

The checks can also be loaded from any file in the Databricks workspace:
Expand Down Expand Up @@ -605,7 +611,7 @@ Feel free to submit a PR to DQX with your own check so that others can benefit from it

# Contribution

See contribution guidance [here](CONTRIBUTING.md).
See contribution guidance [here](CONTRIBUTING.md) for how to build, test, and submit a PR to the project.

[[back to top](#databricks-labs-dqx)]

Expand Down
36 changes: 31 additions & 5 deletions demos/dqx_demo.py
Expand Up @@ -46,8 +46,8 @@
ws = WorkspaceClient()
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(input_df)
display(summary_stats)
display(profiles)
print(summary_stats)
print(profiles)

# generate DQX quality rules/checks
generator = DQGenerator(ws)
Expand All @@ -57,7 +57,33 @@
# generate DLT expectations
dlt_generator = DQDltGenerator(ws)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles)
display(dlt_expectations)
print(dlt_expectations)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Validate quality checks

# COMMAND ----------

import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

checks = yaml.safe_load("""
- criticality: "invalid_criticality"
check:
function: "is_not_null"
arguments:
col_names:
- "col1"
- "col2"
""")

dq_engine = DQEngine(WorkspaceClient())
status = dq_engine.validate_checks(checks)
print(status.has_errors)
display(status.errors)

# COMMAND ----------

Expand Down Expand Up @@ -98,10 +124,10 @@
schema = "col1: int, col2: int, col3: int, col4: int"
input_df = spark.createDataFrame([[1, 3, 3, 1], [2, None, 4, 1]], schema)

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes; checks are validated automatically
#valid_df, quarantined_df = apply_checks_by_metadata_and_split(input_df, checks)

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`); checks are validated automatically
dq_engine = DQEngine(WorkspaceClient())
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
display(valid_and_quarantined_df)
Expand Down
8 changes: 8 additions & 0 deletions labs.yml
Expand Up @@ -15,4 +15,12 @@ commands:
table_template: |-
Path\tVersion\tInput
{{range .}}{{.path}}\t{{.version}}\t{{.input}}
{{end}}
- name: validate-checks
description: Validate checks
flags:
- name: run-config
description: Run config to use
table_template: |-
{{range .}}{{.error}}
{{end}}
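
The `table_template` for `validate-checks` above iterates over a JSON list and emits the `error` field of each entry on its own line. A minimal pure-Python sketch of the data shape this template expects (the example error strings are made up for illustration):

```python
import json

# The CLI command prints a JSON list of {"error": ...} dicts;
# the {{range .}}{{.error}}{{end}} template emits one error per line.
errors_list = [
    {"error": "missing check function"},
    {"error": "invalid criticality value"},
]

# Mimic what the Go table template renders: the "error" field, line by line.
rendered = "\n".join(entry["error"] for entry in errors_list)
print(rendered)

# Round-trip through JSON, as the command output would be serialized and parsed.
parsed = json.loads(json.dumps(errors_list))
```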
6 changes: 3 additions & 3 deletions pyproject.toml
Expand Up @@ -75,9 +75,9 @@ python="3.10"
path = ".venv"

[tool.hatch.envs.default.scripts]
test = "pytest -n 2 --cov src --cov-report=xml --timeout 30 tests/unit --durations 20"
coverage = "pytest -n 2 --cov src tests/ --timeout 30 --cov-report=html --durations 20"
integration = "pytest -n 10 --cov src tests/integration --durations 20"
test = "pytest -n 10 --cov src --cov-report=xml --timeout 30 tests/unit --durations 20"
coverage = "pytest -n 10 --cov src tests/ --timeout 240 --cov-report=html --durations 20"
integration = "pytest -n 10 --timeout 240 --cov src tests/integration --durations 20"
fmt = ["black . --extend-exclude 'demos/'",
"ruff check . --fix",
"mypy . --exclude 'demos/*'",
Expand Down
46 changes: 36 additions & 10 deletions src/databricks/labs/dqx/cli.py
Expand Up @@ -9,38 +9,36 @@

from databricks.labs.dqx.config import WorkspaceConfig
from databricks.labs.dqx.contexts.workspace_cli import WorkspaceContext
from databricks.labs.dqx.engine import DQEngine

dqx = App(__file__)
logger = get_logger(__file__)

CANT_FIND_DQX_MSG = (
"Couldn't find DQX configuration in the user's home folder. "
"Make sure the current user has configured and installed DQX."
)


@dqx.command
def open_remote_config(w: WorkspaceClient):
def open_remote_config(w: WorkspaceClient, *, ctx: WorkspaceContext | None = None):
"""
Opens remote configuration in the browser.
:param w: The WorkspaceClient instance to use for accessing the workspace.
:param ctx: The WorkspaceContext instance to use for accessing the workspace.
"""
ctx = WorkspaceContext(w)
workspace_link = ctx.installation.workspace_link("config.yml")
ctx = ctx or WorkspaceContext(w)
workspace_link = ctx.installation.workspace_link(WorkspaceConfig.__file__)
webbrowser.open(workspace_link)


@dqx.command
def installations(w: WorkspaceClient):
def installations(w: WorkspaceClient, *, product_name: str = "dqx") -> list[dict]:
"""
Show installations by different users on the same workspace.
:param w: The WorkspaceClient instance to use for accessing the workspace.
:param product_name: The name of the product to search for in the installation folder.
"""
logger.info("Fetching installations...")
all_users = []
for installation in Installation.existing(w, "dqx"):
for installation in Installation.existing(w, product_name):
try:
config = installation.load(WorkspaceConfig)
all_users.append(
Expand All @@ -53,7 +51,35 @@ def installations(w: WorkspaceClient):
continue
except SerdeError:
continue

print(json.dumps(all_users))
return all_users


@dqx.command
def validate_checks(
w: WorkspaceClient, *, run_config: str = "default", ctx: WorkspaceContext | None = None
) -> list[dict]:
"""
Validate checks stored as a file in the installation directory.
:param w: The WorkspaceClient instance to use for accessing the workspace.
:param run_config: The name of the run configuration to use.
:param ctx: The WorkspaceContext instance to use for accessing the workspace.
"""
ctx = ctx or WorkspaceContext(w)
config = ctx.installation.load(WorkspaceConfig)
checks_file = f"{ctx.installation.install_folder()}/{config.get_run_config(run_config).checks_file}"
dq_engine = DQEngine(w)
checks = dq_engine.load_checks_from_workspace_file(checks_file)
status = dq_engine.validate_checks(checks)

errors_list = []
if status.has_errors:
errors_list = [{"error": error} for error in status.errors]

print(json.dumps(errors_list))
return errors_list


if __name__ == "__main__":
Expand Down
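
The error-collection step of `validate_checks` above can be exercised in isolation. A minimal sketch with a hypothetical stand-in for the engine's validation status object (the real class lives in `databricks.labs.dqx`; `FakeStatus` and its fields are assumptions for illustration):

```python
import json
from dataclasses import dataclass, field


@dataclass
class FakeStatus:
    """Hypothetical stand-in for the engine's validation status result."""

    errors: list[str] = field(default_factory=list)

    @property
    def has_errors(self) -> bool:
        return len(self.errors) > 0


def errors_as_dicts(status: FakeStatus) -> list[dict]:
    # Same shape the CLI command prints: a list of {"error": ...} dicts,
    # empty when validation passed.
    if status.has_errors:
        return [{"error": error} for error in status.errors]
    return []


clean = errors_as_dicts(FakeStatus())
broken = errors_as_dicts(FakeStatus(errors=["unknown function: is_not_nul"]))
print(json.dumps(broken))
```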
4 changes: 3 additions & 1 deletion src/databricks/labs/dqx/config.py
Expand Up @@ -29,10 +29,12 @@ class WorkspaceConfig:
log_level: str | None = "INFO"
connect: Config | None = None

def get_run_config(self, run_config_name: str | None = None) -> RunConfig:
def get_run_config(self, run_config_name: str | None = "default") -> RunConfig:
"""Get the run configuration for a given run name, or the default configuration if no run name is provided.
:param run_config_name: The name of the run configuration to get.
:return: The run configuration.
:raises ValueError: If no run configurations are available or if the specified run configuration name is
not found.
"""
if not self.run_configs:
raise ValueError("No run configurations available")
Expand Down
