Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report model and DataStore failure modes #268

Merged
merged 24 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run-improc-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ jobs:

- name: run test
run: |
TEST_SUBFOLDER=improc docker compose run runtests
TEST_SUBFOLDER=tests/improc docker compose run runtests
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Run Model Tests
name: Run Model Tests 1

on:
push:
Expand Down Expand Up @@ -59,4 +59,5 @@ jobs:

- name: run test
run: |
TEST_SUBFOLDER=models docker compose run runtests
shopt -s nullglob
TEST_SUBFOLDER=$(ls tests/models/test_{a..l}*.py) docker compose run runtests
63 changes: 63 additions & 0 deletions .github/workflows/run-model-tests-2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Runs the second half of the model tests (tests/models/test_m* .. test_z*)
# inside the docker compose environment, mirroring run-model-tests-1.
name: Run Model Tests 2

on:
  push:
    branches:
      - main
  pull_request:
  workflow_dispatch:

jobs:
  tests:
    name: run tests in docker image
    runs-on: ubuntu-latest
    env:
      REGISTRY: ghcr.io
      COMPOSE_FILE: tests/docker-compose.yaml

    steps:
      - name: checkout code
        uses: actions/checkout@v3
        with:
          submodules: recursive

      - name: log into github container registry
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: setup docker buildx
        uses: docker/setup-buildx-action@v2
        with:
          driver: docker-container

      - name: bake
        uses: docker/[email protected]
        with:
          workdir: tests
          load: true
          files: docker-compose.yaml
          set: |
            seechange_postgres.tags=ghcr.io/${{ github.repository_owner }}/seechange-postgres
            seechange_postgres.cache-from=type=gha,scope=cached-seechange-postgres
            seechange_postgres.cache-to=type=gha,scope=cached-seechange-postgres,mode=max
            setuptables.tags=ghcr.io/${{ github.repository_owner }}/runtests
            setuptables.cache-from=type=gha,scope=cached-seechange
            setuptables.cache-to=type=gha,scope=cached-seechange,mode=max
            runtests.tags=ghcr.io/${{ github.repository_owner }}/runtests
            runtests.cache-from=type=gha,scope=cached-seechange
            runtests.cache-to=type=gha,scope=cached-seechange,mode=max
            shell.tags=ghcr.io/${{ github.repository_owner }}/runtests
            shell.cache-from=type=gha,scope=cached-seechange
            shell.cache-to=type=gha,scope=cached-seechange,mode=max

      - name: run test
        run: |
          shopt -s nullglob
          TEST_SUBFOLDER=$(ls tests/models/test_{m..z}*.py) docker compose run runtests

      # bugfix: this step was first in the list, but `if: failure()` only fires
      # when a PREVIOUS step failed, so as the first step it could never run.
      # Moved to the end so it dumps logs whenever any earlier step fails.
      - name: Dump docker logs on failure
        if: failure()
        uses: jwalton/gh-docker-logs@v2
2 changes: 1 addition & 1 deletion .github/workflows/run-pipeline-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ jobs:

- name: run test
run: |
TEST_SUBFOLDER=pipeline docker compose run runtests
TEST_SUBFOLDER=tests/pipeline docker compose run runtests
2 changes: 1 addition & 1 deletion .github/workflows/run-util-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ jobs:

- name: run test
run: |
TEST_SUBFOLDER=util docker compose run runtests
TEST_SUBFOLDER=tests/util docker compose run runtests
76 changes: 76 additions & 0 deletions alembic/versions/2024_05_15_1210-485334f16c23_add_report_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""add report model

