-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Dataframes: Refactor examples about pandas and Dask
- Loading branch information
Showing
22 changed files
with
563 additions
and
140 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
name: Dask | ||
|
||
on: | ||
pull_request: | ||
branches: ~ | ||
paths: | ||
- '.github/workflows/df-dask.yml' | ||
- 'by-dataframe/dask/**' | ||
- 'requirements.txt' | ||
push: | ||
branches: [ main ] | ||
paths: | ||
- '.github/workflows/df-dask.yml' | ||
- 'by-dataframe/dask/**' | ||
- 'requirements.txt' | ||
|
||
# Allow job to be triggered manually. | ||
workflow_dispatch: | ||
|
||
# Run job each night after CrateDB nightly has been published. | ||
schedule: | ||
- cron: '0 3 * * *' | ||
|
||
# Cancel in-progress jobs when pushing to the same branch. | ||
concurrency: | ||
cancel-in-progress: true | ||
group: ${{ github.workflow }}-${{ github.ref }} | ||
|
||
jobs: | ||
test: | ||
name: " | ||
Python: ${{ matrix.python-version }} | ||
CrateDB: ${{ matrix.cratedb-version }} | ||
on ${{ matrix.os }}" | ||
runs-on: ${{ matrix.os }} | ||
strategy: | ||
fail-fast: false | ||
matrix: | ||
os: [ 'ubuntu-latest' ] | ||
python-version: [ '3.11', '3.12' ] | ||
cratedb-version: [ 'nightly' ] | ||
|
||
services: | ||
cratedb: | ||
image: crate/crate:${{ matrix.cratedb-version }} | ||
ports: | ||
- 4200:4200 | ||
- 5432:5432 | ||
env: | ||
CRATE_HEAP_SIZE: 4g | ||
|
||
steps: | ||
|
||
- name: Acquire sources | ||
uses: actions/checkout@v4 | ||
|
||
- name: Set up Python | ||
uses: actions/setup-python@v5 | ||
with: | ||
python-version: ${{ matrix.python-version }} | ||
architecture: x64 | ||
cache: 'pip' | ||
cache-dependency-path: | | ||
requirements.txt | ||
by-dataframe/dask/requirements.txt | ||
by-dataframe/dask/requirements-dev.txt | ||
- name: Install utilities | ||
run: | | ||
pip install -r requirements.txt | ||
- name: Validate by-dataframe/dask | ||
run: | | ||
ngr test --accept-no-venv by-dataframe/dask |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
name: pandas | ||
|
||
on: | ||
pull_request: | ||
branches: ~ | ||
paths: | ||
- '.github/workflows/df-pandas.yml' | ||
- 'by-dataframe/pandas/**' | ||
- 'requirements.txt' | ||
push: | ||
branches: [ main ] | ||
paths: | ||
- '.github/workflows/df-pandas.yml' | ||
- 'by-dataframe/pandas/**' | ||
- 'requirements.txt' | ||
|
||
# Allow job to be triggered manually. | ||
workflow_dispatch: | ||
|
||
# Run job each night after CrateDB nightly has been published. | ||
schedule: | ||
- cron: '0 3 * * *' | ||
|
||
# Cancel in-progress jobs when pushing to the same branch. | ||
concurrency: | ||
cancel-in-progress: true | ||
group: ${{ github.workflow }}-${{ github.ref }} | ||
|
||
jobs: | ||
test: | ||
name: " | ||
Python: ${{ matrix.python-version }} | ||
CrateDB: ${{ matrix.cratedb-version }} | ||
on ${{ matrix.os }}" | ||
runs-on: ${{ matrix.os }} | ||
strategy: | ||
fail-fast: false | ||
matrix: | ||
os: [ 'ubuntu-latest' ] | ||
python-version: [ '3.11', '3.12' ] | ||
cratedb-version: [ 'nightly' ] | ||
|
||
services: | ||
cratedb: | ||
image: crate/crate:${{ matrix.cratedb-version }} | ||
ports: | ||
- 4200:4200 | ||
- 5432:5432 | ||
env: | ||
CRATE_HEAP_SIZE: 4g | ||
|
||
steps: | ||
|
||
- name: Acquire sources | ||
uses: actions/checkout@v4 | ||
|
||
- name: Set up Python | ||
uses: actions/setup-python@v5 | ||
with: | ||
python-version: ${{ matrix.python-version }} | ||
architecture: x64 | ||
cache: 'pip' | ||
cache-dependency-path: | | ||
requirements.txt | ||
by-dataframe/pandas/requirements.txt | ||
by-dataframe/pandas/requirements-dev.txt | ||
- name: Install utilities | ||
run: | | ||
pip install -r requirements.txt | ||
- name: Validate by-dataframe/pandas | ||
run: | | ||
ngr test --accept-no-venv by-dataframe/pandas |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
.idea | ||
.venv* | ||
__pycache__ | ||
.coverage | ||
.coverage* | ||
.DS_Store | ||
coverage.xml | ||
mlruns/ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
# Connect to CrateDB and CrateDB Cloud using Dask | ||
|
||
|
||
## About | ||
Example programs demonstrating connectivity with [Dask] and [CrateDB]. | ||
|
||
This section and examples are mostly about [DataFrame operations with SQLAlchemy], | ||
specifically about how to insert data into [CrateDB] efficiently. | ||
|
||
|
||
## Usage | ||
|
||
The CrateDB Python driver provides a convenience function `insert_bulk`. It | ||
can be used like this: | ||
|
||
```python | ||
# CrateDB Cloud | ||
# DBURI = "crate://admin:<PASSWORD>@<CLUSTERNAME>.aks1.westeurope.azure.cratedb.net:4200?ssl=true" | ||
|
||
# CrateDB Self-Managed | ||
# DBURI = "crate://crate@localhost:4200/" | ||
|
||
import sqlalchemy as sa | ||
from crate.client.sqlalchemy.support import insert_bulk | ||
|
||
ddf.to_sql( | ||
"testdrive", | ||
uri=DBURI, | ||
index=False, | ||
if_exists="replace", | ||
chunksize=10_000, | ||
parallel=True, | ||
method=insert_bulk, | ||
) | ||
``` | ||
|
||
|
||
## Setup | ||
|
||
To start a CrateDB instance on your machine, invoke: | ||
```shell | ||
docker run -it --rm \ | ||
--publish=4200:4200 --publish=5432:5432 \ | ||
--env=CRATE_HEAP_SIZE=4g \ | ||
crate:latest -Cdiscovery.type=single-node | ||
``` | ||
|
||
Acquire `cratedb-examples` repository, and set up sandbox: | ||
```shell | ||
git clone https://github.com/crate/cratedb-examples | ||
cd cratedb-examples | ||
python3 -m venv .venv | ||
source .venv/bin/activate | ||
``` | ||
|
||
Then, invoke the integration test cases: | ||
```shell | ||
ngr test by-dataframe/dask | ||
``` | ||
|
||
|
||
## Examples | ||
The `insert` example programs are about efficient data loading: | ||
```shell | ||
time python insert_dask.py | ||
time python insert_dask.py --mode=basic | ||
time python insert_dask.py --mode=bulk --bulk-size=20000 --num-records=75000 | ||
``` | ||
|
||
|
||
## Connect to CrateDB Cloud | ||
|
||
By default, the example programs will connect to CrateDB on `localhost`. | ||
In order to connect to any other database instance, for example to [CrateDB | ||
Cloud]: | ||
|
||
```shell | ||
export DBURI="crate://crate@localhost:4200/" | ||
export DBURI="crate://admin:<PASSWORD>@example.aks1.westeurope.azure.cratedb.net:4200?ssl=true" | ||
time python insert_dask.py --dburi="${DBURI}" | ||
``` | ||
|
||
```{tip} | ||
For more information, please refer to the header sections of each of the | ||
provided example programs. | ||
``` | ||
|
||
|
||
## Tests | ||
|
||
To test the accompanied example programs all at once, invoke the software tests: | ||
```shell | ||
pytest | ||
``` | ||
|
||
|
||
[CrateDB]: https://github.com/crate/crate | ||
[CrateDB Cloud]: https://console.cratedb.cloud/ | ||
[Dask]: https://www.dask.org/ | ||
[DataFrame operations with SQLAlchemy]: https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/dataframe.html |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
""" | ||
About | ||
===== | ||
Evaluate saving Dask DataFrames into CrateDB. | ||
Usage | ||
===== | ||
:: | ||
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate | ||
pip install --upgrade 'crate[sqlalchemy]' dask pandas | ||
python insert_dask.py | ||
""" | ||
import logging | ||
|
||
import click | ||
import colorlog | ||
import dask.dataframe as dd | ||
import sqlalchemy as sa | ||
from colorlog.escape_codes import escape_codes | ||
from crate.client.sqlalchemy.support import insert_bulk | ||
from dask.diagnostics import ProgressBar | ||
from pueblo.testing.pandas import makeTimeDataFrame | ||
from pueblo.util.logging import setup_logging | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
SQLALCHEMY_LOGGING = True | ||
|
||
|
||
def db_workload(dburi: str, mode: str, num_records: int, bulk_size: int):
    """
    Generate a synthetic Dask DataFrame and store it into CrateDB.

    :param dburi: SQLAlchemy database connection URI.
    :param mode: Insert strategy, one of "basic", "multi", or "bulk".
    :param num_records: Number of records to generate and insert.
    :param bulk_size: Chunk size used when submitting data to the database.
    :raises ValueError: If ``mode`` is not one of the supported strategies.
    """
    # Display progress for long-running Dask computations.
    pbar = ProgressBar()
    pbar.register()

    # Create example Dask DataFrame for testing purposes.
    df = makeTimeDataFrame(nper=num_records, freq="S")
    ddf = dd.from_pandas(df, npartitions=4)

    # Save DataFrame into CrateDB, using the selected insert strategy.

    # Works. Takes ~3 seconds.
    if mode == "basic":
        ddf.to_sql("testdrive", uri=dburi, index=False, if_exists="replace", chunksize=bulk_size, parallel=True)

    # Works. Takes ~10 seconds.
    elif mode == "multi":
        ddf.to_sql("testdrive", uri=dburi, index=False, if_exists="replace", chunksize=bulk_size, parallel=True, method="multi")

    # Works. Takes ~2 seconds.
    elif mode == "bulk":
        ddf.to_sql("testdrive", uri=dburi, index=False, if_exists="replace", chunksize=bulk_size, parallel=True, method=insert_bulk)

    # Fail loudly on unknown modes. The CLI guards `mode` through
    # `click.Choice`, but direct callers previously got a silent no-op.
    else:
        raise ValueError(f"Unknown insert mode: {mode}")
