Make profiler and generator dependent on UC and introduce run configurations (#68)

## Changes
* Made profiler and generator dependent on UC/Databricks
* Introduced RunConfig so that multiple run configurations can be defined in the config and the project (e.g. the profiler) can be run for different use cases (see the sketch after this list)
* Refactored tests
* Improved Readme
* Added new examples to demos
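
A minimal sketch of what multiple run configurations could look like, using only the RunConfig/WorkspaceConfig fields added in `src/databricks/labs/dqx/config.py` in this commit; the table names and check file names are hypothetical:

```python
# Hypothetical example of two run configurations, one per use case.
# Field names mirror the RunConfig dataclass added in this commit;
# the input locations, quarantine table and checks file names are made up.
from databricks.labs.dqx.config import RunConfig, WorkspaceConfig

config = WorkspaceConfig(
    run_configs=[
        RunConfig(
            name="default",
            input_locations="main.raw.sales",  # hypothetical input table
            checks_file="checks.yml",
        ),
        RunConfig(
            name="iot",
            input_locations="main.raw.iot_events",  # hypothetical input table
            quarantine_table="main.quarantine.iot_events",
            checks_file="iot_checks.yml",
        ),
    ]
)
```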

### Tests
- [x] manually tested the whole project on Databricks including demos
- [x] added unit tests
- [x] added integration tests
mwojtyczka authored Nov 29, 2024
1 parent ed835bb commit 61f1b7d
Showing 22 changed files with 1,378 additions and 1,019 deletions.
8 changes: 8 additions & 0 deletions CONTRIBUTING.md
@@ -69,6 +69,13 @@ This section provides a step-by-step guide to set up and start working on the pr

Go through the [prerequisites](./README.md#prerequisites) and clone the [dqx github repo](https://github.com/databrickslabs/dqx).

To begin, install [Hatch](https://github.com/pypa/hatch), which is our build tool.

On MacOSX, this is achieved using the following:
```shell
brew install hatch
```

Run the following command to create the default environment and install development dependencies, assuming you've already cloned the github repo.
```shell
make dev
@@ -111,6 +118,7 @@ make coverage
## Running CLI from the local repo

Once you clone the repo locally and install the Databricks CLI, you can run Labs CLI commands.
As with other Databricks CLI commands, you can specify the profile to use with `--profile`.

Authenticate your current machine to your Databricks Workspace:
```commandline
2 changes: 1 addition & 1 deletion Makefile
@@ -1,4 +1,4 @@
all: clean lint fmt test coverage
all: clean dev lint fmt test integration coverage

clean:
rm -fr .venv clean htmlcov .mypy_cache .pytest_cache .ruff_cache .coverage coverage.xml
266 changes: 168 additions & 98 deletions README.md

Large diffs are not rendered by default.

123 changes: 104 additions & 19 deletions demos/dqx_demo.py
@@ -8,6 +8,7 @@
# MAGIC ### Installation DQX in the workspace
# MAGIC
# MAGIC Install DQX in the workspace as per the instructions [here](https://github.com/databrickslabs/dqx?tab=readme-ov-file#installation).
# MAGIC Use default filename for data quality rules.

# COMMAND ----------

@@ -33,14 +34,30 @@

# COMMAND ----------

from databricks.labs.dqx.profiler.profiler import profile
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.sdk import WorkspaceClient
import yaml

schema = "col1: int, col2: int, col3: int, col4 int"
input_df = spark.createDataFrame([[1, 3, 3, 1], [2, None, 4, 1]], schema)

summary_stats, checks = profile(input_df)
ws = WorkspaceClient()
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(input_df)
display(summary_stats)
display(checks)
display(profiles)

# generate DQX quality rules/checks
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles) # with default level "error"
print(yaml.safe_dump(checks))

# generate DLT expectations
dlt_generator = DQDltGenerator(ws)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles)
display(dlt_expectations)

# COMMAND ----------

@@ -50,8 +67,8 @@
# COMMAND ----------

import yaml
from databricks.labs.dqx.engine import apply_checks_by_metadata, apply_checks_by_metadata_and_split

from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

checks = yaml.safe_load("""
- criticality: "error"
@@ -85,7 +102,8 @@
#valid_df, quarantined_df = apply_checks_by_metadata_and_split(input_df, checks)

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = apply_checks_by_metadata(input_df, checks)
dq_engine = DQEngine(WorkspaceClient())
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
display(valid_and_quarantined_df)

# COMMAND ----------
@@ -96,8 +114,8 @@
# COMMAND ----------

from databricks.labs.dqx.col_functions import is_not_null, is_not_null_and_not_empty, value_is_in_list
from databricks.labs.dqx.engine import DQRule, DQRuleColSet, apply_checks, apply_checks_and_split

from databricks.labs.dqx.engine import DQEngine, DQRule, DQRuleColSet
from databricks.sdk import WorkspaceClient

checks = DQRuleColSet( # define rule for multiple columns at once
columns=["col1", "col2"],
@@ -119,7 +137,8 @@
#valid_df, quarantined_df = apply_checks_and_split(input_df, checks)

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = apply_checks(input_df, checks)
dq_engine = DQEngine(WorkspaceClient())
valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)

display(valid_and_quarantined_df)

@@ -190,19 +209,21 @@

# COMMAND ----------

from databricks.labs.dqx.engine import apply_checks_by_metadata, apply_checks_by_metadata_and_split
from databricks.labs.dqx.engine import load_checks_from_file
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.installation import Installation

# use check file specified in the default installation config ('config.yml')
# if filename provided it's a relative path to the workspace installation directory
ws = WorkspaceClient()
installation = Installation.current(ws, "dqx", assume_user=True)
checks = load_checks_from_file(installation)
dq_engine = DQEngine(WorkspaceClient())

# load checks from the default run configuration
checks = dq_engine.load_checks_from_installation(assume_user=True)
# or load checks from a workspace file
# checks = dq_engine.load_checks_from_workspace_file(workspace_file_path)
print(checks)

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = apply_checks_by_metadata(input_df, checks)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
display(valid_and_quarantined_df)

# COMMAND ----------
@@ -222,7 +243,6 @@
# Define our Data Quality checks
import yaml


checks = yaml.safe_load("""
- check:
function: "is_not_null"
@@ -279,11 +299,14 @@

# COMMAND ----------

from databricks.labs.dqx.engine import apply_checks_by_metadata_and_split
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

# Apply checks when processing to silver layer
bronze = spark.read.format("delta").load(bronze_path)
silver, quarantine = apply_checks_by_metadata_and_split(bronze, checks)
silver, quarantine = dq_engine.apply_checks_by_metadata_and_split(bronze, checks)

# COMMAND ----------

@@ -292,3 +315,65 @@
# COMMAND ----------

display(quarantine)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Create own custom checks

# COMMAND ----------

# MAGIC %md
# MAGIC ### Create custom check function

# COMMAND ----------

import pyspark.sql.functions as F
from pyspark.sql import Column
from databricks.labs.dqx.col_functions import make_condition

def ends_with_foo(col_name: str) -> Column:
    column = F.col(col_name)
    return make_condition(column.endswith("foo"), f"Column {col_name} ends with foo", f"{col_name}_ends_with_foo")

# COMMAND ----------

# MAGIC %md
# MAGIC ### Apply custom check function

# COMMAND ----------

import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.col_functions import *

# use built-in, custom and sql expression checks
checks = yaml.safe_load(
    """
- criticality: "error"
  check:
    function: "is_not_null_and_not_empty"
    arguments:
      col_name: "col1"
- criticality: "error"
  check:
    function: "ends_with_foo"
    arguments:
      col_name: "col1"
- criticality: "error"
  check:
    function: "sql_expression"
    arguments:
      expression: "col1 LIKE 'str%'"
      msg: "col1 starts with 'str'"
"""
)

schema = "col1: string"
input_df = spark.createDataFrame([["str1"], ["foo"], ["str3"]], schema)

dq_engine = DQEngine(WorkspaceClient())

valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, globals())
display(valid_and_quarantined_df)
14 changes: 9 additions & 5 deletions demos/dqx_dlt_demo.py
@@ -1,5 +1,6 @@
# Databricks notebook source
# 1. Install DQX in the workspace as per the instructions here: https://github.com/databrickslabs/dqx?tab=readme-ov-file#installation
# Use default filename for data quality rules.

# 2. Install DQX in the cluster
user_name = "[email protected]" # cannot dynamically retrieve user name as "System-User" is always returned: spark.sql('select current_user() as user').collect()[0]['user']
@@ -27,7 +28,8 @@
# COMMAND ----------

import dlt
from databricks.labs.dqx.engine import apply_checks_by_metadata, get_invalid, get_valid
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

# COMMAND ----------

Expand All @@ -39,7 +41,7 @@ def bronze():

# COMMAND ----------

# Define our Data Quality cheks
# Define Data Quality checks
import yaml


@@ -117,24 +119,26 @@ def bronze():

# COMMAND ----------

dq_engine = DQEngine(WorkspaceClient())

# Read data from Bronze and apply checks
@dlt.view
def bronze_dq_check():
df = dlt.read_stream("bronze")
return apply_checks_by_metadata(df, checks)
return dq_engine.apply_checks_by_metadata(df, checks)

# COMMAND ----------

# # get rows without errors or warnings, and drop auxiliary columns
@dlt.table
def silver():
df = dlt.read_stream("bronze_dq_check")
return get_valid(df)
return dq_engine.get_valid(df)

# COMMAND ----------

# get only rows with errors or warnings
@dlt.table
def quarantine():
df = dlt.read_stream("bronze_dq_check")
return get_invalid(df)
return dq_engine.get_invalid(df)
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -402,8 +402,8 @@ bad-functions = ["map", "input"]
# ignored-parents =

# Maximum number of arguments for function / method.
max-args = 9
max-positional-arguments=9
max-args = 10
max-positional-arguments=10

# Maximum number of attributes for a class (see R0902).
max-attributes = 11
26 changes: 26 additions & 0 deletions src/databricks/labs/dqx/base.py
@@ -0,0 +1,26 @@
import abc
from typing import final
from functools import cached_property
from databricks.sdk import WorkspaceClient


class DQEngineBase(abc.ABC):
    def __init__(self, workspace_client: WorkspaceClient):
        self._workspace_client = workspace_client

    @cached_property
    def ws(self) -> WorkspaceClient:
        """
        Cached property to verify and return the workspace client.
        """
        return self._verify_workspace_client(self._workspace_client)

    @staticmethod
    @final
    def _verify_workspace_client(ws: WorkspaceClient) -> WorkspaceClient:
        """
        Verifies the Databricks workspace client configuration.
        """
        # make sure Unity Catalog is accessible in the current Databricks workspace
        ws.catalogs.list()
        return ws
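
A brief usage sketch (not part of this diff) of how the base class enforces the UC dependency from the commit title; `MyEngine` is a hypothetical subclass used only for illustration:

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.base import DQEngineBase


class MyEngine(DQEngineBase):
    """Hypothetical engine; DQEngineBase declares no abstract methods, so a trivial subclass suffices here."""


engine = MyEngine(WorkspaceClient())
# The first access to the cached `ws` property calls ws.catalogs.list(), so a workspace
# without Unity Catalog access fails fast instead of partway through a run.
client = engine.ws
```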
1 change: 0 additions & 1 deletion src/databricks/labs/dqx/cli.py
@@ -45,7 +45,6 @@ def installations(w: WorkspaceClient):
    config = installation.load(WorkspaceConfig)
    all_users.append(
        {
            "input": config.input_location,
            "version": config.__version__,
            "path": installation.install_folder(),
        }
38 changes: 31 additions & 7 deletions src/databricks/labs/dqx/config.py
@@ -2,7 +2,20 @@

from databricks.sdk.core import Config

__all__ = ["WorkspaceConfig"]
__all__ = ["WorkspaceConfig", "RunConfig"]


@dataclass
class RunConfig:
"""Configuration class for the data quality checks"""

name: str = "default" # name of the run configuration
input_locations: str | None = None # input data path or a table
input_format: str | None = "delta" # input data format
output_table: str | None = None # output data table
quarantine_table: str | None = None # quarantined data table
checks_file: str | None = "checks.yml" # file containing quality rules / checks
profile_summary_stats_file: str | None = "profile_summary_stats.yml" # file containing profile summary statistics


@dataclass
@@ -12,12 +25,23 @@ class WorkspaceConfig:
__file__ = "config.yml"
__version__ = 2

run_configs: list[RunConfig]
log_level: str | None = "INFO"
connect: Config | None = None

input_location: str | None = None # input data path or a table
output_location: str | None = None # output data path or a table
quarantine_location: str | None = None # quarantined data path or a table
curated_location: str | None = None # curated data path or a table
checks_file: str | None = None # name of the file containing quality rules / checks
profile_summary_stats_file: str | None = None # name of the file containing profile summary statistics
def get_run_config(self, run_config_name: str | None = None) -> RunConfig:
"""Get the run configuration for a given run name, or the default configuration if no run name is provided.
:param run_config_name: The name of the run configuration to get.
:return: The run configuration.
"""
if not self.run_configs:
raise ValueError("No run configurations available")

if not run_config_name:
return self.run_configs[0]

for run in self.run_configs:
if run.name == run_config_name:
return run

raise ValueError("No run configurations available")