Report model and DataStore failure modes #268

Merged May 23, 2024 (24 commits)

Changes from 12 commits
2 changes: 1 addition & 1 deletion .github/workflows/run-improc-tests.yml
@@ -59,4 +59,4 @@ jobs:

- name: run test
run: |
-TEST_SUBFOLDER=improc docker compose run runtests
+TEST_SUBFOLDER=tests/improc docker compose run runtests
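(The same one-line change recurs in the pipeline and util workflows below: TEST_SUBFOLDER now carries the tests/ prefix, presumably because the variable is treated as a path from the repository root rather than a folder name under tests/, matching the explicit file paths the two model-test workflows pass.)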
@@ -1,4 +1,4 @@
-name: Run Model Tests
+name: Run Model Tests 1

on:
push:
@@ -59,4 +59,5 @@ jobs:

- name: run test
run: |
-TEST_SUBFOLDER=models docker compose run runtests
+shopt -s nullglob
+TEST_SUBFOLDER=$(ls tests/models/test_{a..l}*.py) docker compose run runtests
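The run step now shards the model tests alphabetically: the brace expansion test_{a..l}*.py expands to test_a*.py … test_l*.py, and every matching file is passed explicitly. A minimal Python sketch (illustrative only, not part of the PR) of how the two workflows partition the suite:

import glob

# The character classes mirror the bash brace expansions test_{a..l}*.py
# and test_{m..z}*.py used by the two model-test workflows.
shard_1 = sorted(glob.glob("tests/models/test_[a-l]*.py"))  # Run Model Tests 1
shard_2 = sorted(glob.glob("tests/models/test_[m-z]*.py"))  # Run Model Tests 2

# Any test file whose name starts with a letter lands in exactly one shard.
assert not set(shard_1) & set(shard_2)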
63 changes: 63 additions & 0 deletions .github/workflows/run-model-tests-2.yml
@@ -0,0 +1,63 @@
name: Run Model Tests 2

on:
push:
branches:
- main
pull_request:
workflow_dispatch:

jobs:
tests:
name: run tests in docker image
runs-on: ubuntu-latest
env:
REGISTRY: ghcr.io
COMPOSE_FILE: tests/docker-compose.yaml

steps:
- name: Dump docker logs on failure
if: failure()
uses: jwalton/gh-docker-logs@v2

- name: checkout code
uses: actions/checkout@v3
with:
submodules: recursive

- name: log into github container registry
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: setup docker buildx
uses: docker/setup-buildx-action@v2
with:
driver: docker-container

- name: bake
uses: docker/bake-action@…
with:
workdir: tests
load: true
files: docker-compose.yaml
set: |
seechange_postgres.tags=ghcr.io/${{ github.repository_owner }}/seechange-postgres
seechange_postgres.cache-from=type=gha,scope=cached-seechange-postgres
seechange_postgres.cache-to=type=gha,scope=cached-seechange-postgres,mode=max
setuptables.tags=ghcr.io/${{ github.repository_owner }}/runtests
setuptables.cache-from=type=gha,scope=cached-seechange
setuptables.cache-to=type=gha,scope=cached-seechange,mode=max
runtests.tags=ghcr.io/${{ github.repository_owner }}/runtests
runtests.cache-from=type=gha,scope=cached-seechange
runtests.cache-to=type=gha,scope=cached-seechange,mode=max
shell.tags=ghcr.io/${{ github.repository_owner }}/runtests
shell.cache-from=type=gha,scope=cached-seechange
shell.cache-to=type=gha,scope=cached-seechange,mode=max

- name: run test
run: |
shopt -s nullglob
TEST_SUBFOLDER=$(ls tests/models/test_{m..z}*.py) docker compose run runtests
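Note the shopt -s nullglob guard in both shards: without it, any brace-expanded pattern that matches no files would be kept literally and handed to pytest as a nonexistent path; with it, unmatched patterns simply drop out of the ls argument list.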
2 changes: 1 addition & 1 deletion .github/workflows/run-pipeline-tests.yml
@@ -59,4 +59,4 @@ jobs:

- name: run test
run: |
-TEST_SUBFOLDER=pipeline docker compose run runtests
+TEST_SUBFOLDER=tests/pipeline docker compose run runtests
2 changes: 1 addition & 1 deletion .github/workflows/run-util-tests.yml
@@ -59,4 +59,4 @@ jobs:

- name: run test
run: |
-TEST_SUBFOLDER=util docker compose run runtests
+TEST_SUBFOLDER=tests/util docker compose run runtests
@@ -11,7 +11,7 @@

# revision identifiers, used by Alembic.
revision = '485334f16c23'
-down_revision = '573289f12368'
+down_revision = 'ec64a8fd8cf3'
branch_labels = None
depends_on = None

@@ -27,6 +27,7 @@ def upgrade() -> None:
sa.Column('num_prev_reports', sa.Integer(), nullable=False),
sa.Column('worker_id', sa.Text(), nullable=True),
sa.Column('node_id', sa.Text(), nullable=True),
sa.Column('cluster_id', sa.Text(), nullable=True),
sa.Column('error_step', sa.Text(), nullable=True),
sa.Column('error_type', sa.Text(), nullable=True),
sa.Column('error_message', sa.Text(), nullable=True),
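Two things happen in this migration file: down_revision is repointed from 573289f12368 to ec64a8fd8cf3, presumably rebasing it onto the head the migration history reached while this branch was open, and cluster_id is added to the table-creation commands so that a freshly created reports table matches the updated Report model below.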
@@ -0,0 +1,30 @@
"""reference instrument

Revision ID: 9a4097979249
Revises: 485334f16c23
Create Date: 2024-05-22 11:22:20.322800

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '9a4097979249'
down_revision = '485334f16c23'
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('refs', sa.Column('instrument', sa.Text(), nullable=False))
op.create_index(op.f('ix_refs_instrument'), 'refs', ['instrument'], unique=False)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_refs_instrument'), table_name='refs')
op.drop_column('refs', 'instrument')
# ### end Alembic commands ###
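One caveat: op.add_column with nullable=False fails if refs already contains rows, since the existing rows would violate NOT NULL. A hedged sketch of the usual workaround (an assumption about deployment practice, not something this migration does — it presumably relies on the table being empty):

# Hypothetical variant that also works on a populated refs table: backfill
# through a temporary server default, then drop the default again.
op.add_column('refs', sa.Column('instrument', sa.Text(), nullable=False,
                                server_default='unknown'))
op.alter_column('refs', 'instrument', server_default=None)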
22 changes: 11 additions & 11 deletions docker/application/Dockerfile
@@ -124,17 +124,17 @@ ENV PYTHONPATH "/seechange"
#
# Need to install mpich here rather than via package manager to ensure
# ABI compatibility.
-ARG mpich_version=4.0.2
-ARG mpich_prefix=mpich-$mpich_version
-RUN curl -L https://www.mpich.org/static/downloads/$mpich_version/$mpich_prefix.tar.gz -O \
-&& tar xf $mpich_prefix.tar.gz \
-&& cd $mpich_prefix \
-&& ./configure FFLAGS=-fallow-argument-mismatch FCFLAGS=-fallow-argument-mismatch \
-&& make -j 16 \
-&& make install \
-&& make clean \
-&& cd .. \
-&& rm -rf $mpich_prefix $mpich_prefix.tar.gz
+# ARG mpich_version=4.0.2
+# ARG mpich_prefix=mpich-$mpich_version
+# RUN curl -L https://www.mpich.org/static/downloads/$mpich_version/$mpich_prefix.tar.gz -O \
+# && tar xf $mpich_prefix.tar.gz \
+# && cd $mpich_prefix \
+# && ./configure FFLAGS=-fallow-argument-mismatch FCFLAGS=-fallow-argument-mismatch \
+# && make -j 16 \
+# && make install \
+# && make clean \
+# && cd .. \
+# && rm -rf $mpich_prefix $mpich_prefix.tar.gz

# Hotpants Alard/Lupton image subtraction
RUN git clone https://github.com/acbecker/hotpants.git \
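Commenting the MPICH build out rather than deleting it suggests it may come back; in the meantime, images built from this Dockerfile no longer ship the ABI-matched MPICH, which presumably was not needed by the test suite.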
2 changes: 1 addition & 1 deletion models/exposure.py
@@ -776,7 +776,7 @@ def merge_concurrent(self, session=None):
# this could happen if in between the query and the merge(exposure)
# another process added the same exposure to the database
if 'duplicate key value violates unique constraint "ix_exposures_filepath"' in str(e):
-print(str(e))
+SCLogger.debug(str(e))
session.rollback()
time.sleep(0.1 * 2 ** i) # exponential backoff
else:
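The switch from print to SCLogger.debug keeps the duplicate-key message out of stdout while leaving the retry behavior intact. For context, a hedged sketch of the retry-with-exponential-backoff pattern around the excerpt above (the retry budget and exact calls are illustrative; only the except branch appears in the diff):

import time
from sqlalchemy.exc import IntegrityError
from util.logger import SCLogger

for i in range(5):  # retry budget is an assumption
    try:
        exposure = session.merge(exposure)
        session.commit()
        break
    except IntegrityError as e:
        if 'duplicate key value violates unique constraint "ix_exposures_filepath"' in str(e):
            SCLogger.debug(str(e))    # another process inserted the same filepath first
            session.rollback()
            time.sleep(0.1 * 2 ** i)  # 0.1 s, 0.2 s, 0.4 s, ... between attempts
        else:
            raise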
3 changes: 0 additions & 3 deletions models/image.py
@@ -1498,7 +1498,6 @@ def load(self):
if not ( gotim and gotweight and gotflags ):
raise FileNotFoundError( "Failed to load at least one of image, weight, flags" )


def free( self, free_derived_products=True, free_aligned=True, only_free=None ):
"""Free loaded image memory. Does not delete anything from disk.

@@ -1557,8 +1556,6 @@ def free( self, free_derived_products=True, free_aligned=True, only_free=None ):
for alim in self._aligned_images:
alim.free( free_derived_products=free_derived_products, only_free=only_free )



def get_upstream_provenances(self):
"""Collect the provenances for all upstream objects.

83 changes: 9 additions & 74 deletions models/reference.py
@@ -1,5 +1,5 @@
import sqlalchemy as sa
-from sqlalchemy import orm
+from sqlalchemy import orm, func

from models.base import Base, AutoIDMixin, SmartSession
from models.image import Image
@@ -52,6 +52,13 @@ class Reference(Base, AutoIDMixin):
)
)

instrument = sa.Column(
sa.Text,
nullable=False,
index=True,
doc="Name of the instrument used to make the images for this reference image. "
)

filter = sa.Column(
sa.Text,
nullable=False,
@@ -138,6 +145,7 @@ def __init__(self, **kwargs):
def __setattr__(self, key, value):
if key == 'image' and value is not None:
self.target = value.target
self.instrument = value.instrument
self.filter = value.filter
self.section_id = value.section_id
self.sources = value.sources
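With instrument added to the attributes mirrored in __setattr__, assigning an image to a Reference copies it along with target, filter, section_id, and sources. A hedged usage sketch (coadd is an illustrative Image variable):

ref = Reference()
ref.image = coadd  # __setattr__ mirrors the image's metadata onto the reference
assert ref.instrument == coadd.instrument
assert ref.filter == coadd.filter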
@@ -273,76 +281,3 @@ def merge_all(self, session):
new_ref.image = self.image.merge_all(session)

return new_ref

@staticmethod
def check_reference(ref, filter, target, obs_time):
"""Check if the given reference is valid for the given filter, target, and observation time.
If the reference has is_bad==True, it will also not be considered valid.
Parameters
----------
filter: str
The filter of the image/exposure.
target: str
The target of the image/exposure, or the name of the field. # TODO: can we replace this with coordinates?
obs_time: datetime
The observation time of the image.

Returns
-------
bool:
True if the reference is valid for the given filter, target, and observation time.
"""
return (
(ref.validity_start is None or ref.validity_start <= obs_time) and
(ref.validity_end is None or ref.validity_end >= obs_time) and
ref.filter == filter and ref.target == target and
ref.is_bad is False
)

@staticmethod
def get_reference(filter, target, obs_time, session=None):
"""
Get a reference for a given filter, target, and observation time.
References with is_bad==True will not be considered.

Parameters
----------
filter: str
The filter of the image/exposure.
target: str
The target of the image/exposure, or the name of the field. # TODO: can we replace this with coordinates?
obs_time: datetime
The observation time of the image.
session: sqlalchemy.orm.session.Session
An optional session to use for the database query.
If not given, will use the session stored inside the
DataStore object; if there is none, will open a new session
and close it at the end of the function.

Returns
-------
ref: Image object
The reference image for this image, or None if no reference is found.

"""
with SmartSession(session) as session:
ref = session.scalars(
sa.select(Reference).where(
sa.or_(
Reference.validity_start.is_(None),
Reference.validity_start <= obs_time
),
sa.or_(
Reference.validity_end.is_(None),
Reference.validity_end >= obs_time
),
Reference.filter == filter,
Reference.target == target,
Reference.is_bad.is_(False),
)
).first()

if ref is None:
raise ValueError(f'No reference found with filter={filter}, target={target}, obs_time={obs_time}')

return ref
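check_reference and get_reference are deleted outright; with instrument now a column on Reference, the lookup presumably moves elsewhere in the pipeline and gains an instrument match. A minimal sketch of such a query, assuming the same validity-window semantics as the removed code (illustrative, not code from this PR):

with SmartSession(session) as session:
    ref = session.scalars(
        sa.select(Reference).where(
            sa.or_(Reference.validity_start.is_(None), Reference.validity_start <= obs_time),
            sa.or_(Reference.validity_end.is_(None), Reference.validity_end >= obs_time),
            Reference.instrument == instrument,
            Reference.filter == filter,
            Reference.target == target,
            Reference.is_bad.is_(False),
        )
    ).first()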
16 changes: 12 additions & 4 deletions models/report.py
@@ -3,8 +3,7 @@
from sqlalchemy import orm
from sqlalchemy.dialects.postgresql import JSONB


-from models.base import Base, SeeChangeBase, AutoIDMixin, SmartSession, _logger
+from models.base import Base, SeeChangeBase, AutoIDMixin, SmartSession
from models.enums_and_bitflags import (
bitflag_to_string,
string_to_bitflag,
@@ -14,6 +13,7 @@
pipeline_products_inverse,
)

from util.logger import SCLogger

class Report(Base, AutoIDMixin):
Contributor:

Are you envisioning that we'll keep these reports around forever, or that we'll sometimes clean the table out?

If we think of this as something mostly to keep track of what happened with recent processing, then fine. But, if we want to use this as a record of processing from a long time ago, I can see there starting to be problems identifying the right report rows.

For instance, if we've created images from exposures for more than one provenance, there will be multiple images for a given exposure/sensor section. Finding the right report row for a given image becomes challenging in that case. If we care about that, then we need to think about this.

(My instinct is that, at least for now, we say that the reports class is effectively a transitory table, for keeping track of what's happening with "current" processing.)

Member (author):

Yes, the reports are only a way for things like the webapp to know what is going on. We can remove them periodically.
There could be multiple reports of the same image with the same provenance (if you had an error and restarted the pipeline).
We might decide we do want to keep a subset of the reports as a way to mark which exposures have been "finished", but we would want to periodically prune the reports and keep only (a) the correct provenance and (b) the latest run that finished successfully. Then we'd have a checklist of what was successfully processed and stored. Or you could do that with a similar, smaller table, something like StoredInDataRelease, that only tells you which exposures were done (and maybe points you to the relevant provenances).

"""A report on the status of analysis of one section from an Exposure.
@@ -105,6 +105,14 @@ class Report(Base, AutoIDMixin):
)
)

cluster_id = sa.Column(
sa.Text,
nullable=True,
doc=(
"ID of the cluster where the worker/process ran this section. "
)
)
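cluster_id rounds out the existing worker_id and node_id columns, so a report can record where a section ran at three granularities (cluster, node, worker); all three are nullable since a run need not happen on a cluster.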

error_step = sa.Column(
sa.Text,
nullable=True,
@@ -311,7 +319,7 @@ def scan_datastore(self, ds, process_step, session=None):
if self.warnings is None or self.warnings == '':
self.warnings = new_string
else:
-self.warnings += ', ' + new_string
+self.warnings += '\n***|***|***\n' + new_string
Contributor:

Now I understand what you meant in that comment; at first I thought "***" meant a warning rather than a literal "***", and I was confused as to why there would be three warnings separated by |, then a newline. This makes much more sense.

Member (author):

Yeah I didn't think I made it very clear. But I think this will be good.


if exception is not None:
self.error_type = exception.__class__.__name__
@@ -339,7 +347,7 @@ def read_warnings(process_step, warnings_list):
for w in warnings_list:
text = f'{process_step}: {w.category} {w.message} ({w.filename}:{w.lineno})'
formatted_warnings.append(text)
-_logger.warning(text) # make sure warnings also get printed to the log/on screen.
+SCLogger.warning(text) # make sure warnings also get printed to the log/on screen.

warnings_list.clear() # remove all the warnings but keep the list object

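Since accumulated warnings are now joined with a distinctive separator rather than ', ', a consumer can split them back into per-scan chunks. A hedged sketch (assuming report.warnings was built by scan_datastore above):

SEPARATOR = '\n***|***|***\n'
chunks = report.warnings.split(SEPARATOR) if report.warnings else []
for chunk in chunks:
    print(chunk)  # each chunk holds the formatted warnings from one pipeline step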