diff --git a/models/background.py b/models/background.py index 95cb8eaa..2e0c1a63 100644 --- a/models/background.py +++ b/models/background.py @@ -169,6 +169,7 @@ def _get_inverse_badness(self): def __init__( self, *args, **kwargs ): FileOnDiskMixin.__init__( self, **kwargs ) + HasBitFlagBadness.__init__(self) SeeChangeBase.__init__( self ) self._image_shape = None self._counts_data = None diff --git a/models/base.py b/models/base.py index f1a96801..f7cc1d70 100644 --- a/models/base.py +++ b/models/base.py @@ -1697,6 +1697,10 @@ def append_badness(self, value): doc='Free text comment about this data product, e.g., why it is bad. ' ) + def __init__(self): + self._bitflag = 0 + self._upstream_bitflag = 0 + def update_downstream_badness(self, session=None, commit=True, siblings=True): """Send a recursive command to update all downstream objects that have bitflags. diff --git a/models/cutouts.py b/models/cutouts.py index b35e153e..03f9ad1e 100644 --- a/models/cutouts.py +++ b/models/cutouts.py @@ -126,6 +126,7 @@ def ref_image(self): def __init__(self, *args, **kwargs): FileOnDiskMixin.__init__(self, *args, **kwargs) + HasBitFlagBadness.__init__(self) SeeChangeBase.__init__(self) # don't pass kwargs as they could contain non-column key-values self.format = 'hdf5' # the default should match the column-defined default above! 
diff --git a/models/exposure.py b/models/exposure.py index 052d3269..fa4d2fe3 100644 --- a/models/exposure.py +++ b/models/exposure.py @@ -340,6 +340,7 @@ def __init__(self, current_file=None, invent_filepath=True, **kwargs): """ FileOnDiskMixin.__init__(self, **kwargs) + HasBitFlagBadness.__init__(self) SeeChangeBase.__init__(self) # don't pass kwargs as they could contain non-column key-values self._data = None # the underlying image data for each section diff --git a/models/image.py b/models/image.py index c2b09e8c..cf61974b 100644 --- a/models/image.py +++ b/models/image.py @@ -455,6 +455,7 @@ def _get_inverse_badness(self): def __init__(self, *args, **kwargs): FileOnDiskMixin.__init__(self, *args, **kwargs) + HasBitFlagBadness.__init__(self) SeeChangeBase.__init__(self) # don't pass kwargs as they could contain non-column key-values self.raw_data = None # the raw exposure pixels (2D float or uint16 or whatever) not saved to disk! @@ -472,6 +473,7 @@ def __init__(self, *args, **kwargs): # additional data products that could go with the Image self.sources = None # the sources extracted from this Image (optionally loaded) self.psf = None # the point-spread-function object (optionally loaded) + self.bg = None # the background object (optionally loaded) self.wcs = None # the WorldCoordinates object (optionally loaded) self.zp = None # the zero-point object (optionally loaded) @@ -513,6 +515,7 @@ def init_on_load(self): self.sources = None self.psf = None + self.bg = None self.wcs = None self.zp = None @@ -581,10 +584,16 @@ def merge_all(self, session): self.psf.image_id = new_image.id self.psf.provenance_id = self.psf.provenance.id if self.psf.provenance is not None else None new_image.psf = self.psf.safe_merge(session=session) - if new_image.psf._bitflag is None: # I don't know why this isn't set to 0 using the default - new_image.psf._bitflag = 0 - if new_image.psf._upstream_bitflag is None: # I don't know why this isn't set to 0 using the default - 
new_image.psf._upstream_bitflag = 0 + # if new_image.psf._bitflag is None: # I don't know why this isn't set to 0 using the default + # new_image.psf._bitflag = 0 + # if new_image.psf._upstream_bitflag is None: # I don't know why this isn't set to 0 using the default + # new_image.psf._upstream_bitflag = 0 + + if self.bg is not None: + self.bg.image = new_image + self.bg.image_id = new_image.id + self.bg.provenance_id = self.bg.provenance.id if self.bg.provenance is not None else None + new_image.bg = self.bg.safe_merge(session=session) # take care of the upstream images and their products # if sa.inspect(self).detached: # self can't load the images, but new_image has them @@ -1516,8 +1525,8 @@ def free( self, free_derived_products=True, free_aligned=True, only_free=None ): Parameters ---------- free_derived_products: bool, default True - If True, will also call free on self.sources, self.psf, and - self.wcs + If True, will also call free on self.sources, self.psf, + self.bg and self.wcs. free_aligned: bool, default True Will call free() on each of the aligned images referenced @@ -1547,11 +1556,11 @@ def free( self, free_derived_products=True, free_aligned=True, only_free=None ): self.sources.free() if self.psf is not None: self.psf.free() - # This implementation in WCS should be done after PR167 is done. 
- # Not a big deal if it's not done, because WCSes will not use - # very much memory - # if self.wcs is not None: - # self.wcs.free() + if self.bg is not None: + self.bg.free() + if self.wcs is not None: + self.wcs.free() + if free_aligned: if self._aligned_images is not None: for alim in self._aligned_images: @@ -1665,6 +1674,9 @@ def load_upstream_products(self, session=None): if im.psf is None or im.psf.provenance_id not in prov_ids: need_to_load = True break + if im.bg is None or im.bg.provenance_id not in prov_ids: + need_to_load = True + break if im.wcs is None or im.wcs.provenance_id not in prov_ids: need_to_load = True break @@ -1677,6 +1689,7 @@ def load_upstream_products(self, session=None): from models.source_list import SourceList from models.psf import PSF + from models.background import Background from models.world_coordinates import WorldCoordinates from models.zero_point import ZeroPoint @@ -1704,6 +1717,13 @@ def load_upstream_products(self, session=None): ) ).all() + bg_results = session.scalars( + sa.select(Background).where( + Background.image_id.in_(im_ids), + Background.provenance_id.in_(prov_ids), + ) + ).all() + wcs_results = session.scalars( sa.select(WorldCoordinates).where( WorldCoordinates.sources_id.in_(sources_ids), @@ -1735,6 +1755,14 @@ def load_upstream_products(self, session=None): elif len(psfs) == 1: im.psf = psfs[0] + bgs = [b for b in bg_results if b.image_id == im.id] # only get the bgs for this image + if len(bgs) > 1: + raise ValueError( + f"Image {im.id} has more than one Background matching upstream provenance." 
+ ) + elif len(bgs) == 1: + im.bg = bgs[0] + if im.sources is not None: wcses = [w for w in wcs_results if w.sources_id == im.sources.id] # the wcses for this image if len(wcses) > 1: @@ -1791,6 +1819,8 @@ def get_upstreams(self, session=None): upstreams.append(im.sources) if im.psf is not None: upstreams.append(im.psf) + if im.bg is not None: + upstreams.append(im.bg) if im.wcs is not None: upstreams.append(im.wcs) if im.zp is not None: @@ -1803,17 +1833,12 @@ def get_downstreams(self, session=None, siblings=False): # avoids circular import from models.source_list import SourceList from models.psf import PSF + from models.background import Background from models.world_coordinates import WorldCoordinates from models.zero_point import ZeroPoint downstreams = [] with SmartSession(session) as session: - # get all psfs that are related to this image (regardless of provenance) - psfs = session.scalars(sa.select(PSF).where(PSF.image_id == self.id)).all() - downstreams += psfs - if self.psf is not None and self.psf not in psfs: # if not in the session, could be duplicate! - downstreams.append(self.psf) - # get all source lists that are related to this image (regardless of provenance) sources = session.scalars( sa.select(SourceList).where(SourceList.image_id == self.id) @@ -1822,6 +1847,17 @@ def get_downstreams(self, session=None, siblings=False): if self.sources is not None and self.sources not in sources: # if not in the session, could be duplicate! downstreams.append(self.sources) + # get all psfs that are related to this image (regardless of provenance) + psfs = session.scalars(sa.select(PSF).where(PSF.image_id == self.id)).all() + downstreams += psfs + if self.psf is not None and self.psf not in psfs: # if not in the session, could be duplicate! 
+ downstreams.append(self.psf) + + bgs = session.scalars(sa.select(Background).where(Background.image_id == self.id)).all() + downstreams += bgs + if self.bg is not None and self.bg not in bgs: # if not in the session, could be duplicate! + downstreams.append(self.bg) + wcses = [] zps = [] for s in sources: @@ -1915,17 +1951,6 @@ def weight(self): def weight(self, value): self._weight = value - @property - def background(self): - """An estimate for the background flux (2D float array). """ - if self._data is None and self.filepath is not None: - self.load() - return self._background - - @background.setter - def background(self, value): - self._background = value - @property def score(self): """The image after filtering with the PSF and normalizing to S/N units (2D float array). """ @@ -1986,6 +2011,20 @@ def nanscore(self): def nanscore(self, value): self._nanscore = value + @property + def data_bg(self): + """The image data, after subtracting the background. If no Background object is loaded, will raise. """ + if self.bg is None: + raise ValueError("No background is loaded for this image.") + return self.data - self.bg.counts + + @property + def nandata_bg(self): + """The image data, after subtracting the background and masking with NaNs wherever the flag is not zero. """ + if self.bg is None: + raise ValueError("No background is loaded for this image.") + return self.nandata - self.bg.counts + def show(self, **kwargs): """ Display the image using the matplotlib imshow function. 
diff --git a/models/measurements.py b/models/measurements.py index 547660f6..9ad12681 100644 --- a/models/measurements.py +++ b/models/measurements.py @@ -291,6 +291,7 @@ def instrument_object(self): def __init__(self, **kwargs): SeeChangeBase.__init__(self) # don't pass kwargs as they could contain non-column key-values + HasBitFlagBadness.__init__(self) self._cutouts_list_index = None # helper (transient) attribute that helps find the right cutouts in a list # manually set all properties (columns or not) diff --git a/models/psf.py b/models/psf.py index cc6ba0fc..3c4473dd 100644 --- a/models/psf.py +++ b/models/psf.py @@ -167,6 +167,7 @@ def _get_inverse_badness(self): def __init__( self, *args, **kwargs ): FileOnDiskMixin.__init__( self, **kwargs ) + HasBitFlagBadness.__init__(self) SeeChangeBase.__init__( self ) self._header = None self._data = None @@ -305,7 +306,7 @@ def load( self, download=True, always_verify_md5=False, psfpath=None, psfxmlpath self._info = ifp.read() def free( self ): - """Free loaded world coordinates memory. + """Free loaded PSF memory. Wipe out the data, info, and header fields, freeing memory. 
Depends on python garbage collection, so if there are other diff --git a/models/source_list.py b/models/source_list.py index 49dd718a..fb0c3009 100644 --- a/models/source_list.py +++ b/models/source_list.py @@ -142,6 +142,7 @@ def _get_inverse_badness(self): def __init__(self, *args, **kwargs): FileOnDiskMixin.__init__(self, *args, **kwargs) + HasBitFlagBadness.__init__(self) SeeChangeBase.__init__(self) # don't pass kwargs as they could contain non-column key-values self._data = None diff --git a/models/world_coordinates.py b/models/world_coordinates.py index d4676115..76f5a828 100644 --- a/models/world_coordinates.py +++ b/models/world_coordinates.py @@ -75,6 +75,7 @@ def wcs( self, value ): def __init__(self, *args, **kwargs): FileOnDiskMixin.__init__( self, **kwargs ) + HasBitFlagBadness.__init__(self) SeeChangeBase.__init__( self ) self._wcs = None @@ -223,4 +224,12 @@ def load( self, download=True, always_verify_md5=False, txtpath=None ): with open( txtpath ) as ifp: headertxt = ifp.read() self.wcs = WCS( fits.Header.fromstring( headertxt , sep='\\n' )) - + + def free(self): + """Free loaded world coordinates memory. + + Wipe out the _wcs text field, freeing a small amount of memory. + Depends on python garbage collection, so if there are other + references to those objects, the memory won't actually be freed. 
+ """ + self._wcs = None diff --git a/models/zero_point.py b/models/zero_point.py index 96a40fe8..215e2d03 100644 --- a/models/zero_point.py +++ b/models/zero_point.py @@ -94,6 +94,7 @@ class ZeroPoint(Base, AutoIDMixin, HasBitFlagBadness): ) def __init__(self, *args, **kwargs): + HasBitFlagBadness.__init__(self) SeeChangeBase.__init__(self) # don't pass kwargs as they could contain non-column key-values # manually set all properties (columns or not) diff --git a/pipeline/data_store.py b/pipeline/data_store.py index 6f185526..3c6aeabe 100644 --- a/pipeline/data_store.py +++ b/pipeline/data_store.py @@ -271,6 +271,7 @@ def __init__(self, *args, **kwargs): self.image = None # single image from one sensor section self.sources = None # extracted sources (a SourceList object, basically a catalog) self.psf = None # psf determined from the extracted sources + self.bg = None # background from the extraction phase self.wcs = None # astrometric solution self.zp = None # photometric calibration self.reference = None # the Reference object needed to make subtractions @@ -749,8 +750,6 @@ def get_psf(self, provenance=None, session=None): the current code version and critical parameters. If none is given, uses the appropriate provenance from the prov_tree dictionary. - If prov_tree is None, will use the latest provenance - for the "extraction" process. Usually the provenance is not given when the psf is loaded in order to be used as an upstream of the current process. session: sqlalchemy.orm.session.Session @@ -791,6 +790,60 @@ def get_psf(self, provenance=None, session=None): return self.psf + def get_bg(self, provenance=None, session=None): + """Get a Background object, either from memory or from the database. + + Parameters + ---------- + provenance: Provenance object + The provenance to use for the background. + This provenance should be consistent with + the current code version and critical parameters. 
+ If none is given, uses the appropriate provenance + from the prov_tree dictionary. + Usually the provenance is not given when the background is loaded + in order to be used as an upstream of the current process. + session: sqlalchemy.orm.session.Session + An optional session to use for the database query. + If not given, will use the session stored inside the + DataStore object; if there is none, will open a new session + and close it at the end of the function. + + Returns + ------- + bg: Background object + The background object for this image, + or None if no matching background is found. + + """ + process_name = 'extraction' + if provenance is None: # try to get the provenance from the prov_tree + provenance = self._get_provenance_for_an_upstream(process_name, session) + + # if bg exists in memory, check the provenance is ok + if self.bg is not None: + # make sure the bg object has the correct provenance + if self.bg.provenance is None: + raise ValueError('Background has no provenance!') + if provenance is not None and provenance.id != self.bg.provenance.id: + self.bg = None + + # TODO: do we need to test the b/g Provenance has upstreams consistent with self.image.provenance? + + # not in memory, look for it on the DB + if self.bg is None: + with SmartSession(session, self.session) as session: + image = self.get_image(session=session) + if image is not None: + self.bg = session.scalars( + sa.select(Background).where( + Background.image_id == image.id, + Background.provenance.has(id=provenance.id), + ) + ).first() + + return self.bg + def get_wcs(self, provenance=None, session=None): """Get an astrometric solution in the form of a WorldCoordinates object, from memory or from the database. 
@@ -1485,8 +1538,9 @@ def save_and_commit(self, exists_ok=False, overwrite=True, no_archive=False, if self.image_id is None and self.image is not None: self.image_id = self.image.id - self.psf = self.image.psf self.sources = self.image.sources + self.psf = self.image.psf + self.bg = self.image.bg self.wcs = self.image.wcs self.zp = self.image.zp diff --git a/tests/fixtures/pipeline_objects.py b/tests/fixtures/pipeline_objects.py index 93be2623..13a3a7b3 100644 --- a/tests/fixtures/pipeline_objects.py +++ b/tests/fixtures/pipeline_objects.py @@ -483,11 +483,11 @@ def make_datastore( else: raise e # if any other error comes up, raise it - ############# extraction to create sources / PSF / WCS / ZP ############# + ############# extraction to create sources / PSF / BG / WCS / ZP ############# if ( ( not os.getenv( "LIMIT_CACHE_USAGE" ) ) and ( cache_dir is not None ) and ( cache_base_name is not None ) ): - # try to get the SourceList, PSF, WCS and ZP from cache + # try to get the SourceList, PSF, BG, WCS and ZP from cache prov = Provenance( code_version=code_version, process='extraction', @@ -553,6 +553,34 @@ def make_datastore( # make sure this is saved to the archive as well ds.psf.save(verify_md5=False, overwrite=True) + # try to get the background from cache + cache_name = f'{cache_base_name}.bg_{prov.id[:6]}.fits.json' + bg_cache_path = os.path.join(cache_dir, cache_name) + if os.path.isfile(bg_cache_path): + SCLogger.debug('loading background from cache. 
') + ds.bg = copy_from_cache(Background, cache_dir, cache_name) + + # if BG already exists on the database, use that instead of this one + existing = session.scalars( + sa.select(Background).where(Background.filepath == ds.bg.filepath) + ).first() + if existing is not None: + # overwrite the existing row data using the JSON cache file + for key in sa.inspect(ds.bg).mapper.columns.keys(): + value = getattr(ds.bg, key) + if ( + key not in ['id', 'image_id', 'created_at', 'modified'] and + value is not None + ): + setattr(existing, key, value) + ds.bg = existing + + ds.bg.provenance = prov + ds.bg.image = ds.image + + # make sure this is saved to the archive as well + ds.bg.save(verify_md5=False, overwrite=True) + ############## astro_cal to create wcs ################ cache_name = f'{cache_base_name}.wcs_{prov.id[:6]}.txt.json' wcs_cache_path = os.path.join(cache_dir, cache_name) @@ -589,7 +617,6 @@ ds.wcs.save(verify_md5=False, overwrite=True) ########### photo_cal to create zero point ############ - cache_name = cache_base_name + '.zp.json' zp_cache_path = os.path.join(cache_dir, cache_name) if os.path.isfile(zp_cache_path): @@ -621,11 +648,12 @@ ds.zp.provenance = prov ds.zp.sources = ds.sources - if ds.sources is None or ds.psf is None or ds.wcs is None or ds.zp is None: # redo extraction + # if any data product is missing, must redo the extraction step + if ds.sources is None or ds.psf is None or ds.bg is None or ds.wcs is None or ds.zp is None: SCLogger.debug('extracting sources. 
') ds = p.extractor.run(ds, session) - ds.sources.save() + ds.sources.save(overwrite=True) if cache_dir is not None and cache_base_name is not None: output_path = copy_to_cache(ds.sources, cache_dir) if cache_dir is not None and cache_base_name is not None and output_path != sources_cache_path: warnings.warn(f'cache path {sources_cache_path} does not match output path {output_path}') @@ -637,6 +665,12 @@ if cache_dir is not None and cache_base_name is not None and output_path != psf_cache_path: warnings.warn(f'cache path {psf_cache_path} does not match output path {output_path}') + ds.bg.save(overwrite=True) + if cache_dir is not None and cache_base_name is not None: + output_path = copy_to_cache(ds.bg, cache_dir) + if cache_dir is not None and cache_base_name is not None and output_path != bg_cache_path: + warnings.warn(f'cache path {bg_cache_path} does not match output path {output_path}') + + SCLogger.debug('Running astrometric calibration') ds = p.astrometor.run(ds, session) ds.wcs.save() @@ -726,9 +760,7 @@ parameters=prov_aligned_ref.parameters, upstreams=[ ds.image.provenance, - ds.sources.provenance, # this also includes the PSF's provenance - ds.wcs.provenance, - ds.zp.provenance, + ds.sources.provenance, # this also includes provs for PSF, BG, WCS, ZP ], process='alignment', is_testing=True,