Revision ID: 485334f16c23
Revises: 573289f12368
Create Date: 2024-05-15 12:10:56.118620

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '485334f16c23'
down_revision = 'ec64a8fd8cf3'
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Create the ``reports`` table and its per-column indexes.

    NOTE(review): the module docstring says ``Revises: 573289f12368`` but
    ``down_revision`` is ``'ec64a8fd8cf3'`` -- confirm which parent revision
    is correct before this migration runs anywhere.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        'reports',
        sa.Column('exposure_id', sa.BigInteger(), nullable=False),
        sa.Column('section_id', sa.Text(), nullable=False),
        sa.Column('start_time', sa.DateTime(), nullable=False),
        sa.Column('finish_time', sa.DateTime(), nullable=True),
        sa.Column('success', sa.Boolean(), nullable=False),
        sa.Column('num_prev_reports', sa.Integer(), nullable=False),
        sa.Column('worker_id', sa.Text(), nullable=True),
        sa.Column('node_id', sa.Text(), nullable=True),
        sa.Column('cluster_id', sa.Text(), nullable=True),
        sa.Column('error_step', sa.Text(), nullable=True),
        sa.Column('error_type', sa.Text(), nullable=True),
        sa.Column('error_message', sa.Text(), nullable=True),
        sa.Column('warnings', sa.Text(), nullable=True),
        sa.Column('process_memory', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
        sa.Column('process_runtime', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
        sa.Column('progress_steps_bitflag', sa.BIGINT(), nullable=False),
        sa.Column('products_exist_bitflag', sa.BIGINT(), nullable=False),
        sa.Column('products_committed_bitflag', sa.BIGINT(), nullable=False),
        sa.Column('provenance_id', sa.String(), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        sa.Column('modified', sa.DateTime(), nullable=False),
        sa.Column('id', sa.BigInteger(), autoincrement=True, nullable=False),
        sa.ForeignKeyConstraint(['exposure_id'], ['exposures.id'],
                                name='reports_exposure_id_fkey', ondelete='CASCADE'),
        # bugfix: the constraint was named 'images_provenance_id_fkey' (copy-paste
        # from the images table); renamed to match this table.
        sa.ForeignKeyConstraint(['provenance_id'], ['provenances.id'],
                                name='reports_provenance_id_fkey', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index(op.f('ix_reports_created_at'), 'reports', ['created_at'], unique=False)
    op.create_index(op.f('ix_reports_exposure_id'), 'reports', ['exposure_id'], unique=False)
    op.create_index(op.f('ix_reports_finish_time'), 'reports', ['finish_time'], unique=False)
    op.create_index(op.f('ix_reports_id'), 'reports', ['id'], unique=False)
    op.create_index(op.f('ix_reports_products_committed_bitflag'), 'reports', ['products_committed_bitflag'], unique=False)
    op.create_index(op.f('ix_reports_products_exist_bitflag'), 'reports', ['products_exist_bitflag'], unique=False)
    op.create_index(op.f('ix_reports_progress_steps_bitflag'), 'reports', ['progress_steps_bitflag'], unique=False)
    op.create_index(op.f('ix_reports_provenance_id'), 'reports', ['provenance_id'], unique=False)
    op.create_index(op.f('ix_reports_section_id'), 'reports', ['section_id'], unique=False)
    op.create_index(op.f('ix_reports_start_time'), 'reports', ['start_time'], unique=False)
    op.create_index(op.f('ix_reports_success'), 'reports', ['success'], unique=False)
    # ### end Alembic commands ###


def downgrade() -> None:
    """Drop all indexes of the ``reports`` table, then the table itself."""
    # ### commands auto generated by Alembic - please adjust! ###
    # Same drop order as the autogenerated code, expressed as a loop.
    index_suffixes = (
        'success',
        'start_time',
        'section_id',
        'provenance_id',
        'progress_steps_bitflag',
        'products_exist_bitflag',
        'products_committed_bitflag',
        'id',
        'finish_time',
        'exposure_id',
        'created_at',
    )
    for suffix in index_suffixes:
        op.drop_index(op.f(f'ix_reports_{suffix}'), table_name='reports')
    op.drop_table('reports')
    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""reference instrument

Revision ID: 9a4097979249
Revises: 485334f16c23
Create Date: 2024-05-22 11:22:20.322800

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '9a4097979249'
down_revision = '485334f16c23'
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add the non-nullable, indexed ``instrument`` column to the ``refs`` table."""
    # ### commands auto generated by Alembic - please adjust! ###
    # NOTE(review): adding a nullable=False column with no server_default will
    # fail if `refs` already contains rows -- confirm the table is empty in
    # every deployment this runs against, or add a server_default.
    op.add_column('refs', sa.Column('instrument', sa.Text(), nullable=False))
    op.create_index(op.f('ix_refs_instrument'), 'refs', ['instrument'], unique=False)
    # ### end Alembic commands ###


def downgrade() -> None:
    """Remove the ``instrument`` column (and its index) from ``refs``."""
    # ### commands auto generated by Alembic - please adjust! ###
    table_name = 'refs'
    # drop the index before removing the column it covers
    op.drop_index(op.f('ix_refs_instrument'), table_name=table_name)
    op.drop_column(table_name, 'instrument')
    # ### end Alembic commands ###
22 changes: 11 additions & 11 deletions docker/application/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,17 @@ ENV PYTHONPATH "/seechange"
#
# Need to install mpich here rather than via package manager to ensure
# ABI compatibility.
ARG mpich_version=4.0.2
ARG mpich_prefix=mpich-$mpich_version
RUN curl -L https://www.mpich.org/static/downloads/$mpich_version/$mpich_prefix.tar.gz -O \
&& tar xf $mpich_prefix.tar.gz \
&& cd $mpich_prefix \
&& ./configure FFLAGS=-fallow-argument-mismatch FCFLAGS=-fallow-argument-mismatch \
&& make -j 16 \
&& make install \
&& make clean \
&& cd .. \
&& rm -rf $mpich_prefix $mpich_prefix.tar.gz
# ARG mpich_version=4.0.2
# ARG mpich_prefix=mpich-$mpich_version
# RUN curl -L https://www.mpich.org/static/downloads/$mpich_version/$mpich_prefix.tar.gz -O \
# && tar xf $mpich_prefix.tar.gz \
# && cd $mpich_prefix \
# && ./configure FFLAGS=-fallow-argument-mismatch FCFLAGS=-fallow-argument-mismatch \
# && make -j 16 \
# && make install \
# && make clean \
# && cd .. \
# && rm -rf $mpich_prefix $mpich_prefix.tar.gz

# Hotpants Alard/Lupton image subtraction
RUN git clone https://github.com/acbecker/hotpants.git \
Expand Down
2 changes: 1 addition & 1 deletion docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ Only parameters that affect the product values are included.

The upstreams are other `Provenance` objects defined for the data products that
are an input to the current processing step.
The flowchart of the different process steps is defined in `pipeline.datastore.UPSTREAM_NAMES`.
The flowchart of the different process steps is defined in `pipeline.datastore.UPSTREAM_STEPS`.
E.g., the upstreams for the `photo_cal` object are `['extraction', 'astro_cal']`.

When a `Provenance` object has all the required inputs, it will produce a hash identifier
Expand Down
3 changes: 2 additions & 1 deletion models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def setup_warning_filters():
"- parent object of type <Image> has been garbage collected",
)


setup_warning_filters() # need to call this here and also call it explicitly when setting up tests

_engine = None
Expand Down Expand Up @@ -446,7 +447,7 @@ def to_dict(self):
@classmethod
def from_dict(cls, dictionary):
"""Convert a dictionary into a new object. """
dictionary.pop('modified') # we do not want to recreate the object with an old "modified" time
dictionary.pop('modified', None) # we do not want to recreate the object with an old "modified" time

md5sum = dictionary.get('md5sum', None)
if md5sum is not None:
Expand Down
35 changes: 34 additions & 1 deletion models/enums_and_bitflags.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def string_to_bitflag(value, dictionary):
original_keyword = keyword
keyword = EnumConverter.c(keyword)
if keyword not in dictionary:
raise ValueError(f'Keyword "{original_keyword}" not recognized in dictionary')
raise ValueError(f'Keyword "{original_keyword.strip()}" not recognized in dictionary')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the error, perhaps we don't want to strip the spaces, just in case it's spurious leading/trailing spaces that are the problem.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's what I was thinking but we also remove all spaces (leading or otherwise) in the convert method. So this is actually going to lead you down the wrong path if you leave the spaces (as it did for me).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the .strip() in convert. (I looked for it before making this comment.) What am I missing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a replace(' ', '') command in the c function.

output += 2 ** dictionary[keyword]
return output

Expand Down Expand Up @@ -409,3 +409,36 @@ class BitFlagConverter( EnumConverter ):
_allowed_values = flag_image_bits
_dict_filtered = None
_dict_inverse = None


# the list of possible processing steps from a section of an exposure up to measurements, r/b scores, and report
process_steps_dict = {
1: 'preprocessing', # creates an Image from a section of the Exposure
2: 'extraction', # creates a SourceList from an Image, and a PSF
3: 'astro_cal', # creates a WorldCoordinates from a SourceList
4: 'photo_cal', # creates a ZeroPoint from a WorldCoordinates
5: 'subtraction', # creates a subtraction Image
6: 'detection', # creates a SourceList from a subtraction Image
7: 'cutting', # creates Cutouts from a subtraction Image
8: 'measuring', # creates Measurements from Cutouts
# TODO: add R/B scores and maybe an extra step for finalizing a report
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That extra step in the TODO would also include alert production.

(My plan is not to save the full alert text, because we can reconstruct them from what's already in the database. What's more, we'll probably have something like kowalski running which will itself be saving all the alert text.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I view alerts as transitory things (as the name suggests), but I have heard a lot of people talk about alerts as if they are a database.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we've had this discussion lately. I think users expect to be able to search through alerts on a database, it is just not clear who will be managing it. I don't think it will cost us much to have an alerts database since we already keep all the data on the objects and measurements, and the alerts will just point to those objects.

}
process_steps_inverse = {EnumConverter.c(v): k for k, v in process_steps_dict.items()}


# the list of objects that could be loaded to a datastore after running the pipeline
pipeline_products_dict = {
1: 'image',
2: 'sources',
3: 'psf',
# 4: 'background', # not yet implemented
5: 'wcs',
6: 'zp',
7: 'sub_image',
8: 'detections',
9: 'cutouts',
10: 'measurements',
# 11: 'rb_scores',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should ref_image be in here too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. I was thinking about this list as a progress bar that fills up with things made by the pipeline. The reference would be a fixture that is needed before you even start.

}

pipeline_products_inverse = {EnumConverter.c(v): k for k, v in pipeline_products_dict.items()}
51 changes: 51 additions & 0 deletions models/exposure.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import pathlib
from collections import defaultdict

Expand All @@ -7,6 +8,7 @@
from sqlalchemy.schema import CheckConstraint
from sqlalchemy.orm.session import object_session
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.exc import IntegrityError

from astropy.time import Time
from astropy.io import fits
Expand Down Expand Up @@ -522,6 +524,14 @@ def end_mjd(self):
return None
return self.mjd + self.exp_time / 86400.0

@property
def observation_time(self):
    """The exposure's MJD translated to a ``datetime`` object (None if mjd is unset)."""
    mjd = self.mjd
    if mjd is None:
        return None
    return Time(mjd, format='mjd').datetime

def __repr__(self):

filter_str = '--'
Expand Down Expand Up @@ -735,6 +745,47 @@ def get_downstreams(self, session=None):

return images

def merge_concurrent(self, session=None):
    """Fetch-or-merge this exposure, retrying on concurrent-insert collisions.

    Protects against the same exposure (i.e., same filepath) being added
    simultaneously by multiple processes, and is safe to call when the
    exposure was already added by a previous run: on a duplicate-filepath
    IntegrityError the transaction is rolled back and the lookup+merge is
    retried with exponential backoff, up to 5 attempts.

    Parameters
    ----------
    session : Session, optional
        Database session to use; a new one is opened if not given.

    Returns
    -------
    Exposure
        This object merged into the session, or the pre-existing row
        updated with this object's column values.
    """
    exposure = None
    # bugfix: Python 3 unbinds the `except ... as e` name when the except
    # block ends, so the original `else: raise e` after the loop raised a
    # NameError once all retries were exhausted.  Keep a reference here.
    last_error = None
    with SmartSession(session) as session:

        for i in range(5):
            try:
                found_exp = session.scalars(
                    sa.select(Exposure).where(Exposure.filepath == self.filepath)
                ).first()
                if found_exp is None:
                    exposure = session.merge(self)
                    session.commit()
                else:
                    # update the found exposure with any modifications on the existing exposure
                    # NOTE(review): this branch does not commit -- confirm the
                    # caller is expected to commit the updated columns.
                    for col in Exposure.__table__.columns.keys():
                        if col in ('id', 'created_at', 'modified'):
                            continue
                        setattr(found_exp, col, getattr(self, col))
                    exposure = found_exp

                break  # if we got here without an exception, we can break out of the loop
            except IntegrityError as e:
                # this could happen if in between the query and the merge(exposure)
                # another process added the same exposure to the database
                if 'duplicate key value violates unique constraint "ix_exposures_filepath"' in str(e):
                    SCLogger.debug(str(e))
                    session.rollback()
                    last_error = e
                    time.sleep(0.1 * 2 ** i)  # exponential backoff
                else:
                    raise  # bare raise preserves the original traceback
        else:  # no break: every attempt hit the duplicate-key IntegrityError
            raise last_error

    return exposure


if __name__ == '__main__':
import os
Expand Down
3 changes: 0 additions & 3 deletions models/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -1498,7 +1498,6 @@ def load(self):
if not ( gotim and gotweight and gotflags ):
raise FileNotFoundError( "Failed to load at least one of image, weight, flags" )


def free( self, free_derived_products=True, free_aligned=True, only_free=None ):
"""Free loaded image memory. Does not delete anything from disk.

Expand Down Expand Up @@ -1557,8 +1556,6 @@ def free( self, free_derived_products=True, free_aligned=True, only_free=None ):
for alim in self._aligned_images:
alim.free( free_derived_products=free_derived_products, only_free=only_free )



def get_upstream_provenances(self):
"""Collect the provenances for all upstream objects.

Expand Down
Loading
Loading