Skip to content

Commit

Permalink
Add image.upsert to properly handle upstreams
Browse files Browse the repository at this point in the history
  • Loading branch information
rknop committed Aug 22, 2024
1 parent 1a01217 commit b89b6ba
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 26 deletions.
49 changes: 27 additions & 22 deletions models/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from sqlalchemy.dialects.postgresql import UUID as sqlUUID
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm.exc import DetachedInstanceError
from sqlalchemy.exc import IntegrityError
from sqlalchemy.schema import CheckConstraint

from astropy.time import Time
Expand Down Expand Up @@ -470,7 +470,7 @@ def init_on_load(self):
# if this_object_session is not None: # if just loaded, should usually have a session!
# self.load_upstream_products(this_object_session)

def insert( self, verifyupstreams=False, session=None ):
def insert( self, session=None ):
"""Add the Image object to the database.
In any event, if there are no exceptions, self.id will be set upon
Expand All @@ -484,30 +484,10 @@ def insert( self, verifyupstreams=False, session=None ):
Parameters
----------
verifyupstreams: bool, default False
If the object was not inserted into the database, verify that
self._upstream_ids is consistent with the image upstreams
in the database. (self._upstream_ids=None is consistent, because
that means it will just get loaded later.)
session: SQLAlchemy Session, default None
Usually you do not want to pass this; it's mostly for other
upsert etc. methods that cascade to this.
Returns
-------
was_inserted: bool
True = the image was newly inserted. False = the object was
already in the database.
If the image was in the database and onlyinsert was True,
or if the id of the object in the database didn't match a
non-None id of self, an exception will be raised.
Returns True if the image was newly loaded, False if the image
was already in the database.
"""

with SmartSession( session ) as sess:
Expand All @@ -524,6 +504,31 @@ def insert( self, verifyupstreams=False, session=None ):
sess.commit()


def upsert( self, session=None, load_defaults=False ):
    """Upsert this Image, then make sure its upstream associations exist.

    The image row itself is handled by SeeChangeBase.upsert.  Afterwards,
    one row is added to image_upstreams_association for every id in
    self._upstream_ids, with this image's id as the downstream.
    Associations that are already in the database are left alone, so
    calling this repeatedly (or concurrently from another process) is
    harmless, and a partially-populated set of associations gets
    completed rather than abandoned.

    Parameters
    ----------
      session: SQLAlchemy Session, default None
        Handed to SmartSession; both the base-class upsert and the
        association inserts run inside the resulting session.

      load_defaults: bool, default False
        Passed through unchanged to SeeChangeBase.upsert.

    """
    with SmartSession( session ) as sess:
        SeeChangeBase.upsert( self, session=sess, load_defaults=load_defaults )

        if ( self._upstream_ids is None ) or ( len(self._upstream_ids) == 0 ):
            return

        # Let the database skip associations that already exist instead of
        # catching IntegrityError around the whole loop.  With the old
        # approach, a single pre-existing row rolled back every new row
        # inserted before it and skipped every row after it.  Restricting
        # the conflict target to the primary key means any other integrity
        # failure (e.g. a bad foreign key) still raises.
        insert_association = sa.text( "INSERT INTO "
                                      "image_upstreams_association(upstream_id,downstream_id) "
                                      "VALUES (:them,:me) "
                                      "ON CONFLICT ON CONSTRAINT image_upstreams_association_pkey "
                                      "DO NOTHING" )
        for ui in self._upstream_ids:
            sess.execute( insert_association, { "them": ui, "me": self.id } )
        sess.commit()


def set_corners_from_header_wcs( self, wcs=None, setradec=False ):
"""Update the image's four corners (and, optionally, RA/Dec) from a WCS.
Expand Down
5 changes: 5 additions & 0 deletions models/measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,11 @@ def __init__(self, **kwargs):

self._zp = None

# These are server defaults, but we might use them
# before saving and reloading
self.best_aperture = -1
self.disqualifier_scores = {}

# manually set all properties (columns or not)
for key, value in kwargs.items():
if hasattr(self, key):
Expand Down
6 changes: 2 additions & 4 deletions pipeline/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,10 +773,8 @@ def finalize_report(self, session=None):
"""Mark the report as successful and set the finish time."""
self.report.success = True
self.report.finish_time = datetime.datetime.utcnow()
with SmartSession(session) as session:
new_report = session.merge(self.report)
session.commit()
self.report = new_report
self.report.upsert()


def get_inputs(self):
"""Get a string with the relevant inputs. """
Expand Down
30 changes: 30 additions & 0 deletions tests/models/test_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,36 @@ def test_image_insert( sim_image1, sim_image2, sim_image3, sim_image_uncommitted
assert len(upstrs) == 0


def test_image_upsert( sim_image1, sim_image2, sim_image_uncommitted ):
    """Image.upsert must keep upstream associations and update changed columns."""
    image = sim_image_uncommitted
    image.filepath = image.invent_filepath()
    image.md5sum = uuid.uuid4()
    image.is_coadd = True

    # Upstream ids are listed in order of increasing mjd
    if sim_image1.mjd < sim_image2.mjd:
        upstreamids = [ sim_image1.id, sim_image2.id ]
    else:
        upstreamids = [ sim_image2.id, sim_image1.id ]
    image._upstream_ids = list( upstreamids )
    image.insert()

    expectedupstreams = { *upstreamids, image.exposure_id }

    reloaded = Image.get_by_id( image.id )
    found = { up.id for up in reloaded.get_upstreams( only_images=True ) }
    assert found == expectedupstreams

    # Upserting an already-inserted image must not complain, must leave the
    # upstreams in place, and must push the changed column to the database
    original_format = reloaded._format
    image._format = original_format + 1
    image.upsert()
    reloaded = Image.get_by_id( image.id )
    found = { up.id for up in reloaded.get_upstreams( only_images=True ) }
    assert found == expectedupstreams
    assert reloaded._format == original_format + 1

    # After blanking the field locally, an upsert with load_defaults=True
    # should leave the object holding the previously saved value
    image._format = None
    image.upsert( load_defaults=True )
    assert image._format == original_format + 1


def test_image_must_have_md5(sim_image_uncommitted, provenance_base):
try:
Expand Down

0 comments on commit b89b6ba

Please sign in to comment.