|
||
|
||
@click.command()
@click.option("--dburi", type=str, default="crate://localhost:4200", required=False, help="SQLAlchemy database connection URI.")
@click.option("--mode", type=click.Choice(['basic', 'multi', 'bulk']), default="bulk", required=False, help="Insert mode.")
@click.option("--num-records", type=int, default=125_000, required=False, help="Number of records to insert.")
@click.option("--bulk-size", type=int, default=10_000, required=False, help="Bulk size / chunk size.")
@click.help_option()
def main(dburi: str, mode: str, num_records: int, bulk_size: int):
    """
    Command-line entry point: set up logging, then run the insert workload
    against the configured CrateDB instance.
    """
    setup_logging()
    db_workload(dburi=dburi, mode=mode, num_records=num_records, bulk_size=bulk_size)


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
[tool.pytest.ini_options] | ||
minversion = "2.0" | ||
addopts = """ | ||
-rfEX -p pytester --strict-markers --verbosity=3 | ||
--cov --cov-report=term-missing | ||
--capture=no | ||
""" | ||
log_level = "DEBUG" | ||
log_cli_level = "DEBUG" | ||
testpaths = ["*.py"] | ||
xfail_strict = true | ||
markers = [ | ||
] | ||
|
||
|
||
[tool.coverage.run] | ||
branch = false | ||
omit = [ | ||
"test*", | ||
] | ||
|
||
[tool.coverage.report] | ||
fail_under = 0 | ||
show_missing = true |
Oops, something went wrong.