Added profiling job #72

Merged: 31 commits, Jan 3, 2025
Changes from 22 commits
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/bug.yml
@@ -57,7 +57,7 @@ body:
    id: os
    attributes:
      label: Operating System
-     description: Which operating system do you have UCX installed on?
+     description: Which operating system do you have DQX installed on?
      options:
        - macOS
        - Linux
32 changes: 23 additions & 9 deletions README.md
@@ -20,7 +20,7 @@ Simplified Data Quality checking at Scale for PySpark Workloads on streaming and
* [Uninstall DQX from the Databricks workspace](#uninstall-dqx-from-the-databricks-workspace)
* [How to use it](#how-to-use-it)
* [Demos](#demos)
-* [Data Profiling](#data-profiling)
+* [Data Profiling and Quality Rules Generation](#data-profiling-and-quality-rules-generation)
* [In Python](#in-python)
* [Using CLI](#using-cli)
* [Validating quality rules (checks)](#validating-quality-rules--checks-)
@@ -125,6 +125,7 @@ and other configuration options.
The cli command will install the following components in the workspace:
- A Python [wheel file](https://peps.python.org/pep-0427/) with the library packaged.
- DQX configuration file ('config.yml').
+- Profiling workflow for generating quality rule candidates.
- Quality dashboard for monitoring to display information about the data quality issues.

The DQX configuration file can contain multiple run configurations, each defining a specific set of input, output and quarantine locations etc.
@@ -136,7 +137,7 @@ run_config:
- name: default
  checks_file: checks.yml
  curated_location: main.dqx.curated
-  input_locations: main.dqx.input
+  input_location: main.dqx.input
  output_location: main.dqx.output
  profile_summary_stats_file: profile_summary_stats.yml
  quarantine_location: main.dqx.quarantine
@@ -152,6 +153,11 @@ by setting 'DQX_FORCE_INSTALL' environment variable. The following options are a
* `DQX_FORCE_INSTALL=global databricks labs install dqx`: will force the installation to be for root only (`/Applications/dqx`)
* `DQX_FORCE_INSTALL=user databricks labs install dqx`: will force the installation to be for user only (`/Users/<user>/.dqx`)

+To list all installed dqx workflows in the workspace and their latest run state, execute the following command:
+```commandline
+databricks labs dqx workflows
+```

### Install the tool on the Databricks cluster

After you install the tool on the workspace, you need to install the DQX package on a Databricks cluster.
@@ -212,7 +218,7 @@ you can upload the following notebooks in the Databricks workspace to try it out
* [DQX Demo Notebook](demos/dqx_demo.py) - demonstrates how to use DQX for data quality checks.
* [DQX DLT Demo Notebook](demos/dqx_dlt_demo.py) - demonstrates how to use DQX with Delta Live Tables (DLT).

-## Data Profiling
+## Data Profiling and Quality Rules Generation

Data profiling is run on the input data to generate quality rule candidates with summary statistics.
The generated rules/checks are input for the quality checking (see [Adding quality checks to the application](#adding-quality-checks-to-the-application)).
@@ -246,19 +252,27 @@ dlt_expectations = dlt_generator.generate_dlt_rules(profiles)
### Using CLI

You must install DQX in the workspace first (see [installation](#installation-in-a-databricks-workspace)).
+As part of the installation, the profiler workflow is installed. It can be run manually in the workspace UI or using the CLI as shown below.

-Run profiling job:
+Run the profiler workflow:
```commandline
databricks labs dqx profile --run-config "default"
```

-If run config is not provided, the "default" run config will be used. The run config is used to select specific run configuration from 'config.yml'.
+You will find the generated quality rule candidates and summary statistics in the installation folder as defined in the run config.
+If run config is not provided, the "default" run config will be used. The run config is used to select a specific run configuration from the 'config.yml'.

-The following DQX configuration from 'config.yml' will be used by default:
+The following DQX configuration options from 'config.yml' are used:
- 'input_location': input data as a path or a table.
-- 'input_format': input data format.
-- 'checks_file': relative location of the generated quality rule candidates (default: `checks.yml`). Can be json or yaml file.
-- 'profile_summary_stats_file': relative location of the summary statistics (default: `profile_summary.yml`). Can be json or yaml file.
+- 'input_format': input data format. Required if input data is a path.
+- 'checks_file': relative location of the generated quality rule candidates (default: `checks.yml`).
+- 'profile_summary_stats_file': relative location of the summary statistics (default: `profile_summary.yml`).

+Logs are printed in the console and saved in the installation folder.
+To show the saved logs from the latest profiler workflow run, visit the Databricks workspace UI or execute the following command:
+```commandline
+databricks labs dqx logs --workflow profiler
+```

## Validating quality rules (checks)

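For illustration, a 'config.yml' holding a second, purely hypothetical run configuration might look like the sketch below; only the keys are taken from this change set, all values are made up. The profiler workflow then selects one configuration by name, e.g. `databricks labs dqx profile --run-config "staging"`.

```yaml
run_config:
- name: default
  input_location: main.dqx.input
  input_format: delta
  checks_file: checks.yml
  profile_summary_stats_file: profile_summary_stats.yml
  output_location: main.dqx.output
  quarantine_location: main.dqx.quarantine
- name: staging                # hypothetical second run configuration
  input_location: /mnt/raw/staging/events
  input_format: parquet        # required here because the input is a path, not a table
  checks_file: staging_checks.yml
  profile_summary_stats_file: staging_profile_summary_stats.yml
```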
10 changes: 6 additions & 4 deletions demos/dqx_demo.py
@@ -7,7 +7,7 @@
# MAGIC %md
# MAGIC ### Installing DQX in the workspace
# MAGIC
-# MAGIC Install DQX in the workspace as per the instructions [here](https://github.com/databrickslabs/dqx?tab=readme-ov-file#installation).
+# MAGIC Install DQX in the workspace (default user installation) as per the instructions [here](https://github.com/databrickslabs/dqx?tab=readme-ov-file#installation).
# MAGIC Use default filename for data quality rules.

# COMMAND ----------
@@ -17,11 +17,13 @@

# COMMAND ----------

-import subprocess
+import glob
+import os

user_name = spark.sql('select current_user() as user').collect()[0]['user']
-pip_install_path = f"/Workspace/Users/{user_name}/.dqx/wheels/databricks_labs_dqx-*.whl"
-%pip install {pip_install_path}
+dqx_wheel_files = glob.glob(f"/Workspace/Users/{user_name}/.dqx/wheels/databricks_labs_dqx-*.whl")
+dqx_latest_wheel = max(dqx_wheel_files, key=os.path.getctime)
+%pip install {dqx_latest_wheel}

# COMMAND ----------

10 changes: 7 additions & 3 deletions demos/dqx_dlt_demo.py
@@ -1,11 +1,15 @@
# Databricks notebook source
-# 1. Install DQX in the workspace as per the instructions here: https://github.com/databrickslabs/dqx?tab=readme-ov-file#installation
+# 1. Install DQX in the workspace (default user installation) as per the instructions here: https://github.com/databrickslabs/dqx?tab=readme-ov-file#installation
# Use default filename for data quality rules.

# 2. Install DQX in the cluster
+import glob
+import os

user_name = "[email protected]" # cannot dynamically retrieve user name as "System-User" is always returned: spark.sql('select current_user() as user').collect()[0]['user']
-pip_install_path = f"/Workspace/Users/{user_name}/.dqx/wheels/databricks_labs_dqx-*.whl"
-%pip install {pip_install_path}
+dqx_wheel_files = glob.glob(f"/Workspace/Users/{user_name}/.dqx/wheels/databricks_labs_dqx-*.whl")
+dqx_latest_wheel = max(dqx_wheel_files, key=os.path.getctime)
+%pip install {dqx_latest_wheel}

# COMMAND ----------

22 changes: 19 additions & 3 deletions labs.yml
@@ -2,9 +2,9 @@
name: dqx
description: Common libraries for Databricks Labs
install:
-  script: src/databricks/labs/dqx/install.py
+  script: src/databricks/labs/dqx/installer/install.py
uninstall:
-  script: src/databricks/labs/dqx/uninstall.py
+  script: src/databricks/labs/dqx/installer/uninstall.py
entrypoint: src/databricks/labs/dqx/cli.py
min_python: 3.10
commands:
@@ -23,4 +23,20 @@ commands:
description: Run config to use
table_template: |-
{{range .}}{{.error}}
-    {{end}}
\ No newline at end of file
+    {{end}}
+  - name: profile
+    description: Profile input data and generate quality rule (checks) candidates
+    flags:
+      - name: run-config
+        description: (Optional) Selects run configuration from installation config. If not provided, use the "default" run configuration.
+  - name: workflows
+    description: Show deployed workflows and their latest run state
+    table_template: |-
+      Workflow\tWorkflow ID\tState\tStarted
+      {{range .}}{{.workflow}}\t{{.workflow_id}}\t{{.state}}\t{{.started}}
+      {{end}}
+  - name: logs
+    description: Show logs from the latest job run
+    flags:
+      - name: workflow
+        description: Name of the workflow to show logs for, e.g. profiler
7 changes: 5 additions & 2 deletions pyproject.toml
@@ -32,6 +32,9 @@ dependencies = ["databricks-labs-blueprint>=0.9.1,<0.10",
"databricks-sdk~=0.30",
"databricks-labs-lsql>=0.5,<0.13"]

+[project.entry-points.databricks]
+runtime = "databricks.labs.dqx.runtime:main"

[project.urls]
Issues = "https://github.com/databrickslabs/dqx/issues"
Source = "https://github.com/databrickslabs/dqx"
@@ -76,8 +79,8 @@ path = ".venv"

[tool.hatch.envs.default.scripts]
test = "pytest -n 10 --cov src --cov-report=xml --timeout 30 tests/unit --durations 20"
-coverage = "pytest -n 10 --cov src tests/ --timeout 240 --cov-report=html --durations 20"
-integration = "pytest -n 10 --timeout 240 --cov src tests/integration --durations 20"
+coverage = "pytest -n 10 --cov src tests/ --timeout 480 --cov-report=html --durations 20"
+integration = "pytest -n 10 --timeout 480 --cov src tests/integration --durations 20"
fmt = ["black . --extend-exclude 'demos/'",
"ruff check . --fix",
"mypy . --exclude 'demos/*'",
4 changes: 2 additions & 2 deletions src/databricks/labs/dqx/__init__.py
@@ -14,8 +14,8 @@
r"(?:\+(?P<build>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$"
)

-# Add ucx/<version> for projects depending on ucx as a library
+# Add dqx/<version> for projects depending on dqx as a library
ua.with_extra("dqx", __version__)

-# Add ucx/<version> for re-packaging of ucx, where product name is omitted
+# Add dqx/<version> for re-packaging of dqx, where product name is omitted
ua.with_product("dqx", __version__)
41 changes: 41 additions & 0 deletions src/databricks/labs/dqx/cli.py
@@ -82,5 +82,46 @@ def validate_checks(
    return errors_list


+@dqx.command
+def profile(w: WorkspaceClient, *, run_config: str = "default", ctx: WorkspaceContext | None = None) -> None:
+    """
+    Profile input data and generate quality rule (checks) candidates.
+
+    :param w: The WorkspaceClient instance to use for accessing the workspace.
+    :param run_config: The name of the run configuration to use.
+    :param ctx: The WorkspaceContext instance to use for accessing the workspace.
+    """
+    ctx = ctx or WorkspaceContext(w)
+    ctx.deployed_workflows.run_workflow("profiler", run_config)
+
+
+@dqx.command
+def workflows(w: WorkspaceClient, *, ctx: WorkspaceContext | None = None):
+    """
+    Show deployed workflows and their state
+
+    :param w: The WorkspaceClient instance to use for accessing the workspace.
+    :param ctx: The WorkspaceContext instance to use for accessing the workspace.
+    """
+    ctx = ctx or WorkspaceContext(w)
+    logger.info("Fetching deployed jobs...")
+    latest_job_status = ctx.deployed_workflows.latest_job_status()
+    print(json.dumps(latest_job_status))
+    return latest_job_status
+
+
+@dqx.command
+def logs(w: WorkspaceClient, *, workflow: str | None = None, ctx: WorkspaceContext | None = None):
+    """
+    Show logs of the latest job run.
+
+    :param w: The WorkspaceClient instance to use for accessing the workspace.
+    :param workflow: The name of the workflow to show logs for.
+    :param ctx: The WorkspaceContext instance to use for accessing the workspace
+    """
+    ctx = ctx or WorkspaceContext(w)
+    ctx.deployed_workflows.relay_logs(workflow)
+
+
+if __name__ == "__main__":
+    dqx()
4 changes: 3 additions & 1 deletion src/databricks/labs/dqx/config.py
@@ -10,12 +10,14 @@ class RunConfig:
    """Configuration class for the data quality checks"""

    name: str = "default"  # name of the run configuration
-    input_locations: str | None = None  # input data path or a table
+    input_location: str | None = None  # input data path or a table
    input_format: str | None = "delta"  # input data format
    output_table: str | None = None  # output data table
    quarantine_table: str | None = None  # quarantined data table
    checks_file: str | None = "checks.yml"  # file containing quality rules / checks
    profile_summary_stats_file: str | None = "profile_summary_stats.yml"  # file containing profile summary statistics
+    override_clusters: dict[str, str] | None = None
+    spark_conf: dict[str, str] | None = None


@dataclass
12 changes: 5 additions & 7 deletions src/databricks/labs/dqx/contexts/application.py
@@ -1,12 +1,12 @@
import abc
import logging
-from datetime import timedelta
from functools import cached_property

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.installer import InstallState
from databricks.labs.blueprint.tui import Prompts
from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2
+from databricks.labs.dqx.installer.workflows_installer import DeployedWorkflows
from databricks.sdk import WorkspaceClient

from databricks.labs.dqx.config import WorkspaceConfig
@@ -57,10 +57,6 @@ def installation(self):
    def config(self) -> WorkspaceConfig:
        return self.installation.load(WorkspaceConfig)

-    @cached_property
-    def verify_timeout(self):
-        return timedelta(minutes=2)
-
    @cached_property
    def wheels(self):
        return WheelsV2(self.installation, self.product_info)
@@ -69,12 +65,14 @@ def wheels(self):
    def install_state(self):
        return InstallState.from_installation(self.installation)

+    @cached_property
+    def deployed_workflows(self) -> DeployedWorkflows:
+        return DeployedWorkflows(self.workspace_client, self.install_state)


class CliContext(GlobalContext, abc.ABC):
    """
    Abstract base class for global context, providing common properties and methods for workspace management.

    :param named_parameters: Optional dictionary of named parameters.
    """

    @cached_property
91 changes: 0 additions & 91 deletions src/databricks/labs/dqx/contexts/workflow_task.py

This file was deleted.
