Dataframes: Refactor examples about pandas and Dask #265

Merged · 1 commit · Feb 6, 2024
10 changes: 10 additions & 0 deletions .github/dependabot.yml
@@ -23,6 +23,16 @@ updates:

# Languages.

- directory: "/by-dataframe/dask"
package-ecosystem: "pip"
schedule:
interval: "weekly"

- directory: "/by-dataframe/pandas"
package-ecosystem: "pip"
schedule:
interval: "weekly"

- directory: "/by-language/csharp-npgsql"
package-ecosystem: "nuget"
schedule:
74 changes: 74 additions & 0 deletions .github/workflows/df-dask.yml
@@ -0,0 +1,74 @@
name: Dask

on:
pull_request:
branches: ~
paths:
- '.github/workflows/df-dask.yml'
- 'by-dataframe/dask/**'
- 'requirements.txt'
push:
branches: [ main ]
paths:
- '.github/workflows/df-dask.yml'
- 'by-dataframe/dask/**'
- 'requirements.txt'

# Allow job to be triggered manually.
workflow_dispatch:

# Run job each night after CrateDB nightly has been published.
schedule:
- cron: '0 3 * * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
cancel-in-progress: true
group: ${{ github.workflow }}-${{ github.ref }}

jobs:
test:
name: "
Python: ${{ matrix.python-version }}
CrateDB: ${{ matrix.cratedb-version }}
on ${{ matrix.os }}"
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ 'ubuntu-latest' ]
python-version: [ '3.11', '3.12' ]
cratedb-version: [ 'nightly' ]

services:
cratedb:
image: crate/crate:${{ matrix.cratedb-version }}
ports:
- 4200:4200
- 5432:5432
env:
CRATE_HEAP_SIZE: 4g

steps:

- name: Acquire sources
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
architecture: x64
cache: 'pip'
cache-dependency-path: |
requirements.txt
by-dataframe/dask/requirements.txt
by-dataframe/dask/requirements-dev.txt

- name: Install utilities
run: |
pip install -r requirements.txt

- name: Validate by-dataframe/dask
run: |
ngr test --accept-no-venv by-dataframe/dask
74 changes: 74 additions & 0 deletions .github/workflows/df-pandas.yml
@@ -0,0 +1,74 @@
name: pandas

on:
pull_request:
branches: ~
paths:
- '.github/workflows/df-pandas.yml'
- 'by-dataframe/pandas/**'
- 'requirements.txt'
push:
branches: [ main ]
paths:
- '.github/workflows/df-pandas.yml'
- 'by-dataframe/pandas/**'
- 'requirements.txt'

# Allow job to be triggered manually.
workflow_dispatch:

# Run job each night after CrateDB nightly has been published.
schedule:
- cron: '0 3 * * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
cancel-in-progress: true
group: ${{ github.workflow }}-${{ github.ref }}

jobs:
test:
name: "
Python: ${{ matrix.python-version }}
CrateDB: ${{ matrix.cratedb-version }}
on ${{ matrix.os }}"
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ 'ubuntu-latest' ]
python-version: [ '3.11', '3.12' ]
cratedb-version: [ 'nightly' ]

services:
cratedb:
image: crate/crate:${{ matrix.cratedb-version }}
ports:
- 4200:4200
- 5432:5432
env:
CRATE_HEAP_SIZE: 4g

steps:

- name: Acquire sources
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
architecture: x64
cache: 'pip'
cache-dependency-path: |
requirements.txt
by-dataframe/pandas/requirements.txt
by-dataframe/pandas/requirements-dev.txt

- name: Install utilities
run: |
pip install -r requirements.txt

- name: Validate by-dataframe/pandas
run: |
ngr test --accept-no-venv by-dataframe/pandas
@@ -1,16 +1,16 @@
-name: Python SQLAlchemy
+name: SQLAlchemy

on:
pull_request:
branches: ~
paths:
-- '.github/workflows/test-python-sqlalchemy.yml'
+- '.github/workflows/sqlalchemy.yml'
- 'by-language/python-sqlalchemy/**'
- 'requirements.txt'
push:
branches: [ main ]
paths:
-- '.github/workflows/test-python-sqlalchemy.yml'
+- '.github/workflows/sqlalchemy.yml'
- 'by-language/python-sqlalchemy/**'
- 'requirements.txt'

@@ -46,6 +46,8 @@ jobs:
ports:
- 4200:4200
- 5432:5432
env:
CRATE_HEAP_SIZE: 4g

steps:

2 changes: 1 addition & 1 deletion .gitignore
@@ -1,7 +1,7 @@
.idea
.venv*
__pycache__
-.coverage
+.coverage*
.DS_Store
coverage.xml
mlruns/
101 changes: 101 additions & 0 deletions by-dataframe/dask/README.md
@@ -0,0 +1,101 @@
# Connect to CrateDB and CrateDB Cloud using Dask


## About
Example programs demonstrating connectivity with [Dask] and [CrateDB].

The examples mostly exercise [DataFrame operations with SQLAlchemy],
specifically how to insert data into [CrateDB] efficiently.


## Usage

The CrateDB Python driver provides a convenience function `insert_bulk`,
which allows you to efficiently insert multiple rows of data into a CrateDB
database table in a single operation. It can be used like this:

```python
# CrateDB Cloud
# DBURI = "crate://admin:<PASSWORD>@<CLUSTERNAME>.aks1.westeurope.azure.cratedb.net:4200?ssl=true"

# CrateDB Self-Managed
DBURI = "crate://crate@localhost:4200/"

from crate.client.sqlalchemy.support import insert_bulk

# `ddf` is a Dask DataFrame, for example created using
# `dask.dataframe.from_pandas(df, npartitions=4)`.
ddf.to_sql(
    "testdrive",
    uri=DBURI,
    index=False,
    if_exists="replace",
    chunksize=10_000,
    parallel=True,
    method=insert_bulk,
)
```
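Conceptually, a bulk insert submits the SQL statement once, together with all parameter rows, instead of issuing one statement per row. A stdlib-only sketch of that batching idea follows; the table and column names are made up for illustration:

```python
# Five example records to be inserted.
rows = [(i, f"name-{i}") for i in range(5)]

# One statement per row is slow; a bulk operation pairs the
# statement with all parameter rows in a single request.
stmt = "INSERT INTO testdrive (id, name) VALUES (?, ?)"
bulk_request = {"stmt": stmt, "bulk_args": rows}

print(len(bulk_request["bulk_args"]))  # → 5
```

This is why larger chunk sizes usually improve throughput: the per-statement overhead is paid once per batch rather than once per row.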


## Setup

To start a CrateDB instance on your machine, invoke:
```shell
docker run -it --rm \
--publish=4200:4200 --publish=5432:5432 \
--env=CRATE_HEAP_SIZE=4g \
crate:latest -Cdiscovery.type=single-node
```

Acquire the `cratedb-examples` repository, and set up a sandbox:
```shell
git clone https://github.com/crate/cratedb-examples
cd cratedb-examples
python3 -m venv .venv
source .venv/bin/activate
```

Then, invoke the integration test cases:
```shell
ngr test by-dataframe/dask
```


## Examples
The `insert` example programs demonstrate efficient data loading:
```shell
time python insert_dask.py
time python insert_dask.py --mode=basic
time python insert_dask.py --mode=bulk --bulk-size=20000 --num-records=75000
```
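The `--bulk-size` option controls how many records each batch carries. As a plain-Python sketch (the helper function below is hypothetical, not part of the examples), 75,000 records with a bulk size of 20,000 split into four batches:

```python
def chunk_counts(num_records: int, bulk_size: int) -> list[int]:
    """Return the size of each batch when `num_records` rows are
    written in batches of at most `bulk_size` rows."""
    full, rest = divmod(num_records, bulk_size)
    return [bulk_size] * full + ([rest] if rest else [])

sizes = chunk_counts(75_000, 20_000)
print(len(sizes), sizes)  # → 4 [20000, 20000, 20000, 15000]
```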


## Connect to CrateDB Cloud

By default, the example programs will connect to CrateDB on `localhost`.
In order to connect to another database instance, for example to [CrateDB
Cloud], adjust the database URI accordingly:

```shell
# CrateDB Self-Managed (default)
export DBURI="crate://crate@localhost:4200/"
# CrateDB Cloud
export DBURI="crate://admin:<PASSWORD>@example.aks1.westeurope.azure.cratedb.net:4200?ssl=true"
time python insert_dask.py --dburi="${DBURI}"
```
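The `DBURI` value is a standard SQLAlchemy-style connection URL. As an illustration, its components can be inspected with Python's standard library; the credentials and host below are placeholders, not real endpoints:

```python
from urllib.parse import parse_qs, urlsplit

dburi = "crate://admin:secret@example.aks1.westeurope.azure.cratedb.net:4200?ssl=true"
url = urlsplit(dburi)

print(url.scheme)           # crate
print(url.hostname)         # example.aks1.westeurope.azure.cratedb.net
print(url.port)             # 4200
print(parse_qs(url.query))  # {'ssl': ['true']}
```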

```{tip}
For more information, please refer to the header sections of each of the
provided example programs.
```


## Tests

To test the accompanying example programs all at once, invoke the software tests:
```shell
pytest
```


[CrateDB]: https://github.com/crate/crate
[CrateDB Cloud]: https://console.cratedb.cloud/
[Dask]: https://www.dask.org/
[DataFrame operations with SQLAlchemy]: https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/dataframe.html
71 changes: 71 additions & 0 deletions by-dataframe/dask/insert_dask.py
@@ -0,0 +1,71 @@
"""
About
=====

Evaluate inserting data from Dask dataframes into CrateDB.

Setup
=====
::

pip install --upgrade click colorlog 'crate[sqlalchemy]' dask pandas pueblo

Synopsis
========
::

docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate:latest
python insert_dask.py
"""
import logging

import click
import dask.dataframe as dd
import sqlalchemy as sa
from crate.client.sqlalchemy.support import insert_bulk
from dask.diagnostics import ProgressBar
from pueblo.testing.pandas import makeTimeDataFrame
from pueblo.util.logging import setup_logging

logger = logging.getLogger(__name__)

SQLALCHEMY_LOGGING = True
TABLE_NAME = "testdrive_dask"


def db_workload(dburi: str, mode: str, num_records: int, bulk_size: int):
pbar = ProgressBar()
pbar.register()

# Create example Dask DataFrame for testing purposes.
df = makeTimeDataFrame(nper=num_records, freq="S")
ddf = dd.from_pandas(df, npartitions=4)

# Save DataFrame into CrateDB efficiently.

# Works. Takes ~3 seconds.
if mode == "basic":
ddf.to_sql(TABLE_NAME, uri=dburi, index=False, if_exists="replace", chunksize=bulk_size, parallel=True)

# Works. Takes ~10 seconds.
elif mode == "multi":
ddf.to_sql(TABLE_NAME, uri=dburi, index=False, if_exists="replace", chunksize=bulk_size, parallel=True, method="multi")

# Works. Takes ~2 seconds.
elif mode == "bulk":
ddf.to_sql(TABLE_NAME, uri=dburi, index=False, if_exists="replace", chunksize=bulk_size, parallel=True, method=insert_bulk)


@click.command()
@click.option("--dburi", type=str, default="crate://localhost:4200", required=False, help="SQLAlchemy database connection URI.")
@click.option("--mode", type=click.Choice(['basic', 'multi', 'bulk']), default="bulk", required=False, help="Insert mode.")
@click.option("--num-records", type=int, default=125_000, required=False, help="Number of records to insert.")
@click.option("--bulk-size", type=int, default=10_000, required=False, help="Bulk size / chunk size.")
@click.help_option()
def main(dburi: str, mode: str, num_records: int, bulk_size: int):
setup_logging()
db_workload(dburi=dburi, mode=mode, num_records=num_records, bulk_size=bulk_size)


if __name__ == "__main__":
main()