Time Series QA: Make Dask notebook self-contained and testable
Include data acquisition from Kaggle.
amotl committed Mar 21, 2024
1 parent f787654 commit fb04467
Showing 2 changed files with 68 additions and 19 deletions.
74 changes: 56 additions & 18 deletions topic/timeseries/dask-weather-data-import.ipynb
@@ -72,14 +72,41 @@
"The following data sets need to be processed:\n",
"- Daily weather data (daily_weather.parquet)\n",
"- Cities (cities.csv)\n",
"- Countries (countries.csv)"
"- Countries (countries.csv)\n",
"\n",
"The subsequent code acquires the dataset directly from kaggle.com.\n",
"To properly configure the notebook to use corresponding credentials\n",
"after signing up on Kaggle, define the `KAGGLE_USERNAME` and\n",
"`KAGGLE_KEY` environment variables. Alternatively, put them into the\n",
"file `~/.kaggle/kaggle.json` in your home folder, like this:\n",
"```json\n",
"{\n",
" \"username\": \"acme\",\n",
" \"key\": \"2b1dac2af55caaf1f34df76236fada4a\"\n",
"}\n",
"```\n",
"Another variant is to acquire the dataset files manually, and extract\n",
"them into a folder called `DOWNLOAD`. In this case, you can deactivate\n",
"those two lines of code, in order to skip automatic dataset acquisition."
]
},
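If you prefer to configure the credentials from within the notebook, setting the environment variables before running the acquisition cell works just as well. A minimal sketch, reusing the placeholder values from above:

```python
import os

# Placeholder credentials from the example above; replace with your own.
os.environ["KAGGLE_USERNAME"] = "acme"
os.environ["KAGGLE_KEY"] = "2b1dac2af55caaf1f34df76236fada4a"
```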
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"from cratedb_toolkit.datasets import load_dataset\n",
"\n",
"dataset = load_dataset(\"kaggle://guillemservera/global-daily-climate-data/daily_weather.parquet\")\n",
"dataset.acquire()"
],
"metadata": {
"collapsed": false
}
},
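Judging from the file paths used further down, `dataset.acquire()` is expected to leave the extracted files in the `DOWNLOAD` folder. Under that assumption, the download can be skipped when the files are already present; a sketch:

```python
from pathlib import Path

# Assumption: `dataset.acquire()` extracts into the `DOWNLOAD` folder,
# matching the paths used by the loading code below.
if not Path("DOWNLOAD/daily_weather.parquet").exists():
    dataset.acquire()
```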
{
"cell_type": "code",
"execution_count": 88,
"id": "fa24e753",
"metadata": {},
"outputs": [],
"source": [
"from dask import dataframe as dd\n",
@@ -88,7 +115,10 @@
"# Show a progress bar for dask activities\n",
"pbar = ProgressBar()\n",
"pbar.register()"
]
],
"metadata": {
"collapsed": false
}
},
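`pbar.register()` enables the progress bar globally for all subsequent Dask computations. Alternatively, `ProgressBar` can be scoped to a single computation as a context manager; a sketch, using the `df_kaggle` frame loaded below:

```python
from dask.diagnostics import ProgressBar

# Show progress for this one computation only, without global registration.
with ProgressBar():
    row_count = len(df_kaggle)  # triggers an eager Dask computation
```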
{
"cell_type": "code",
@@ -288,7 +318,7 @@
],
"source": [
"# Load the parquet file. Please adjust the file path as needed.\n",
"df_kaggle = dd.read_parquet('DOWNLOAD_PATH/daily_weather.parquet')\n",
"df_kaggle = dd.read_parquet('DOWNLOAD/daily_weather.parquet')\n",
"\n",
"# Show info about the data.\n",
"df_kaggle.info(verbose=True, memory_usage=True)\n",
@@ -421,7 +451,7 @@
],
"source": [
"# Read cities, adapt the path to the files accordingly\n",
"cities = dd.read_csv(\"DOWNLOAD_PATH/cities.csv\",dtype={'station_id': 'object'})\n",
"cities = dd.read_csv(\"DOWNLOAD/cities.csv\",dtype={'station_id': 'object'})\n",
"\n",
"# Modify lon and lat of cities into an array that can be interpreted directly by CrateDB\n",
"def create_location_column(df):\n",
@@ -442,7 +472,7 @@
"outputs": [],
"source": [
"# Read countries, adapt the path to the files accordingly\n",
"countries = dd.read_csv(\"DOWNLOAD_PATH/countries.csv\")"
"countries = dd.read_csv(\"DOWNLOAD/countries.csv\")"
]
},
{
@@ -476,17 +506,25 @@
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import sqlalchemy as sa\n",
"from crate.client.sqlalchemy.support import insert_bulk\n",
"\n",
"# Connect to CrateDB\n",
"# For a database running in the cloud, please use a connection string like this:\n",
"dburi = 'crate://USER:PASSWORD@HOST:4200?ssl=true'\n",
"# Define database address when using CrateDB Cloud.\n",
"# Please find these settings on your cluster overview page.\n",
"CONNECTION_STRING = os.environ.get(\n",
" \"CRATEDB_CONNECTION_STRING\",\n",
" \"crate://<USER>:<PASSWORD>@<CRATEDB_HOST>/?ssl=true\",\n",
")\n",
"\n",
"# For a database running locally, please use the following connection string:\n",
"# dburi = 'crate://localhost:4200?ssl=false'\n",
"# Define database address when using CrateDB on localhost.\n",
"#CONNECTION_STRING = os.environ.get(\n",
"# \"CRATEDB_CONNECTION_STRING\",\n",
"# \"crate://crate@localhost/\",\n",
"#)\n",
"\n",
"engine = sa.create_engine(dburi, echo=False)\n",
"# Connect to CrateDB using SQLAlchemy.\n",
"engine = sa.create_engine(CONNECTION_STRING, echo=False)\n",
"connection = engine.connect()"
]
},
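An optional sanity check confirms the connection works before any tables are created:

```python
# Should print `1` when the database is reachable.
print(connection.execute(sa.text("SELECT 1")).scalar())
```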
@@ -520,7 +558,7 @@
],
"source": [
"connection.execute(sa.text(\"\"\"\n",
"CREATE TABLE IF NOT EXISTS \"doc\".\"weather_data\" (\n",
"CREATE TABLE IF NOT EXISTS \"weather_data\" (\n",
" \"station_id\" TEXT,\n",
" \"city_name\" TEXT,\n",
" \"date\" TIMESTAMP WITHOUT TIME ZONE,\n",
@@ -567,7 +605,7 @@
],
"source": [
"connection.execute(sa.text(\"\"\"\n",
"CREATE TABLE \"doc\".\"cities\" (\n",
"CREATE TABLE \"cities\" (\n",
" \"station_id\" TEXT,\n",
" \"city_name\" TEXT,\n",
" \"country\" TEXT,\n",
@@ -626,7 +664,7 @@
"# Uncomment the following lines to process the actual weather data.\n",
"# They have been disabled in order to avoid long-running operations.\n",
"# df_kaggle = df_kaggle.repartition(26)\n",
"# df_kaggle.to_sql(name='weather_data', uri=dburi, schema='doc', if_exists='append', \n",
"# df_kaggle.to_sql(name='weather_data', uri=dburi, if_exists='append',\n",
"# index=False, chunksize=10000, parallel=True, method=insert_bulk)"
]
},
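For reference, the enabled form of the disabled block, adjusted to the renamed connection string; the partition count and chunk size are the notebook's own figures:

```python
# Re-chunk the frame so several partitions can be inserted in parallel.
df_kaggle = df_kaggle.repartition(26)

# Insert through the CrateDB bulk endpoint, 10,000 rows per chunk.
df_kaggle.to_sql(
    name="weather_data",
    uri=CONNECTION_STRING,
    if_exists="append",
    index=False,
    chunksize=10000,
    parallel=True,
    method=insert_bulk,
)
```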
@@ -659,7 +697,7 @@
}
],
"source": [
"countries.to_sql('countries', dburi, schema='doc', if_exists='append', \n",
"countries.to_sql('countries', CONNECTION_STRING, if_exists='append',\n",
" index=False, chunksize=1000, parallel=True, method=insert_bulk)"
]
},
@@ -692,7 +730,7 @@
}
],
"source": [
"cities.to_sql('cities', dburi, schema='doc', if_exists='append', \n",
"cities.to_sql('cities', CONNECTION_STRING, if_exists='append',\n",
" index=False, chunksize=1000, parallel=True, method=insert_bulk)"
]
}
13 changes: 12 additions & 1 deletion topic/timeseries/test.py
@@ -1,3 +1,6 @@
import os
from pathlib import Path

import pytest
from testbook import testbook

@@ -7,6 +10,14 @@ def test_notebook(notebook):
Execute Jupyter Notebook, one test case per .ipynb file.
"""
if notebook.name == "dask-weather-data-import.ipynb":
raise pytest.skip("Depends on DOWNLOAD_PATH/daily_weather.parquet")

    # Skip Kaggle tests when no authentication information is available.
    kaggle_auth_exists = Path("~/.kaggle/kaggle.json").expanduser().exists() or (
"KAGGLE_USERNAME" in os.environ and "KAGGLE_KEY" in os.environ
)
if not kaggle_auth_exists:
raise pytest.skip(f"Kaggle dataset can not be tested "
f"without authentication: {notebook.name}")

with testbook(notebook) as tb:
tb.execute()
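To execute this test module directly, independently of the repository's own pytest wiring, a sketch:

```python
import pytest

# Runs the notebook test cases in this file; needs Kaggle credentials.
raise SystemExit(pytest.main(["topic/timeseries/test.py", "-v"]))
```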
