Make DQX compatible with Serverless #147

Merged: 24 commits, Feb 3, 2025
38 changes: 38 additions & 0 deletions .github/workflows/acceptance.yml
@@ -53,3 +53,41 @@ jobs:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          ARM_CLIENT_ID: ${{ secrets.ARM_CLIENT_ID }}
          ARM_TENANT_ID: ${{ secrets.ARM_TENANT_ID }}

  serverless_integration:
    if: github.event_name == 'pull_request' && github.event.pull_request.draft == false
    environment: tool
    runs-on: larger
    env:
      DATABRICKS_SERVERLESS_COMPUTE_ID: auto
    steps:
      - name: Checkout Code
        uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Install Python
        uses: actions/setup-python@v5
        with:
          cache: 'pip'
          cache-dependency-path: '**/pyproject.toml'
          python-version: '3.10'

      - name: Install hatch
        run: pip install hatch==1.9.4

      - name: Fetch relevant branches
        run: |
          git fetch origin $GITHUB_BASE_REF:$GITHUB_BASE_REF
          git fetch origin $GITHUB_HEAD_REF:$GITHUB_HEAD_REF

      - name: Run integration tests on serverless cluster
        uses: databrickslabs/sandbox/acceptance@acceptance/v0.4.2
        with:
          vault_uri: ${{ secrets.VAULT_URI }}
          timeout: 2h
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          ARM_CLIENT_ID: ${{ secrets.ARM_CLIENT_ID }}
          ARM_TENANT_ID: ${{ secrets.ARM_TENANT_ID }}
          DATABRICKS_SERVERLESS_COMPUTE_ID: ${{ env.DATABRICKS_SERVERLESS_COMPUTE_ID }}
13 changes: 10 additions & 3 deletions demos/dqx_demo_library.py
@@ -38,17 +38,24 @@
# profile the input data
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(input_df)
print(summary_stats)
print(yaml.safe_dump(summary_stats))
print(profiles)

# generate DQX quality rules/checks
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles) # with default level "error"
print(yaml.safe_dump(checks))

# generate DLT expectations
# generate Delta Live Table (DLT) expectations
dlt_generator = DQDltGenerator(ws)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles)

dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="SQL")
print(dlt_expectations)

dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python")
print(dlt_expectations)

dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python_Dict")
print(dlt_expectations)

# save generated checks in a workspace file
66 changes: 58 additions & 8 deletions docs/dqx/docs/dev/contributing.mdx
@@ -93,17 +93,29 @@ make lint
make test
```

Setup required environment variables for executing integration tests and code coverage using the command line.
Note that integration tests are run automatically when you create a Pull Request in Github.
You can also run them from a local machine by configuring authentication to a Databricks workspace as below:
### Local setup for integration tests and code coverage

Note that integration tests and code coverage are run automatically when you create a Pull Request in GitHub.
You can also trigger the tests from a local machine by configuring authentication to a Databricks workspace.
You can use any Unity Catalog-enabled Databricks workspace.

#### Using terminal

If you want to run integration tests and code coverage from your local machine, you need to set up the following environment variables:
```shell
export DATABRICKS_HOST=https://<workspace-url>
export DATABRICKS_CLUSTER_ID=<cluster-id>
# set either service principal credentials

# either use PAT token for authentication
export DATABRICKS_TOKEN=<pat-token>
Comment on lines -102 to +110

Contributor:
Please don't recommend PATs - they're considered bad practice since the introduction of OAuth.

Contributor Author (@mwojtyczka, Feb 4, 2025):
Sounds good, thanks for the review, I will correct it in the next PR.

# or use service principal OAuth token for authentication
export DATABRICKS_CLIENT_ID=<client-id>
export DATABRICKS_CLIENT_SECRET=<client-secret>
# or PAT token
export DATABRICKS_TOKEN=<pat-token>

# Optionally enable serverless compute to be used for the tests.
# Note that we run integration tests on standard and serverless compute clusters as part of the CI/CD pipelines
export DATABRICKS_SERVERLESS_COMPUTE_ID=auto
```

Run integration tests with the following command:
@@ -115,10 +127,41 @@ Calculate test coverage and display report in html:
```shell
make coverage
```
#### Using IDE

If you want to be able to run integration tests from your IDE, you must setup `.env` or `~/.databricks/debug-env.json` file
If you want to run integration tests from your IDE, you must set up a `.env` or `~/.databricks/debug-env.json` file
(see [instructions](https://github.com/databrickslabs/pytester?tab=readme-ov-file#debug_env_name-fixture)).
The name of the debug environment that you define must be `ws`.
The name of the debug environment that you must define is `ws` (see the `debug_env_name` fixture in `conftest.py`).
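
For reference, the fixture mentioned above is typically a one-liner in `conftest.py` (a minimal sketch following the pytester documentation; the actual fixture in this repo may differ):
```python
import pytest


@pytest.fixture
def debug_env_name():
    # Tell pytester which entry of .env / ~/.databricks/debug-env.json to load
    return "ws"
```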

**Minimal Configuration**

Create the `~/.databricks/debug-env.json` with the following content, replacing the placeholders:
```json
{
  "ws": {
    "DATABRICKS_TOKEN": "<PAT-token>",
    "DATABRICKS_HOST": "https://<workspace-url>",
    "DATABRICKS_CLUSTER_ID": "<databricks-cluster-id>"
  }
}
```
You must provide an existing cluster, but it will be auto-started for you as part of the tests.

**Running Tests on Serverless Compute**

Integration tests are executed on both standard and serverless compute clusters as part of the CI/CD pipelines.
To run integration tests on serverless compute, add the `DATABRICKS_SERVERLESS_COMPUTE_ID` field to your debug configuration:
```json
{
  "ws": {
    "DATABRICKS_TOKEN": "<PAT-token>",
    "DATABRICKS_HOST": "https://<workspace-url>",
    "DATABRICKS_CLUSTER_ID": "<databricks-cluster-id>",
    "DATABRICKS_SERVERLESS_COMPUTE_ID": "auto"
  }
}
```
When `DATABRICKS_SERVERLESS_COMPUTE_ID` is set, `DATABRICKS_CLUSTER_ID` is ignored and the tests run on serverless compute.

## Running CLI from the local repo

@@ -180,6 +223,13 @@ You can configure Git to sign all commits with your GPG key by default: `git con
request description to [automatically link it](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue)
to an existing issue.

If you did not sign your commits initially, you can re-sign them as follows (note that this squashes the commits you go back over into a single signed commit):
```shell
git reset --soft HEAD~<number-of-commits-to-go-back>
git commit -S --reuse-message=ORIG_HEAD
git push -f origin <remote-branch-name>
```

## Troubleshooting

If you encounter any package dependency errors after `git pull`, run `make clean`
10 changes: 9 additions & 1 deletion docs/dqx/docs/guide.mdx
@@ -44,7 +44,15 @@ dq_engine.save_checks_in_installation(checks, run_config_name="default")

# generate DLT expectations
dlt_generator = DQDltGenerator(ws)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles)

dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="SQL")
print(dlt_expectations)

dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python")
print(dlt_expectations)

dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python_Dict")
print(dlt_expectations)
```
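
For illustration only, the generated expectations could then be wired into a DLT pipeline. The snippet below is a sketch, not part of the DQX API: it assumes `dlt_expectations` is the `{name: SQL expression}` dict produced with `language="Python_Dict"`, and the table names are hypothetical placeholders.
```python
import dlt  # available inside a Delta Live Tables pipeline


@dlt.table(name="validated_data")
@dlt.expect_all_or_drop(dlt_expectations)  # drop rows that violate any generated expectation
def validated_data():
    return spark.read.table("catalog.schema.raw_data")
```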

### Using CLI
59 changes: 57 additions & 2 deletions docs/dqx/docs/reference.mdx
@@ -20,17 +20,19 @@ The following quality rules / functions are currently available:
| is_not_null_and_not_empty | Check if input column is not null or empty | col_name: column name to check; trim_strings: boolean flag to trim spaces from strings |
| value_is_in_list | Check if the provided value is present in the input column. | col_name: column name to check; allowed: list of allowed values |
| value_is_not_null_and_is_in_list | Check if provided value is present if the input column is not null | col_name: column name to check; allowed: list of allowed values |
| is_not_null_and_not_empty_array | Check if input array column is not null or empty | col_name: column name to check |
| is_in_range | Check if input column is in the provided range (inclusive of both boundaries) | col_name: column name to check; min_limit: min limit; max_limit: max limit |
| is_not_in_range | Check if input column is not within defined range (inclusive of both boundaries) | col_name: column name to check; min_limit: min limit value; max_limit: max limit value |
| not_less_than | Check if input column is not less than the provided limit | col_name: column name to check; limit: limit value |
| not_greater_than | Check if input column is not greater than the provided limit | col_name: column name to check; limit: limit value |
| is_valid_date | Check if input column is a valid date | col_name: column name to check; date_format: date format (e.g. 'yyyy-mm-dd') |
| is_valid_timestamp | Check if input column is a valid timestamp | col_name: column name to check; timestamp_format: timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss') |
| not_in_future | Check if input column defined as date is not in the future (future defined as current_timestamp + offset) | col_name: column name to check; offset: offset to use; curr_timestamp: current timestamp, if not provided current_timestamp() function is used |
| not_in_near_future | Check if input column defined as date is not in the near future (near future defined as greater than the current timestamp but less than the current timestamp + offset) | col_name: column name to check; offset: offset to use; curr_timestamp: current timestamp, if not provided current_timestamp() function is used |
| is_older_than_n_days | Check if input column is older than n number of days | col_name: column name to check; days: number of days; curr_date: current date, if not provided current_date() function is used |
| is_older_than_col2_for_n_days | Check if one column is not older than another column by n number of days | col_name1: first column name to check; col_name2: second column name to check; days: number of days |
| regex_match | Check if input column matches a given regex | col_name: column name to check; regex: regex to check; negate: if the condition should be negated (true) or not |
| sql_expression | Check if input column matches the provided SQL expression, e.g. a = 'str1', a > b | expression: sql expression to check; msg: optional message to output; name: optional name of the resulting column; negate: if the condition should be negated |
| is_not_null_and_not_empty_array | Check if input array column is not null or empty | col_name: column name to check |

You can check implementation details of the rules [here](https://github.com/databrickslabs/dqx/blob/main/src/databricks/labs/dqx/col_functions.py).
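
For illustration, a couple of the functions above can be referenced from checks metadata. The snippet below is a sketch: `col1`, `col2`, and `input_df` are placeholders, and the engine call shown (`apply_checks_by_metadata_and_split`) should be verified against the current DQX API docs.
```python
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

# Checks expressed as metadata; column names are placeholders
checks = [
    {
        "criticality": "error",
        "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "col1"}},
    },
    {
        "criticality": "warn",
        "check": {"function": "value_is_in_list", "arguments": {"col_name": "col2", "allowed": [1, 2, 3]}},
    },
]

dq_engine = DQEngine(WorkspaceClient())
# Split the input into rows that pass all checks and rows flagged for quarantine
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
```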

@@ -132,7 +134,7 @@ The following table outlines the available methods and their functionalities:
Testing applications that use DQEngine requires proper initialization of the Databricks workspace client. Detailed guidance on authentication for the workspace client is available [here](https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html#default-authentication-flow).

For testing, we recommend:
* [pytester fixtures](https://github.com/databrickslabs/pytester) to setup Databricks remote Spark session and workspace client. For pytester to be able to authenticate to a workspace you need to use [debug_env_name fixture](https://github.com/databrickslabs/pytester?tab=readme-ov-file#debug_env_name-fixture). We recommend using the `~/.databricks/debug-env.json` file to store different sets of environment variables.
* [pytester fixtures](https://github.com/databrickslabs/pytester) to setup Databricks remote Spark session and workspace client. For pytester to be able to authenticate to a workspace you need to use [debug_env_name fixture](https://github.com/databrickslabs/pytester?tab=readme-ov-file#debug_env_name-fixture). We recommend using the `~/.databricks/debug-env.json` file to store different sets of environment variables (see more details below).
* [chispa](https://github.com/MrPowers/chispa) for asserting Spark DataFrames.

These libraries are also used internally for testing DQX.
@@ -167,6 +169,59 @@ def test_dq(ws, spark): # use ws and spark pytester fixtures to initialize works
assert_df_equality(df, expected_df)
```
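
For a more end-to-end illustration, a test along the following lines could exercise a metadata-defined check. This is a sketch only: the column name, the check, and the final assertion are illustrative, and the exact result columns appended by DQX should be checked against the reference above.
```python
from databricks.labs.dqx.engine import DQEngine


def test_col1_not_null_or_empty(ws, spark):  # pytester fixtures: workspace client and remote Spark session
    df = spark.createDataFrame([("a",), (None,)], "col1: string")
    checks = [
        {
            "criticality": "error",
            "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "col1"}},
        }
    ]

    dq_engine = DQEngine(ws)
    checked_df = dq_engine.apply_checks_by_metadata(df, checks)

    # DQX appends result columns describing failed checks; all input rows are preserved
    assert checked_df.count() == 2
```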

#### Setting up Databricks workspace client authentication in a terminal

If you want to run the tests from your local machine in the terminal, you need to set up the following environment variables:
```shell
export DATABRICKS_HOST=https://<workspace-url>
export DATABRICKS_CLUSTER_ID=<cluster-id>

# either use PAT token for authentication
export DATABRICKS_TOKEN=<pat-token>

# or use service principal OAuth token for authentication
export DATABRICKS_CLIENT_ID=<client-id>
export DATABRICKS_CLIENT_SECRET=<client-secret>

# Optionally enable serverless compute to be used for the tests
export DATABRICKS_SERVERLESS_COMPUTE_ID=auto
```

#### Setting up Databricks workspace client authentication in an IDE

If you want to run the tests from your IDE, you must set up a `.env` or `~/.databricks/debug-env.json` file
(see [instructions](https://github.com/databrickslabs/pytester?tab=readme-ov-file#debug_env_name-fixture)).
The name of the debug environment that you must define is `ws` (see the `debug_env_name` fixture in the example above).

**Minimal Configuration**

Create the `~/.databricks/debug-env.json` with the following content, replacing the placeholders:
```json
{
  "ws": {
    "DATABRICKS_TOKEN": "<PAT-token>",
    "DATABRICKS_HOST": "https://<workspace-url>",
    "DATABRICKS_CLUSTER_ID": "<databricks-cluster-id>"
  }
}
```
You must provide an existing cluster, but it will be auto-started for you as part of the tests.

**Running Tests on Serverless Compute**

To run the integration tests on serverless compute, add the `DATABRICKS_SERVERLESS_COMPUTE_ID` field to your debug configuration:
```json
{
  "ws": {
    "DATABRICKS_TOKEN": "<PAT-token>",
    "DATABRICKS_HOST": "https://<workspace-url>",
    "DATABRICKS_CLUSTER_ID": "<databricks-cluster-id>",
    "DATABRICKS_SERVERLESS_COMPUTE_ID": "auto"
  }
}
```
When `DATABRICKS_SERVERLESS_COMPUTE_ID` is set, `DATABRICKS_CLUSTER_ID` is ignored and the tests run on serverless compute.

### Local testing with DQEngine

If workspace-level access is unavailable in your unit testing environment, you can perform local testing by installing the latest `pyspark` package and mocking the workspace client.
Binary file modified docs/dqx/static/img/dqx_lakehouse.png
6 changes: 3 additions & 3 deletions pyproject.toml
@@ -54,10 +54,10 @@ path = "src/databricks/labs/dqx/__about__.py"
[tool.hatch.envs.default]
dependencies = [
"black~=24.3.0",
"chispa~=0.9.4",
"chispa~=0.10.1",
"coverage[toml]~=7.4.4",
"databricks-labs-pylint~=0.5",
"databricks-labs-pytester~=0.3.1",
"databricks-labs-pytester~=0.6.0",
"mypy~=1.9.0",
"pylint~=3.3.1",
"pylint-pytest==2.0.0a0",
@@ -69,7 +69,7 @@ dependencies = [
"ruff~=0.3.4",
"types-PyYAML~=6.0.12",
"types-requests~=2.31.0",
"databricks-connect~=15.4.3"
"databricks-connect~=15.4",
]

python="3.10"
13 changes: 9 additions & 4 deletions src/databricks/labs/dqx/col_functions.py
@@ -37,7 +37,7 @@ def is_not_null_and_not_empty(col_name: str, trim_strings: bool = False) -> Colu
    column = F.col(col_name)
    if trim_strings:
        column = F.trim(column).alias(col_name)
    condition = column.isNull() | (column == "") | (column == "null")
    condition = column.isNull() | (column.try_cast("string") == F.lit(""))
    return make_condition(condition, f"Column {col_name} is null or empty", f"{col_name}_is_null_or_empty")


@@ -48,6 +48,7 @@ def is_not_empty(col_name: str) -> Column:
    :return: Column object for condition
    """
    column = F.col(col_name)
    column = column.try_cast("string")
    return make_condition((column == ""), f"Column {col_name} is empty", f"{col_name}_is_empty")


@@ -76,7 +77,7 @@ def value_is_not_null_and_is_in_list(col_name: str, allowed: list) -> Column:
        F.concat_ws(
            "",
            F.lit("Value "),
            F.when(column.isNull(), F.lit("null")).otherwise(column),
            F.when(column.isNull(), F.lit("null")).otherwise(column.try_cast("string")),
            F.lit(" is not in the allowed list: ["),
            F.concat_ws(", ", *allowed_cols),
            F.lit("]"),
@@ -381,7 +382,7 @@ def is_valid_date(col_name: str, date_format: str | None = None) -> Column:
    :return: Column object for condition
    """
    str_col = F.col(col_name)
    date_col = F.to_date(str_col) if date_format is None else F.to_date(str_col, date_format)
    date_col = str_col.try_cast("date") if date_format is None else F.try_to_timestamp(str_col, F.lit(date_format))
    condition = F.when(str_col.isNull(), F.lit(None)).otherwise(date_col.isNull())
    condition_str = "' is not a valid date"
    if date_format is not None:
@@ -401,7 +402,11 @@ def is_valid_timestamp(col_name: str, timestamp_format: str | None = None) -> Co
    :return: Column object for condition
    """
    str_col = F.col(col_name)
    ts_col = F.to_timestamp(str_col) if timestamp_format is None else F.to_timestamp(str_col, timestamp_format)
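    # try_cast / try_to_timestamp return NULL for values that cannot be parsed instead of raising,
    # which keeps the check working when ANSI mode is enabled (e.g. on serverless compute)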
    ts_col = (
        str_col.try_cast("timestamp")
        if timestamp_format is None
        else F.try_to_timestamp(str_col, F.lit(timestamp_format))
    )
    condition = F.when(str_col.isNull(), F.lit(None)).otherwise(ts_col.isNull())
    condition_str = "' is not a valid timestamp"
    if timestamp_format is not None: