Merge pull request #140 from catalyst-cooperative/s3-pipeline
Create a Dagster ETL pipeline for S3 usage metrics
e-belfer authored Aug 15, 2024
2 parents d5a2c33 + 79ea5fe commit 86cffbf
Showing 25 changed files with 683 additions and 107 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/tox-pytest.yml
@@ -53,9 +53,6 @@ jobs:
        with:
          version: ">= 363.0.0"

      - name: "Print GCloud config"
        run: "gcloud info"

      - name: Run tox
        env:
          IPINFO_TOKEN: ${{ secrets.IPINFO_TOKEN }}
6 changes: 6 additions & 0 deletions .gitignore
@@ -14,3 +14,9 @@ notebooks/.ipynb_checkpoints
# Exclude everything in dagster_home except the dagster.yaml config file.
dagster_home/*
!/dagster_home/dagster.yaml

# Ignore results of tox runs
.coverage
.tox/
coverage.xml
node_modules/
44 changes: 20 additions & 24 deletions README.md
@@ -11,7 +11,7 @@ This is the project structure generated by the [dagster cli](https://docs.dagste
| `README.md` | A description and guide for this code repository |
| `workspace.yaml` | A file that specifies the location of the user code for Dagit and the Dagster CLI |
| `src/usage_metrics/` | A Python directory that contains code for your Dagster repository |
| `usage_metrics_tests/` | A Python directory that contains tests for `usage_metrics` |
| `tests/` | A Python directory that contains tests for `usage_metrics` |
| `setup.py` | A build script with Python package dependencies for this code repository |

# Setup
@@ -30,6 +30,8 @@ conda activate pudl-usage-metrics

The ETL uses [ipinfo](https://ipinfo.io/) to geocode ip addresses. You need to obtain an ipinfo API token and store it in the `IPINFO_TOKEN` environment variable.

If you want to take advantage of caching raw logs, rather than redownloading them for each run, you can set the optional ``DATA_DIR`` environment variable. If this is not set, the script will save files to a temporary directory by default. This is highly recommended to avoid unnecessary egress charges.
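As a rough illustration of that fallback behavior (assuming the script simply reads ``DATA_DIR`` and otherwise falls back to a temporary directory; this is not the project's actual code):

```
# Sketch of the assumed DATA_DIR fallback behavior.
import os
import tempfile
from pathlib import Path

data_dir = Path(os.environ.get("DATA_DIR", tempfile.mkdtemp()))
data_dir.mkdir(parents=True, exist_ok=True)  # raw logs cached here between runs
```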

Dagster stores run logs and caches in a directory stored in the `DAGSTER_HOME` environment variable. The `usage_metrics/dagster_home/dagster.yaml` file contains configuration for the dagster instance. **Note:** The `usage_metrics/dagster_home/storage` directory could grow to become a couple GBs because all op outputs for every run are stored there. You can read more about the dagster_home directory in the [dagster docs](https://docs.dagster.io/deployment/dagster-instance#default-local-behavior).

To set these environment variables, run these commands:
@@ -38,6 +40,7 @@ To set these environment variables, run these commands:
conda activate pudl-usage-metrics
conda env config vars set IPINFO_TOKEN="{your_api_key_here}"
conda env config vars set DAGSTER_HOME="$(pwd)/dagster_home/"
conda env config vars set DATA_DIR="$(pwd)/data/"
conda activate pudl-usage-metrics
```

@@ -67,49 +70,42 @@ The scripts that run are configured in the .pre-commit-config.yaml file.

Now the environment is all set up and we can start up dagster!

### Dagster Daemon

In one terminal window start the dagster-daemon by running these commands:
## Set some global Dagster configs

In your ``DAGSTER_HOME`` folder, add a ``dagster.yaml`` file or edit your existing one to contain the following code:
```
conda activate pudl-usage-metrics
dagster-daemon run
run_queue:
  max_concurrent_runs: 1
```

The [dagster-daemon](https://docs.dagster.io/deployment/dagster-daemon) is a long-running service required for schedules, sensors and run queueing. The usage metrics ETL requires the daemon because the data is processed in partitions. Dagster kicks off individual runs for each [partition](https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions) which are sent to a queue managed by the dagster-daemon.
When running backfills, this prevents you from kicking off 80 concurrent runs, which would at worst crash your drive and at best create SQL database errors.

### Dagit
### Dagster Daemon

In another terminal window, start the [dagit UI](https://docs.dagster.io/concepts/dagit/dagit) by running these commands:
In one terminal window, start the dagster-daemon and the Dagster UI by running these commands:

```
conda activate pudl-usage-metrics
dagit
dagster dev -m usage_metrics.etl
```

This will launch dagit at [`http://localhost:3000/`](http://localhost:3000/). If you have another service running on port 3000 you can change the port by running:
Running `dagster dev` starts the [dagster-webserver](https://docs.dagster.io/concepts/webserver/ui) alongside the dagster-daemon, a long-running service required for schedules, sensors, and run queueing. The usage metrics ETL requires the daemon because the data is processed in partitions: Dagster kicks off an individual run for each [partition](https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions), and those runs are sent to a queue managed by the dagster-daemon.

This command also launches the Dagster UI at [`http://localhost:3000/`](http://localhost:3000/). If you have another service running on port 3000, you can change the port by running:

```
dagit -p {another_cool_port}
dagster dev -m usage_metrics.etl -p {another_cool_port}
```

Dagit allows you to kick off [`backfills`](https://docs.dagster.io/concepts/partitions-schedules-sensors/backfills) and run partitions with specific configuration.
Dagster allows you to kick off [`backfills`](https://docs.dagster.io/concepts/partitions-schedules-sensors/backfills) and run partitions with specific configuration.

## Run the ETL

There is a module in the `usage_metrics/jobs` sub package for each datasource (e.g datasette logs, github metrics…) Each job module contains one graph of ops that extracts, transforms and loads the data. Two jobs are created for each graph, one job loads data to a local sqlite database for development and the other job loads data to a Google Cloud SQL Postgres database for a Preset dashboard to access.
There is a job in the `usage_metrics/etl` sub package for each data source (e.g. datasette logs, github metrics…). Each job module contains a series of assets that extract, transform, and load the data. When run locally, the IO manager loads data into a local SQLite database for development. When run on GitHub Actions, the IO manager loads data into a Google Cloud SQL Postgres database for a Superset dashboard to access.
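As a rough sketch of that environment switch (not the repository's actual IO manager code; the `POSTGRES_URL` variable name is an assumption), the backend selection could look like this:

```
# Illustrative only: use SQLite locally, Postgres when a connection URL is provided.
import os

import sqlalchemy as sa


def make_engine() -> sa.engine.Engine:
    postgres_url = os.environ.get("POSTGRES_URL")  # assumed variable name
    if postgres_url:
        return sa.create_engine(postgres_url)
    return sa.create_engine("sqlite:///usage_metrics.sqlite")
```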

You can run the ETL via the dagit UI or the [dagster CLI](https://docs.dagster.io/_apidocs/cli).

### CLI

To run a complete backfill for a job, run:

```
dagster job backfill --all {YOUR_JOB_NAME}
```

### Dagit UI
### Backfills

To run a complete backfill from the Dagster UI, go to the job's partitions tab, then click the "Launch Backfill" button in the upper left corner of the window. This brings up a new window with a list of partitions. Click "Select All" and then click the "Submit" button; this submits a run for each partition. You can follow the runs on the ["Runs" tab](http://localhost:3000/instance/runs).

@@ -151,4 +147,4 @@ The ETL uses [ipinfo](https://ipinfo.io/) for geocoding the user ip addresses wh
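For reference, a minimal sketch of batch geocoding with the `ipinfo` client library (not necessarily how the repo's `geocode_ips` helper is implemented):

```
# Sketch of batch geocoding with the ipinfo client; details may differ from geocode_ips().
import os

import ipinfo

handler = ipinfo.getHandler(os.environ["IPINFO_TOKEN"])
ips = ["8.8.8.8", "1.1.1.1"]  # example IP addresses
details = handler.getBatchDetails(ips)  # dict keyed by IP address
for ip, info in details.items():
    print(ip, info.get("country_name"), info.get("city"))
```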

## Add new data sources

To add a new data source to the dagster repo, add new modules to the `usage_metrics/jobs/` and `usage_metrics/ops/` directories and create jobs that use the `SQLite` and `PostgresManager`. Then, create a new dagster repository in the repository module that contains the dataset jobs. Once the dataset has been tested locally, run a complete backfill for the job that uses the `PostgresManager` to populate the Cloud SQL database.
To add a new data source to the dagster repo, add new modules to the `raw`, `core`, and `out` directories and add these modules to the corresponding jobs. Once the dataset has been tested locally, run a complete backfill for the job that uses the `PostgresManager` to populate the Cloud SQL database.
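As a sketch, a new core module might mirror the pattern in `core/s3.py` above; the names `new_source` and `raw_new_source_logs` here are placeholders, not real assets in the repo:

```
"""Skeleton for a hypothetical new core asset module."""
import pandas as pd
from dagster import AssetExecutionContext, WeeklyPartitionsDefinition, asset


@asset(
    partitions_def=WeeklyPartitionsDefinition(start_date="2023-08-16"),
    io_manager_key="database_manager",
    tags={"source": "new_source"},  # placeholder source name
)
def core_new_source_logs(
    context: AssetExecutionContext,
    raw_new_source_logs: pd.DataFrame,  # placeholder upstream raw asset
) -> pd.DataFrame:
    """Clean the raw logs before loading them into the database."""
    return raw_new_source_logs.drop_duplicates()
```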
9 changes: 2 additions & 7 deletions dagster_home/dagster.yaml
@@ -2,10 +2,5 @@ python_logs:
  managed_python_loggers:
    - root
  python_log_level: DEBUG
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    # TODO: Can I increase the concurrent runs with SQLite?
    # Does the context manager and sqlalchemy engine prevent issues?
    max_concurrent_runs: 1
run_queue:
  max_concurrent_runs: 1
80 changes: 80 additions & 0 deletions notebooks/inspect-assets.ipynb
@@ -0,0 +1,80 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "c07231ee-a317-405b-9aec-56d5131ffb0d",
"metadata": {},
"source": [
"# Inspecting dagster assets\n",
"This notebooks allows you to inspect dagster asset values. **This is just a template notebook. Do your asset explorations in a copy of this notebook.** \n",
"\n",
"Some assets are written to the database in which case you can just pull the tables into pandas or explore them in the database. However, many assets use the default IO Manager which writes asset values to the `$DAGSTER_HOME/storage/` directory as pickle files. Dagster provides a method for inspecting asset values no matter what IO Manager the asset uses."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "de97d7ba-22f7-433e-9f2f-0b9df8b64fc7",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import os\n",
"\n",
"assert os.environ.get(\"DAGSTER_HOME\"), (\n",
" \"The DAGSTER_HOME env var is not set so dagster won't be able to find the assets.\"\n",
" \"Set the DAGSTER_HOME env var in this notebook or kill the jupyter server and set\"\n",
" \" the DAGSTER_HOME env var in your terminal and relaunch jupyter.\"\n",
")"
]
},
{
"cell_type": "markdown",
"id": "c54503cc-19a2-4cd0-8724-f371eebf54e4",
"metadata": {},
"source": [
"## Inspect an asset that uses Dagster's default IO manager"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "aa537769",
"metadata": {},
"outputs": [],
"source": [
"from dagster import AssetKey\n",
"\n",
"from usage_metrics.etl import defs\n",
"\n",
"asset_key = \"transform_s3_logs\"\n",
"partition_key = \"2024-07-21\"\n",
"\n",
"with defs.get_asset_value_loader() as loader:\n",
" df = loader.load_asset_value(asset_key, partition_key = partition_key)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -7,7 +7,8 @@ requires-python = ">=3.12,<3.13"
dependencies = [
"pandas>=2.2,<2.3",
"sqlalchemy>=2",
"dagster>=1.7, <1.7.17", # Update to 1.7.14 once released, 1.7.13 clashes with Python 3.12
"dagster>=1.7.15, <1.7.17",
"dagster-webserver>=1.7.15,<1.8",
"pandas-gbq>=0.23.1",
"pydata-google-auth>=1.8.2",
"jupyterlab>=4.2.3",
8 changes: 7 additions & 1 deletion src/usage_metrics/__init__.py
@@ -1,3 +1,9 @@
"""Module containing dagster tools for cleaning PUDL usage metrics."""

from usage_metrics.repository import datasette_logs, intake_logs # noqa: F401
from usage_metrics.repository import datasette_logs, intake_logs

from . import (
    core,
    out,
    raw,
)
3 changes: 3 additions & 0 deletions src/usage_metrics/core/__init__.py
@@ -0,0 +1,3 @@
"""Module contains assets that transform data into core assets."""

from . import s3
104 changes: 104 additions & 0 deletions src/usage_metrics/core/s3.py
@@ -0,0 +1,104 @@
"""Transform data from S3 logs."""

import pandas as pd
from dagster import (
    AssetExecutionContext,
    WeeklyPartitionsDefinition,
    asset,
)

from usage_metrics.helpers import geocode_ips


@asset(
    partitions_def=WeeklyPartitionsDefinition(start_date="2023-08-16"),
    io_manager_key="database_manager",
    tags={"source": "s3"},
)
def core_s3_logs(
    context: AssetExecutionContext,
    raw_s3_logs: pd.DataFrame,
) -> pd.DataFrame:
    """Transform daily S3 logs.

    Add column headers, geocode IP addresses, and clean up data types.
    """
    # Name columns
    raw_s3_logs.columns = [
        "bucket_owner",
        "bucket",
        "time",
        "timezone",
        "remote_ip",
        "requester",
        "request_id",
        "operation",
        "key",
        "request_uri",
        "http_status",
        "error_code",
        "bytes_sent",
        "object_size",
        "total_time",
        "turn_around_time",
        "referer",
        "user_agent",
        "version_id",
        "host_id",
        "signature_version",
        "cipher_suite",
        "authentication_type",
        "host_header",
        "tls_version",
        "access_point_arn",
        "acl_required",
    ]

    # Drop entirely duplicate rows
    raw_s3_logs = raw_s3_logs.drop_duplicates()

    # Combine time and timezone columns
    raw_s3_logs.time = raw_s3_logs.time + " " + raw_s3_logs.timezone
    raw_s3_logs = raw_s3_logs.drop(columns=["timezone"])

    # Drop S3 lifecycle transitions
    raw_s3_logs = raw_s3_logs.loc[raw_s3_logs.operation != "S3.TRANSITION_INT.OBJECT"]

    # Geocode IPs
    raw_s3_logs["remote_ip"] = raw_s3_logs["remote_ip"].mask(
        raw_s3_logs["remote_ip"].eq("-"), pd.NA
    )  # Mask null IPs
    geocoded_df = geocode_ips(raw_s3_logs)

    # Convert string to datetime using Pandas
    format_string = "[%d/%b/%Y:%H:%M:%S %z]"
    geocoded_df["time"] = pd.to_datetime(geocoded_df.time, format=format_string)

    geocoded_df["bytes_sent"] = geocoded_df["bytes_sent"].mask(
        geocoded_df["bytes_sent"].eq("-"), 0
    )
    numeric_fields = [
        "bytes_sent",
        "http_status",
        "object_size",
        "total_time",
        "turn_around_time",
    ]
    for field in numeric_fields:
        geocoded_df[field] = pd.to_numeric(geocoded_df[field], errors="coerce")

    geocoded_df = geocoded_df.set_index("request_id")
    assert geocoded_df.index.is_unique

    # Drop unnecessary geocoding columns
    geocoded_df = geocoded_df.drop(
        columns=[
            "remote_ip_country_flag",
            "remote_ip_country_flag_url",
            "remote_ip_country_currency",
            "remote_ip_continent",
            "remote_ip_isEU",
        ]
    )

    return geocoded_df.reset_index()
