Commit
Merge branch 'main' into task/remove-jsonpickle
dgboss authored Nov 19, 2024
2 parents 5d0b631 + b04be48 commit f5738a3
Showing 14 changed files with 688 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .github/workflows/deployment.yml
@@ -224,6 +224,12 @@ jobs:
          oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}"
          PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_hourly_prune_cronjob.sh ${SUFFIX} apply
      - name: Partitioner cronjob
        shell: bash
        run: |
          oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}"
          PROJ_DEV="e1e498-dev" SCHEDULE="0 0 1 * *" bash openshift/scripts/oc_provision_partitioner_cronjob.sh ${SUFFIX} apply
# TODO: Delete once pmtiles has run for some time
# deploy-tileserv:
# name: Deploy tileserv to Dev
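The new `Partitioner cronjob` step above provisions a monthly CronJob; the `0 0 1 * *` schedule fires at midnight on the first day of each month. A rough manual equivalent of that step is sketched below; the `pr-1234` suffix, cluster URL, and token are placeholders, not values from the workflow.

```bash
# Sketch of invoking the same provisioning script by hand; the suffix "pr-1234",
# the cluster URL and the token are placeholders, not values from the workflow.
oc login "https://api.my-cluster.example.com:6443" --token="<dev-token>"
PROJ_DEV="e1e498-dev" SCHEDULE="0 0 1 * *" \
  bash openshift/scripts/oc_provision_partitioner_cronjob.sh pr-1234 apply
```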
61 changes: 61 additions & 0 deletions api/alembic/versions/07007f659064_partition_weather_station_model_.py
@@ -0,0 +1,61 @@
"""partition weather_station_model_predictions part 1
Revision ID: 07007f659064
Revises: c5bea0920d53
Create Date: 2024-11-04 10:41:31.466124
"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "07007f659064"
down_revision = "c5bea0920d53"
branch_labels = None
depends_on = None

### Adapted from pgslice "prep" command
# BEGIN;
# CREATE TABLE "public"."weather_station_model_predictions_intermediate" (LIKE "public"."weather_station_model_predictions" INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS INCLUDING STATISTICS INCLUDING GENERATED INCLUDING COMPRESSION) PARTITION BY RANGE ("prediction_timestamp");
# CREATE UNIQUE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (station_code, prediction_model_run_timestamp_id, prediction_timestamp);
# CREATE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (id);
# CREATE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (prediction_model_run_timestamp_id);
# CREATE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (prediction_timestamp);
# CREATE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (station_code);
# CREATE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (update_date);
# CREATE INDEX ON "public"."weather_station_model_predictions_intermediate" USING btree (prediction_timestamp, station_code);
# ALTER TABLE "public"."weather_station_model_predictions_intermediate" ADD FOREIGN KEY (prediction_model_run_timestamp_id) REFERENCES prediction_model_run_timestamps(id);
# COMMENT ON TABLE "public"."weather_station_model_predictions_intermediate" IS 'column:prediction_timestamp,period:month,cast:timestamptz,version:3';
# COMMIT;
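# (For reference, the SQL above is roughly what a pgslice CLI invocation such as
#  `pgslice prep weather_station_model_predictions prediction_timestamp month`
#  would emit; the exact command is an assumption, not part of this migration.)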


def upgrade():
    op.execute(
        'CREATE TABLE "public"."weather_station_model_predictions_intermediate" (LIKE "public"."weather_station_model_predictions" INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS INCLUDING STATISTICS INCLUDING GENERATED INCLUDING COMPRESSION) PARTITION BY RANGE ("prediction_timestamp");'
    )
    op.execute(
        'CREATE UNIQUE INDEX wsmp_unique_record_idx ON "public"."weather_station_model_predictions_intermediate" USING btree (station_code, prediction_model_run_timestamp_id, prediction_timestamp);'
    )
    op.execute('CREATE INDEX wsmp_id_idx ON "public"."weather_station_model_predictions_intermediate" USING btree (id);')
    op.execute('CREATE INDEX wsmp_prediction_model_run_timestamp_id_idx ON "public"."weather_station_model_predictions_intermediate" USING btree (prediction_model_run_timestamp_id);')
    op.execute('CREATE INDEX wsmp_prediction_timestamp_idx ON "public"."weather_station_model_predictions_intermediate" USING btree (prediction_timestamp);')
    op.execute('CREATE INDEX wsmp_station_code_idx ON "public"."weather_station_model_predictions_intermediate" USING btree (station_code);')
    op.execute('CREATE INDEX wsmp_update_date_idx ON "public"."weather_station_model_predictions_intermediate" USING btree (update_date);')
    op.execute('CREATE INDEX wsmp_prediction_station_code_idx ON "public"."weather_station_model_predictions_intermediate" USING btree (prediction_timestamp, station_code);')
    op.execute(
        'ALTER TABLE "public"."weather_station_model_predictions_intermediate" ADD CONSTRAINT wsmp_id_fk FOREIGN KEY (prediction_model_run_timestamp_id) REFERENCES prediction_model_run_timestamps(id);'
    )
    op.execute('COMMENT ON TABLE "public"."weather_station_model_predictions_intermediate" IS \'column:prediction_timestamp,period:month,cast:timestamptz,version:3\';')


def downgrade():
    op.execute('COMMENT ON TABLE "public"."weather_station_model_predictions_intermediate" IS NULL;')
    op.execute('ALTER TABLE "public"."weather_station_model_predictions_intermediate" DROP CONSTRAINT wsmp_id_fk;')
    op.execute("DROP INDEX wsmp_prediction_station_code_idx;")
    op.execute("DROP INDEX wsmp_update_date_idx;")
    op.execute("DROP INDEX wsmp_station_code_idx;")
    op.execute("DROP INDEX wsmp_prediction_timestamp_idx;")
    op.execute("DROP INDEX wsmp_prediction_model_run_timestamp_id_idx;")
    op.execute("DROP INDEX wsmp_id_idx;")
    op.execute("DROP INDEX wsmp_unique_record_idx;")
    op.execute('DROP TABLE "public"."weather_station_model_predictions_intermediate"')
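After this migration runs, the empty `weather_station_model_predictions_intermediate` table exists alongside the original, partitioned by range on `prediction_timestamp` but with no partitions attached yet. A quick check is sketched below; the connection string is a placeholder.

```bash
# Sketch: inspect the intermediate partitioned table after "alembic upgrade head"
# (placeholder connection string).
psql "postgresql://wps:wps@localhost:5432/wps" \
  -c '\d+ weather_station_model_predictions_intermediate'
# List all range/list-partitioned tables known to the database.
psql "postgresql://wps:wps@localhost:5432/wps" \
  -c "SELECT c.relname FROM pg_partitioned_table pt JOIN pg_class c ON c.oid = pt.partrelid;"
```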
105 changes: 105 additions & 0 deletions api/alembic/versions/362d268606f3_partition_weather_station_model_.py
@@ -0,0 +1,105 @@
"""partition weather_station_model_predictions part 2
Revision ID: 362d268606f3
Revises: 07007f659064
Create Date: 2024-11-04 11:02:57.501656
"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "362d268606f3"
down_revision = "07007f659064"
branch_labels = None
depends_on = None

### Adapted from pgslice "add_partitions" command; creates partitions for the previous six months, the current month, and the next three months
# BEGIN;
# CREATE TABLE "public"."weather_station_model_predictions_202405" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-05-01 00:00:00 UTC') TO ('2024-06-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202405" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202406" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-06-01 00:00:00 UTC') TO ('2024-07-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202406" ADD PRIMARY KEY ("id");


# CREATE TABLE "public"."weather_station_model_predictions_202407" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-07-01 00:00:00 UTC') TO ('2024-08-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202407" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202408" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-08-01 00:00:00 UTC') TO ('2024-09-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202408" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202409" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-09-01 00:00:00 UTC') TO ('2024-10-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202409" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202410" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-10-01 00:00:00 UTC') TO ('2024-11-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202410" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202411" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-11-01 00:00:00 UTC') TO ('2024-12-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202411" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202412" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2024-12-01 00:00:00 UTC') TO ('2025-01-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202412" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202501" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2025-01-01 00:00:00 UTC') TO ('2025-02-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202501" ADD PRIMARY KEY ("id");

# CREATE TABLE "public"."weather_station_model_predictions_202502" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM ('2025-02-01 00:00:00 UTC') TO ('2025-03-01 00:00:00 UTC');
# ALTER TABLE "public"."weather_station_model_predictions_202502" ADD PRIMARY KEY ("id");
# COMMIT;
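# (For reference, the SQL above is roughly what a pgslice CLI invocation such as
#  `pgslice add_partitions weather_station_model_predictions --intermediate --past 6 --future 3`
#  would emit; the exact command and flags are an assumption, not part of this migration.)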


def upgrade():
    op.execute(
        'CREATE TABLE "public"."weather_station_model_predictions_202405" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM (\'2024-05-01 00:00:00 UTC\') TO (\'2024-06-01 00:00:00 UTC\');'
    )
    op.execute('ALTER TABLE "public"."weather_station_model_predictions_202405" ADD PRIMARY KEY ("id");')
    op.execute(
        'CREATE TABLE "public"."weather_station_model_predictions_202406" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM (\'2024-06-01 00:00:00 UTC\') TO (\'2024-07-01 00:00:00 UTC\');'
    )
    op.execute('ALTER TABLE "public"."weather_station_model_predictions_202406" ADD PRIMARY KEY ("id");')
    op.execute(
        'CREATE TABLE "public"."weather_station_model_predictions_202407" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM (\'2024-07-01 00:00:00 UTC\') TO (\'2024-08-01 00:00:00 UTC\');'
    )
    op.execute('ALTER TABLE "public"."weather_station_model_predictions_202407" ADD PRIMARY KEY ("id");')
    op.execute(
        'CREATE TABLE "public"."weather_station_model_predictions_202408" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM (\'2024-08-01 00:00:00 UTC\') TO (\'2024-09-01 00:00:00 UTC\');'
    )
    op.execute('ALTER TABLE "public"."weather_station_model_predictions_202408" ADD PRIMARY KEY ("id");')
    op.execute(
        'CREATE TABLE "public"."weather_station_model_predictions_202409" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM (\'2024-09-01 00:00:00 UTC\') TO (\'2024-10-01 00:00:00 UTC\');'
    )
    op.execute('ALTER TABLE "public"."weather_station_model_predictions_202409" ADD PRIMARY KEY ("id");')
    op.execute(
        'CREATE TABLE "public"."weather_station_model_predictions_202410" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM (\'2024-10-01 00:00:00 UTC\') TO (\'2024-11-01 00:00:00 UTC\');'
    )
    op.execute('ALTER TABLE "public"."weather_station_model_predictions_202410" ADD PRIMARY KEY ("id");')
    op.execute(
        'CREATE TABLE "public"."weather_station_model_predictions_202411" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM (\'2024-11-01 00:00:00 UTC\') TO (\'2024-12-01 00:00:00 UTC\');'
    )
    op.execute('ALTER TABLE "public"."weather_station_model_predictions_202411" ADD PRIMARY KEY ("id");')
    op.execute(
        'CREATE TABLE "public"."weather_station_model_predictions_202412" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM (\'2024-12-01 00:00:00 UTC\') TO (\'2025-01-01 00:00:00 UTC\');'
    )
    op.execute('ALTER TABLE "public"."weather_station_model_predictions_202412" ADD PRIMARY KEY ("id");')
    op.execute(
        'CREATE TABLE "public"."weather_station_model_predictions_202501" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM (\'2025-01-01 00:00:00 UTC\') TO (\'2025-02-01 00:00:00 UTC\');'
    )
    op.execute('ALTER TABLE "public"."weather_station_model_predictions_202501" ADD PRIMARY KEY ("id");')
    op.execute(
        'CREATE TABLE "public"."weather_station_model_predictions_202502" PARTITION OF "public"."weather_station_model_predictions_intermediate" FOR VALUES FROM (\'2025-02-01 00:00:00 UTC\') TO (\'2025-03-01 00:00:00 UTC\');'
    )
    op.execute('ALTER TABLE "public"."weather_station_model_predictions_202502" ADD PRIMARY KEY ("id");')


def downgrade():
    op.execute("DROP TABLE weather_station_model_predictions_202502;")
    op.execute("DROP TABLE weather_station_model_predictions_202501;")
    op.execute("DROP TABLE weather_station_model_predictions_202412;")
    op.execute("DROP TABLE weather_station_model_predictions_202411;")
    op.execute("DROP TABLE weather_station_model_predictions_202410;")
    op.execute("DROP TABLE weather_station_model_predictions_202409;")
    op.execute("DROP TABLE weather_station_model_predictions_202408;")
    op.execute("DROP TABLE weather_station_model_predictions_202407;")
    op.execute("DROP TABLE weather_station_model_predictions_202406;")
    op.execute("DROP TABLE weather_station_model_predictions_202405;")
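This second migration attaches ten monthly partitions, May 2024 through February 2025, each with its own `id` primary key. The attached partitions and their bounds can be confirmed with a query like the sketch below (placeholder connection string).

```bash
# Sketch: list the partitions attached to the intermediate table and their range bounds.
psql "postgresql://wps:wps@localhost:5432/wps" -c "
  SELECT c.relname, pg_get_expr(c.relpartbound, c.oid) AS bounds
  FROM pg_inherits i
  JOIN pg_class c ON c.oid = i.inhrelid
  WHERE i.inhparent = 'public.weather_station_model_predictions_intermediate'::regclass
  ORDER BY c.relname;"
```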
15 changes: 15 additions & 0 deletions openshift/pgslice/docker/Dockerfile
@@ -0,0 +1,15 @@
FROM ankane/pgslice:v0.6.1

RUN apk update && apk add unzip bash

# Download the AWS CLI installer.
ADD "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" /tmp/awscliv2.zip

# Switch to root user for package installs.
USER root
RUN unzip /tmp/awscliv2.zip -d /tmp/ &&\
/tmp/aws/install


COPY fill_partition_data.sh .
COPY partition_and_archive.sh .
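The image layers the AWS CLI and the two helper scripts on top of the upstream `ankane/pgslice` image. A local build and smoke test might look like the sketch below; the tag name and the connectivity assumption (`host.docker.internal` reaching a local Postgres) are illustrative only.

```bash
# Sketch: build the image from this directory and run the fill script against a local database.
docker build -t pgslice-partitioner:local openshift/pgslice/docker
docker run --rm \
  -e PG_PASSWORD=wps -e PG_HOSTNAME=host.docker.internal -e PG_PORT=5432 \
  -e PG_USER=wps -e PG_DATABASE=wps \
  pgslice-partitioner:local bash fill_partition_data.sh
```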
60 changes: 60 additions & 0 deletions openshift/pgslice/docker/fill_partition_data.sh
@@ -0,0 +1,60 @@
#!/bin/bash

# Fills recently partitioned tables with data from the original table and swaps in the partitioned table
#
# usage example:
# PG_PASSWORD=wps PG_HOSTNAME=localhost PG_PORT=5432 PG_USER=wps PG_DATABASE=wps ./fill_partition_data.sh

# From http://redsymbol.net/articles/unofficial-bash-strict-mode/
# Exits execution if any command fails for safety

set -euxo pipefail

# We can extend this later on to be a list of tables
TABLE=weather_station_model_predictions

# variable checks
if [ -z ${PG_PASSWORD+0} ]
then
echo "PG_PASSWORD not specified"
echo "Specify a postgres password"
exit 1
fi

if [ -z ${PG_HOSTNAME+0} ]
then
echo "PG_HOSTNAME not specified"
echo "Specify a postgres hostname"
exit 1
fi

if [ -z ${PG_PORT+0} ]
then
echo "PG_PORT not specified"
echo "Specify a postgres port"
exit 1
fi

if [ -z ${PG_USER+0} ]
then
echo "PG_USER not specified"
echo "Specify a postgres user"
exit 1
fi

if [ -z ${PG_DATABASE+0} ]
then
echo "PG_DATABASE not specified"
echo "Specify a postgres database"
exit 1
fi

export PGSLICE_URL="postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}"
# Fill the partitions with data from the original table
pgslice fill $TABLE
# Analyze for query planner
pgslice analyze $TABLE
# Swap the intermediate table with the original table
pgslice swap $TABLE
# Fill the rest (rows inserted between the first fill and the swap)
pgslice fill $TABLE --swapped
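For context: per pgslice's documented behaviour, `fill` copies rows from the original table into the partitions in batches, `analyze` refreshes planner statistics, and `swap` renames the tables so the partitioned copy takes over (the original becomes `<table>_retired`). The sketch below shows roughly what the swap amounts to; it is illustrative, not the exact SQL pgslice issues.

```bash
# Rough equivalent of "pgslice swap" for this table (illustrative only; pgslice
# adds safeguards such as a lock timeout around the renames).
psql "$PGSLICE_URL" -c "
  BEGIN;
  ALTER TABLE weather_station_model_predictions RENAME TO weather_station_model_predictions_retired;
  ALTER TABLE weather_station_model_predictions_intermediate RENAME TO weather_station_model_predictions;
  COMMIT;"
```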
65 changes: 65 additions & 0 deletions openshift/pgslice/docker/partition_and_archive.sh
@@ -0,0 +1,65 @@
#!/bin/bash

# Adds a new partition for the next month, fills and swaps partitions, and archives the retired table to S3
#
# usage example:
# PG_PASSWORD=wps PG_HOSTNAME=localhost PG_PORT=5432 PG_USER=wps PG_DATABASE=wps AWS_ACCESS_KEY=key AWS_SECRET_KEY=secret AWS_HOSTNAME=host AWS_BUCKET=bucket ./partition_and_archive.sh

# From http://redsymbol.net/articles/unofficial-bash-strict-mode/
# Exits execution if any command fails for safety

set -euxo pipefail

# We can extend this later on to be a list of tables
TABLE=weather_station_model_predictions

# variable checks
if [ -z ${PG_PASSWORD+0} ]
then
echo "PG_PASSWORD not specified"
echo "Specify a postgres password"
exit 1
fi

if [ -z ${PG_HOSTNAME+0} ]
then
echo "PG_HOSTNAME not specified"
echo "Specify a postgres hostname"
exit 1
fi

if [ -z ${PG_PORT+0} ]
then
echo "PG_PORT not specified"
echo "Specify a postgres port"
exit 1
fi

if [ -z ${PG_USER+0} ]
then
echo "PG_USER not specified"
echo "Specify a postgres user"
exit 1
fi

if [ -z ${PG_DATABASE+0} ]
then
echo "PG_DATABASE not specified"
echo "Specify a postgres database"
exit 1
fi

export PGSLICE_URL="postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}"
# Add partitions to the intermediate table (assumes it already exists)
pgslice add_partitions $TABLE --intermediate --future 1
# Fill the partitions with data from the original table
pgslice fill $TABLE
# Analyze for query planner
pgslice analyze $TABLE
# Swap the intermediate table with the original table
pgslice swap $TABLE
# Fill the rest (rows inserted between the first fill and the swap)
pgslice fill $TABLE --swapped
# Dump any retired tables to S3 and drop
pg_dump -c -Fc -t ${TABLE}_retired $PGSLICE_URL | gzip | AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY}" AWS_SECRET_ACCESS_KEY="${AWS_SECRET_KEY}" aws --endpoint="https://${AWS_HOSTNAME}" s3 cp - "s3://${AWS_BUCKET}/retired/${TABLE}_retired.dump.gz"
psql -c "DROP TABLE ${TABLE}_retired" $PGSLICE_URL
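The last two commands archive the retired original table to S3-compatible storage and then drop it. If that archive ever needs to be restored, something like the sketch below should work; the bucket layout matches the upload above, while the restore flags are assumptions.

```bash
# Sketch: pull the archived dump back from object storage and restore it
# (pg_restore reads the custom-format dump from stdin).
AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY}" AWS_SECRET_ACCESS_KEY="${AWS_SECRET_KEY}" \
  aws --endpoint="https://${AWS_HOSTNAME}" s3 cp \
  "s3://${AWS_BUCKET}/retired/weather_station_model_predictions_retired.dump.gz" - |
  gunzip | pg_restore -d "$PGSLICE_URL" --clean --if-exists --no-owner
```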
27 changes: 27 additions & 0 deletions openshift/pgslice/openshift/README.md
@@ -0,0 +1,27 @@
# pgslice partitioning image

Uses https://hub.docker.com/r/ankane/pgslice to run the pgslice commands.
Runs `fill`, `analyze`, and `swap` for a newly partitioned table whose original table already has data, filling the new partitions with the existing rows.

## Building

### Create a new build

```bash
oc -n e1e498-tools process -f build.yaml -p VERSION=05-11-2024 | oc -n e1e498-tools apply -f -
```

### Kick off a new build

```bash
oc -n e1e498-tools start-build pgslice --follow
```

### Tag image for prod

```bash
# tag this image with today's date so we can revert to it later
oc -n e1e498-tools tag pgslice:dev pgslice:<yyyy-mm-dd>
# tag it for production
oc -n e1e498-tools tag pgslice:dev pgslice:prod
```
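After tagging, it can be worth confirming where each tag points before relying on it (a quick sketch):

```bash
# Sketch: show the image stream tags for the pgslice image.
oc -n e1e498-tools get istag | grep pgslice
```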