Merge pull request #163 from catalyst-cooperative/duckdb_io

Create S3 production/cloud processing pipeline and update dagster infrastructure

e-belfer authored Sep 17, 2024
2 parents 3074ae0 + dcfb683 commit ff0f41f
Showing 19 changed files with 742 additions and 293 deletions.
77 changes: 29 additions & 48 deletions .github/workflows/load-metrics.yml
@@ -2,13 +2,11 @@ name: load-metrics

on:
workflow_dispatch:
# schedule:
# - cron: "14 0 * * *" # Every day at 12:14 AM UTC
schedule:
- cron: "14 7 * * 1" # Every Monday at 7:14 AM UTC

env:
PRESET_IP1: 44.193.153.196
PRESET_IP2: 52.70.123.52
PRESET_IP3: 54.83.88.93
METRICS_PROD_ENV: "prod"

jobs:
load-metrics:
@@ -29,69 +27,52 @@ jobs:
service_account: "pudl-usage-metrics-etl@catalyst-cooperative-pudl.iam.gserviceaccount.com"
create_credentials_file: true

- name: Set up conda environment for testing
uses: conda-incubator/setup-[email protected]
- name: Install Conda environment using mamba
uses: mamba-org/setup-micromamba@v1
with:
miniforge-variant: Mambaforge
miniforge-version: latest
use-mamba: true
mamba-version: "*"
channels: conda-forge,defaults
channel-priority: true
python-version: ${{ matrix.python-version }}
activate-environment: pudl-usage-metrics
environment-file: environment.yml
- shell: bash -l {0}
run: |
mamba info
mamba list
conda config --show-sources
conda config --show
printenv | sort
cache-environment: true
condarc: |
channels:
- conda-forge
- defaults
channel_priority: strict
- name: Get GitHub Action runner's IP Address
id: ip
uses: haythem/[email protected]
run: ipv4=$(curl --silent --url https://api.ipify.org); echo "ipv4=$ipv4" >> $GITHUB_OUTPUT

- name: Echo IP for github runner
run: |
echo ${{ steps.ip.outputs.ipv4 }}
- name: Whitelist Github Action and Superset IPs
run: |
gcloud sql instances patch ${{ secrets.GCSQL_INSTANCE_NAME }} --authorized-networks=${{ steps.ip.outputs.ipv4 }},${{ env.PRESET_IP1 }},${{ env.PRESET_IP2 }},${{ env.PRESET_IP3 }}
gcloud sql instances patch ${{ secrets.GCSQL_INSTANCE_NAME }} --authorized-networks=${{ steps.ip.outputs.ipv4 }}
- name: Run ETL
- name: Run ETL on the latest full week of data
id: load-data
env:
IPINFO_TOKEN: ${{ secrets.IPINFO_TOKEN }}
POSTGRES_IP: ${{ secrets.POSTGRES_IP }}
POSTGRES_USER: ${{ secrets.POSTGRES_USER }}
POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }}
POSTGRES_DB: ${{ secrets.POSTGRES_DB }}
POSTGRES_PORT: ${{ secrets.POSTGRES_PORT }}
shell: bash -l {0}
run: |
mamba run -n pudl-usage-metrics python run_data_update.py
python run_data_update.py
- name: Remove Github Action runner's IP
run: |
gcloud sql instances patch ${{ secrets.GCSQL_INSTANCE_NAME }} --authorized-networks=${{ env.PRESET_IP1 }},${{ env.PRESET_IP2 }},${{ env.PRESET_IP3 }}
gcloud sql instances patch ${{ secrets.GCSQL_INSTANCE_NAME }} --clear-authorized-networks
ci-notify:
runs-on: ubuntu-latest
if: ${{ always() }}
needs: load-metrics
steps:
- name: Inform the Codemonkeys
uses: 8398a7/action-slack@v3
- name: Post to pudl-deployments channel
if: always()
id: slack
uses: slackapi/slack-github-action@v1
with:
status: custom
fields: workflow,job,commit,repo,ref,author,took
custom_payload: |
{
username: 'action-slack',
icon_emoji: ':octocat:',
attachments: [{
color: '${{ needs.ci-test.result }}' === 'success' ? 'good' : '${{ needs.ci-test.result }}' === 'failure' ? 'danger' : 'warning',
text: `${process.env.AS_REPO}@${process.env.AS_REF}\n ${process.env.AS_WORKFLOW} (${process.env.AS_COMMIT})\n by ${process.env.AS_AUTHOR}\n Status: ${{ needs.ci-test.result }}`,
}]
}
channel-id: "C03FHB9N0PQ"
slack-message: "Weekly usage metrics processing ran with status: ${{ job.status }}."
env:
GITHUB_TOKEN: ${{ github.token }} # required
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} # required
MATRIX_CONTEXT: ${{ toJson(matrix) }} # required
SLACK_BOT_TOKEN: ${{ secrets.PUDL_DEPLOY_SLACK_TOKEN }}
79 changes: 49 additions & 30 deletions README.md
@@ -16,21 +16,21 @@ This is the project structure generated by the [dagster cli](https://docs.dagste

# Setup

## Conda Environment
## Mamba Environment

We use the conda package manager to specify and update our development environment. We recommend using [miniconda](https://docs.conda.io/en/latest/miniconda.html) rather than the large pre-defined collection of scientific packages bundled together in the Anaconda Python distribution. You may also want to consider using [mamba](https://github.com/mamba-org/mamba) – a faster drop-in replacement for conda written in C++.
We use the mamba package manager to specify and update our development environment.

```
conda update conda
conda env create --name pudl-usage-metrics --file environment.yml
conda activate pudl-usage-metrics
mamba update mamba
mamba env create --name pudl-usage-metrics --file environment.yml
mamba activate pudl-usage-metrics
```

## Environment Variables

The ETL uses [ipinfo](https://ipinfo.io/) to geocode ip addresses. You need to obtain an ipinfo API token and store it in the `IPINFO_TOKEN` environment variable.

If you want to take advantage of caching raw logs, rather than redownloading them for each run, you can set the optional ``DATA_DIR`` environment variable. If this is not set, the script will save files to a temporary directory by default. This is highly recommended to avoid unnecessary egress charges.
If you want to take advantage of caching raw logs, rather than redownloading them for each run, you can set the optional ``DATA_DIR`` environment variable. If this is not set, the script will save files to a temporary directory by default.

Dagster stores run logs and caches in a directory stored in the `DAGSTER_HOME` environment variable. The `usage_metrics/dagster_home/dagster.yaml` file contains configuration for the dagster instance. **Note:** The `usage_metrics/dagster_home/storage` directory could grow to become a couple GBs because all op outputs for every run are stored there. You can read more about the dagster_home directory in the [dagster docs](https://docs.dagster.io/deployment/dagster-instance#default-local-behavior).

@@ -39,13 +39,13 @@ To use the Kaggle API, [sign up for a Kaggle account](https://www.kaggle.com). T
To set these environment variables, run these commands:

```
conda activate pudl-usage-metrics
conda env config vars set IPINFO_TOKEN="{your_api_key_here}"
conda env config vars set DAGSTER_HOME="$(pwd)/dagster_home/"
conda env config vars set DATA_DIR="$(pwd)/data/"
conda env config vars set KAGGLE_USER="{your_kaggle_username_here}" # If setting manually
conda env config vars set KAGGLE_KEY="{your_kaggle_api_key_here}" # If setting manually
conda activate pudl-usage-metrics
mamba activate pudl-usage-metrics
mamba env config vars set IPINFO_TOKEN="{your_api_key_here}"
mamba env config vars set DAGSTER_HOME="$(pwd)/dagster_home/"
mamba env config vars set DATA_DIR="$(pwd)/data/"
mamba env config vars set KAGGLE_USER="{your_kaggle_username_here}" # If setting manually
mamba env config vars set KAGGLE_KEY="{your_kaggle_api_key_here}" # If setting manually
mamba activate pudl-usage-metrics
```

## Google Cloud Permissions
@@ -89,7 +89,7 @@ When running backfills, this prevents you from kicking off 80 concurrent runs th
In one terminal window start the dagster-daemon and UI by running these commands:

```
conda activate pudl-usage-metrics
mamba activate pudl-usage-metrics
dagster dev -m usage_metrics.etl
```

@@ -113,42 +113,61 @@ You can run the ETL via the dagit UI or the [dagster CLI](https://docs.dagster.i

To run a complete backfill from the Dagit UI, go to the job's partitions tab. Then click on the "Launch Backfill" button in the upper left corner of the window. This should bring up a new window with a list of partitions. Click "Select All" and then click the "Submit" button. This will submit a run for each partition. You can follow the runs on the ["Runs" tab](http://localhost:3000/instance/runs). A CLI alternative is sketched below.
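
If you prefer the command line, a backfill can also be launched with something like the following (the job and partition names are placeholders; check `dagster job backfill --help` for the exact flags in your dagster version):

```
dagster job backfill -m usage_metrics.etl -j <job_name> --partitions <partition_key>
```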

### Databases
### Local vs. production development

#### SQLite
Whether the ETL writes to a local SQLite database (local development) or to a Google Cloud SQL Postgres database (production) is determined by the `METRICS_PROD_ENV` environment variable. If this is not set, you will develop locally by default. To develop against production, run the following:

Jobs in the `local_usage_metrics` dagster repository create a sqlite database called `usage_metrics.db` in the `usage_metrics/data/` directory. A primary key constraint error will be thrown if you rerun the ETL for a partition. If you want to recreate the entire database just delete the sqlite database and rerun the ETL.
```
mamba env config vars set METRICS_PROD_ENV='prod'
mamba activate pudl-usage-metrics
```

To revert to local development, set `METRICS_PROD_ENV='local'`.

#### Schema management
We use Alembic to manage the schemas of both local and production databases. Whenever a new column or table is added, run the following commands to create a new schema migration and upgrade the database schema to match:

```
alembic revision --autogenerate -m "Add my cool new table"
alembic upgrade head
```

Because of the primary key constraints, if you need to rerun a partition that has already been run, you'll need to delete the database and start over. If you're adding a new table or data source, run a backfill just for that dataset's particular job to avoid this constraint.

#### Local development (SQLite)

Local development will create an SQLite database called `usage_metrics.db` in the `usage_metrics/data/` directory. A primary key constraint error will be thrown if you rerun the ETL for a partition. If you want to recreate the entire database, just delete the SQLite database and rerun the ETL.
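
For example, assuming the `sqlite3` command-line tool is installed:

```
sqlite3 usage_metrics/data/usage_metrics.db ".tables"  # inspect what has been loaded so far
rm usage_metrics/data/usage_metrics.db                 # delete it to start fresh, then rerun the ETL
```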

#### Google Cloud SQL Postgres
#### Production development (Google Cloud SQL Postgres)

Jobs in the `gcp_usage_metrics` dagster repository append new partitions to tables in a Cloud SQL postgres database. A primary key constraint error will be thrown if you rerun the ETL for a partition. The `load-metrics` GitHub action is responsible for updating the database with new partitioned data.
Production runs will append new partitions to tables in a Cloud SQL postgres database. A primary key constraint error will be thrown if you rerun the ETL for a partition. The `load-metrics` GitHub action is responsible for updating the database with new partitioned data.

If a new column is added or data is processed in a new way, you'll have to delete the table in the database and rerun a complete backfill. **Note: The Preset dashboard will be unavailable during the complete backfill.**
If a new column is added or data is processed in a new way, you'll have to delete the table in the database and rerun a complete backfill.

To run jobs in the `gcp_usage_metrics` repo, you need to whitelist your ip address for the database:

```
gcloud sql instances patch pudl-usage-metrics-db --authorized-networks={YOUR_IP_ADDRESS}
```

Then add the connection details as environment variables to your conda environment:
Then add the connection details as environment variables to your mamba environment:

```
conda activate pudl-usage-metrics
conda env config vars set POSTGRES_IP={PUDL_USAGE_METRICS_DB_IP}
conda env config vars set POSTGRES_USER={PUDL_USAGE_METRICS_DB_USER}
conda env config vars set POSTGRES_PASSWORD={PUDL_USAGE_METRICS_DB_PASSWORD}
conda env config vars set POSTGRES_DB={PUDL_USAGE_METRICS_DB_DB}
conda env config vars set POSTGRES_PORT={PUDL_USAGE_METRICS_DB_PORT}
conda activate pudl-usage-metrics
mamba activate pudl-usage-metrics
mamba env config vars set POSTGRES_IP={PUDL_USAGE_METRICS_DB_IP}
mamba env config vars set POSTGRES_USER={PUDL_USAGE_METRICS_DB_USER}
mamba env config vars set POSTGRES_PASSWORD={PUDL_USAGE_METRICS_DB_PASSWORD}
mamba env config vars set POSTGRES_DB={PUDL_USAGE_METRICS_DB_DB}
mamba env config vars set POSTGRES_PORT={PUDL_USAGE_METRICS_DB_PORT}
mamba activate pudl-usage-metrics
```

You can find the connection details in the
Ask a member of Inframundo for the connection details.

### IP Geocoding with ipinfo

The ETL uses [ipinfo](https://ipinfo.io/), which provides 50k free API requests a month, to geocode user IP addresses. The `usage_metrics.helpers.geocode_ip()` function uses [joblib](https://joblib.readthedocs.io/en/latest/#main-features) to cache API calls so we don't call the API multiple times for a single IP address. The first time you run the ETL, no API calls will be cached, so the `geocode_ips()` op will take a while to complete.
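
A minimal sketch of this caching pattern, assuming the `requests` library, a local cache directory, and the public ipinfo REST endpoint (the real logic lives in `usage_metrics.helpers` and may differ):

```
import os

import requests
from joblib import Memory

# Cache results on disk so repeated IPs don't consume additional API requests.
memory = Memory("ipinfo_cache", verbose=0)  # cache directory name is an assumption


@memory.cache
def geocode_ip(ip_address: str) -> dict:
    """Look up geolocation metadata for a single IP address via ipinfo."""
    token = os.environ["IPINFO_TOKEN"]
    response = requests.get(
        f"https://ipinfo.io/{ip_address}/json", params={"token": token}, timeout=10
    )
    response.raise_for_status()
    return response.json()
```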

## Add new data sources

To add a new data source to the dagster repo, add new modules to the `raw` and `core` and `out` directories and add these modules to the corresponding jobs. Once the dataset has been tested locally, run a complete backfill for the job that uses the `PostgresManager` to populate the Cloud SQL database.
To add a new data source to the dagster repo, add new modules to the `raw`, `core`, and `out` directories and add these modules to the corresponding jobs. Once the dataset has been tested locally, run a complete backfill for the job with `METRICS_PROD_ENV="prod"` to populate the Cloud SQL database. A rough sketch of what a new module might look like is shown below.
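
As a purely illustrative sketch (module paths, asset names, and columns are hypothetical placeholders, not the repo's actual layout):

```
# usage_metrics/raw/mysource.py -- hypothetical example of a new raw module
import pandas as pd
from dagster import asset


@asset
def raw_mysource() -> pd.DataFrame:
    """Download one batch of raw logs for the hypothetical new source."""
    # Replace this stub with the real download logic (S3, Kaggle, etc.).
    return pd.DataFrame({"timestamp": [], "remote_ip": [], "path": []})


# usage_metrics/core/mysource.py -- hypothetical example of a core module
@asset
def core_mysource(raw_mysource: pd.DataFrame) -> pd.DataFrame:
    """Clean the raw logs; dagster wires the dependency via the argument name."""
    return raw_mysource.dropna(subset=["remote_ip"])
```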
107 changes: 107 additions & 0 deletions alembic.ini
@@ -0,0 +1,107 @@
# A generic, single database configuration.

[alembic]
# path to migration scripts
script_location = migrations

# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =

# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; This defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions

# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.

# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8

[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = INFO
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = WARN
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
1 change: 1 addition & 0 deletions migrations/README
@@ -0,0 +1 @@
Generic single-database configuration.