From b89b6bad9b6f5fba5703ff8f551ba35e52dc2bb3 Mon Sep 17 00:00:00 2001 From: Rob Knop Date: Thu, 22 Aug 2024 14:43:19 -0700 Subject: [PATCH] Add image.upsert to properly handle upstreams --- models/image.py | 49 +++++++++++++++++++++----------------- models/measurements.py | 5 ++++ pipeline/data_store.py | 6 ++--- tests/models/test_image.py | 30 +++++++++++++++++++++++ 4 files changed, 64 insertions(+), 26 deletions(-) diff --git a/models/image.py b/models/image.py index c9d750d4..4391c979 100644 --- a/models/image.py +++ b/models/image.py @@ -14,7 +14,7 @@ from sqlalchemy.dialects.postgresql import UUID as sqlUUID from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.ext.hybrid import hybrid_property -from sqlalchemy.orm.exc import DetachedInstanceError +from sqlalchemy.exc import IntegrityError from sqlalchemy.schema import CheckConstraint from astropy.time import Time @@ -470,7 +470,7 @@ def init_on_load(self): # if this_object_session is not None: # if just loaded, should usually have a session! # self.load_upstream_products(this_object_session) - def insert( self, verifyupstreams=False, session=None ): + def insert( self, session=None ): """Add the Image object to the database. In any events, if there are no exceptions, self.id will be set upon @@ -484,30 +484,10 @@ def insert( self, verifyupstreams=False, session=None ): Parameters ---------- - verifyupstreams: bool, default False - If the object was not inserted into the database, verify that - self._upstream_ids is consistent with the image upstreams - in the database. (self._upstream_ids=None is consistent, because - that means it will just get loaded later.) - session: SQLAlchemy Session, default None Usually you do not want to pass this; it's mostly for other upsert etc. methods that cascade to this. - Returns - ------- - was_inserted: bool - True = the image was newly inserted. False = the object was - already in the database. - - If the image was in the database and onlyinsert was True, - or if the id of the object in the database didn't match a - non-None id of self, an execption will be raised. - - - Returns True if the image was newly loaded, False if the image - was already in the database. - """ with SmartSession( session ) as sess: @@ -524,6 +504,31 @@ def insert( self, verifyupstreams=False, session=None ): sess.commit() + def upsert( self, session=None, load_defaults=False ): + with SmartSession( session ) as sess: + SeeChangeBase.upsert( self, session=sess, load_defaults=load_defaults ) + + # We're just going to merrily try to set all the upstream associations and not care + # if we get already existing errors. Assume that if we get one, we'll get 'em + # all, because somebody else has already loaded all of them. + # (I hope that's right. But, in reality, it's extremely unlikely that two processes + # will be trying to upsert the same image at the same time.) + + if ( self._upstream_ids is not None ) and ( len(self._upstream_ids) > 0 ): + try: + for ui in self._upstream_ids: + sess.execute( sa.text( "INSERT INTO " + "image_upstreams_association(upstream_id,downstream_id) " + "VALUES (:them,:me)" ), + { "them": ui, "me": self.id } ) + sess.commit() + except IntegrityError as ex: + if 'duplicate key value violates unique constraint "image_upstreams_association_pkey"' in str(ex): + sess.rollback() + else: + raise + + def set_corners_from_header_wcs( self, wcs=None, setradec=False ): """Update the image's four corners (and, optionally, RA/Dec) from a WCS. diff --git a/models/measurements.py b/models/measurements.py index ca83565e..38b6c733 100644 --- a/models/measurements.py +++ b/models/measurements.py @@ -442,6 +442,11 @@ def __init__(self, **kwargs): self._zp = None + # These are server defaults, but we might use them + # before saving and reloading + self.best_aperture = -1 + self.disqualifier_scores = {} + # manually set all properties (columns or not) for key, value in kwargs.items(): if hasattr(self, key): diff --git a/pipeline/data_store.py b/pipeline/data_store.py index 57b80efb..862ea567 100644 --- a/pipeline/data_store.py +++ b/pipeline/data_store.py @@ -773,10 +773,8 @@ def finalize_report(self, session=None): """Mark the report as successful and set the finish time.""" self.report.success = True self.report.finish_time = datetime.datetime.utcnow() - with SmartSession(session) as session: - new_report = session.merge(self.report) - session.commit() - self.report = new_report + self.report.upsert() + def get_inputs(self): """Get a string with the relevant inputs. """ diff --git a/tests/models/test_image.py b/tests/models/test_image.py index c69ca630..b1e39194 100644 --- a/tests/models/test_image.py +++ b/tests/models/test_image.py @@ -131,6 +131,36 @@ def test_image_insert( sim_image1, sim_image2, sim_image3, sim_image_uncommitted assert len(upstrs) == 0 +def test_image_upsert( sim_image1, sim_image2, sim_image_uncommitted ): + im = sim_image_uncommitted + im.filepath = im.invent_filepath() + im.md5sum = uuid.uuid4() + im.is_coadd = True + + upstreamids = ( [ sim_image1.id, sim_image2.id ] + if sim_image1.mjd < sim_image2.mjd + else [ sim_image2.id, sim_image1.id ] ) + im._upstream_ids = [ i for i in upstreamids ] + im.insert() + + expectedupstreams = set( upstreamids ) + expectedupstreams.add( im.exposure_id ) + + newim = Image.get_by_id( im.id ) + assert set( [ i.id for i in newim.get_upstreams( only_images=True ) ] ) == expectedupstreams + + # Make sure that if I upsert, it works without complaining, and the upstreams are still in place + oldfmt = newim._format + im._format = oldfmt + 1 + im.upsert() + newim = Image.get_by_id( im.id ) + assert set( [ i.id for i in newim.get_upstreams( only_images=True ) ] ) == expectedupstreams + assert newim._format == oldfmt + 1 + + im._format = None + im.upsert( load_defaults=True ) + assert im._format == oldfmt + 1 + def test_image_must_have_md5(sim_image_uncommitted, provenance_base): try: