Fix logic error re: downstreams in FileOnDiskMixin.delete_from_disk_and_database; add file cleanup to a test
rknop committed Jul 18, 2024
1 parent dc16aad commit 3ff7848
Showing 3 changed files with 55 additions and 66 deletions.
models/base.py: 92 changes (29 additions & 63 deletions)
@@ -1287,29 +1287,20 @@ def save(self, data, extension=None, overwrite=True, exists_ok=True, verify_md5=
             else:
                 self.md5sum = remmd5
 
-    def remove_data_from_disk(self, remove_folders=True, remove_downstreams=False):
+    def remove_data_from_disk(self, remove_folders=True):
         """Delete the data from local disk, if it exists.
         If remove_folders=True, will also remove any folders
         if they are empty after the deletion.
-        Use remove_downstreams=True to also remove any
-        downstream data (e.g., for an Image, that would be the
-        data for the SourceLists and PSFs that depend on this Image).
         This function will not remove database rows or archive files,
         only cleanup local storage for this object and its downstreams.
         To remove both the files and the database entry, use
-        delete_from_disk_and_database() instead.
+        delete_from_disk_and_database() instead.  That one
+        also supports removing downstreams.
         Parameters
         ----------
         remove_folders: bool
             If True, will remove any folders on the path to the files
             associated to this object, if they are empty.
-        remove_downstreams: bool
-            If True, will also remove any downstream data.
-            Will recursively call get_downstreams() and find any objects
-            that have remove_data_from_disk() implemented, and call it.
-            Default is False.
         """
         if self.filepath is not None:
             # get the filepath, but don't check if the file exists!
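With this commit, remove_data_from_disk() is strictly per-object; anything that relied on its old remove_downstreams flag should go through delete_from_disk_and_database() instead. A minimal sketch of the split, assuming an Image as the FileOnDiskMixin object (illustrative only, not code from the repository):

    # Sketch of the API split after this commit.  'im' stands for any
    # object that mixes in FileOnDiskMixin, e.g. an Image.
    from models.base import SmartSession
    from models.image import Image

    with SmartSession() as session:
        im = session.query( Image ).first()

        # Per-object local cleanup: deletes this object's files (and empty
        # parent folders), but never downstreams, archive copies, or DB rows.
        im.remove_data_from_disk( remove_folders=True )

        # Full cleanup, downstreams included, now goes through this instead:
        # im.delete_from_disk_and_database( session=session, commit=True,
        #                                   remove_downstreams=True )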
@@ -1325,54 +1316,6 @@ def remove_data_from_disk(self, remove_folders=True, remove_downstreams=False):
             else:
                 break
 
-        if remove_downstreams:
-            try:
-                downstreams = self.get_downstreams()
-                for d in downstreams:
-                    if hasattr(d, 'remove_data_from_disk'):
-                        d.remove_data_from_disk(remove_folders=remove_folders, remove_downstreams=True)
-                    if isinstance(d, list) and len(d) > 0 and hasattr(d[0], 'delete_list'):
-                        d[0].delete_list(d, remove_local=True, archive=False, database=False)
-            except NotImplementedError as e:
-                pass  # if this object does not implement get_downstreams, it is ok
-
-    def delete_from_archive(self, remove_downstreams=False):
-        """Delete the file from the archive, if it exists.
-        This will not remove the file from local disk, nor
-        from the database. Use delete_from_disk_and_database()
-        to do that.
-        Parameters
-        ----------
-        remove_downstreams: bool
-            If True, will also remove any downstream data.
-            Will recursively call get_downstreams() and find any objects
-            that have delete_from_archive() implemented, and call it.
-            Default is False.
-        """
-        if remove_downstreams:
-            try:
-                downstreams = self.get_downstreams()
-                for d in downstreams:
-                    if hasattr(d, 'delete_from_archive'):
-                        d.delete_from_archive(remove_downstreams=True)  # TODO: do we need remove_folders?
-                    if isinstance(d, list) and len(d) > 0 and hasattr(d[0], 'delete_list'):
-                        d[0].delete_list(d, remove_local=False, archive=True, database=False)
-            except NotImplementedError as e:
-                pass  # if this object does not implement get_downstreams, it is ok
-
-        if self.filepath is not None:
-            if self.filepath_extensions is None:
-                self.archive.delete( self.filepath, okifmissing=True )
-            else:
-                for ext in self.filepath_extensions:
-                    self.archive.delete( f"{self.filepath}{ext}", okifmissing=True )
-
-        # make sure these are set to null just in case we fail
-        # to commit later on, we will at least know something is wrong
-        self.md5sum = None
-        self.md5sum_extensions = None
-
     def delete_from_disk_and_database(
             self, session=None, commit=True, remove_folders=True, remove_downstreams=False, archive=True,
     ):
@@ -1416,12 +1359,35 @@
         if session is None and not commit:
             raise RuntimeError("When session=None, commit must be True!")
 
-        SeeChangeBase.delete_from_database(self, session=session, commit=commit, remove_downstreams=remove_downstreams)
+        # Recursively remove downstreams first
+
+        if remove_downstreams:
+            downstreams = self.get_downstreams()
+            for d in downstreams:
+                if hasattr( d, 'delete_from_disk_and_database' ):
+                    d.delete_from_disk_and_database( session=session, commit=commit,
+                                                     remove_folders=remove_folders, archive=archive,
+                                                     remove_downstreams=True )
 
-        self.remove_data_from_disk(remove_folders=remove_folders, remove_downstreams=remove_downstreams)
 
         if archive:
-            self.delete_from_archive(remove_downstreams=remove_downstreams)
+            if self.filepath is not None:
+                if self.filepath_extensions is None:
+                    self.archive.delete( self.filepath, okifmissing=True )
+                else:
+                    for ext in self.filepath_extensions:
+                        self.archive.delete( f"{self.filepath}{ext}", okifmissing=True )
+
+            # make sure these are set to null just in case we fail
+            # to commit later on, we will at least know something is wrong
+            self.md5sum = None
+            self.md5sum_extensions = None
+
+
+        self.remove_data_from_disk( remove_folders=remove_folders )
+
+        # Don't pass remove_downstreams here because we took care of downstreams above.
+        SeeChangeBase.delete_from_database( self, session=session, commit=commit, remove_downstreams=False )
 
         # make sure these are set to null just in case we fail
        # to commit later on, we will at least know something is wrong
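Taken together, the base.py changes replace three independent downstream walks (database, local disk, and archive, each gated by its own remove_downstreams flag, with the database rows deleted first) with a single depth-first recursion: each downstream is deleted completely before this object's own archive copy, local files, and database row are touched. A toy sketch of that ordering, with hypothetical names standing in for the actual SeeChange classes:

    # Minimal illustration of the depth-first deletion order the rewritten
    # delete_from_disk_and_database() uses.  Node is a made-up stand-in,
    # not a SeeChange class.
    class Node:
        def __init__(self, name, children=()):
            self.name = name
            self.children = list(children)

        def delete_everything(self):
            # 1. Downstreams first, while the links to them still exist.
            for child in self.children:
                child.delete_everything()
            # 2. Then this object's own files and database row.
            print(f"deleting files and database row for {self.name}")

    image = Node("image", [Node("source_list", [Node("psf")])])
    image.delete_everything()
    # prints: psf, then source_list, then image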
tests/fixtures/decam.py: 1 change (1 addition & 0 deletions)
@@ -92,6 +92,7 @@ def decam_default_calibrators(cache_dir, data_dir):
                     imagestonuke.add( info[ f'{filetype}_fileid' ] )
                 else:
                     datafilestonuke.add( info[ f'{filetype}_fileid' ] )
+
         for imid in imagestonuke:
             im = session.scalars( sa.select(Image).where(Image.id == imid )).first()
             im.delete_from_disk_and_database( session=session, commit=False )
tests/pipeline/test_pipeline.py: 28 changes (25 additions & 3 deletions)
@@ -9,6 +9,7 @@
 from models.base import SmartSession, FileOnDiskMixin
 from models.provenance import Provenance
 from models.image import Image, image_upstreams_association_table
+from models.calibratorfile import CalibratorFile
 from models.source_list import SourceList
 from models.psf import PSF
 from models.world_coordinates import WorldCoordinates
@@ -191,18 +192,39 @@ def check_override( new_values_dict, pars ):
 
 def test_running_without_reference(decam_exposure, decam_refset, decam_default_calibrators, pipeline_for_tests):
     p = pipeline_for_tests
-    p.subtractor.pars.refset = 'test_refset_decam'  # pointing out this ref set doesn't mean we have an actual reference
+    p.subtractor.pars.refset = 'test_refset_decam'  # choosing ref set doesn't mean we have an actual reference
     p.pars.save_before_subtraction = True  # need this so images get saved even though it crashes on "no reference"
 
     with pytest.raises(ValueError, match='Cannot find a reference image corresponding to.*'):
+        # Use the 'N1' sensor section since that's not one of the ones used in the regular
+        # DECam fixtures, so we don't have to worry about any session scope fixtures that
+        # load references.  (Though I don't think there are any.)
         ds = p.run(decam_exposure, 'N1')
         ds.reraise()
 
-    # make sure the data is saved
+    # make sure the data is saved, but then clean it up
     with SmartSession() as session:
         im = session.scalars(sa.select(Image).where(Image.id == ds.image.id)).first()
         assert im is not None
 
+        im.delete_from_disk_and_database( remove_downstreams=True, session=session )
+
+        # The N1 decam calibrator files will have been automatically added
+        # in the pipeline run above; need to clean them up.  However,
+        # *don't* remove the linearity calibrator file, because that will
+        # have been added in session fixtures used by other tests.  (Tests
+        # and automatic cleanup become very fraught when you have automatic
+        # loading of stuff....)
+
+        cfs = ( session.query( CalibratorFile )
+                .filter( CalibratorFile.instrument == 'DECam' )
+                .filter( CalibratorFile.sensor_section == 'N1' )
+                .filter( CalibratorFile.image_id != None ) )
+        imdel = [ c.image_id for c in cfs ]
+        imgtodel = session.query( Image ).filter( Image.id.in_( imdel ) )
+        for i in imgtodel:
+            i.delete_from_disk_and_database( session=session )
+
+        session.commit()
 
 def test_data_flow(decam_exposure, decam_reference, decam_default_calibrators, pipeline_for_tests, archive):
     """Test that the pipeline runs end-to-end."""
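The inline cleanup above runs only after the earlier assertions pass. A sketch of how the same bookkeeping could instead live in a pytest fixture with a yield, so it runs even when the test body fails; this is purely illustrative under the same models, and the fixture name n1_calibrator_cleanup is invented, not part of the committed test:

    # Hypothetical fixture expressing the same N1 calibrator cleanup; the
    # committed test performs this inline, as shown in the diff above.
    import pytest

    from models.base import SmartSession
    from models.calibratorfile import CalibratorFile
    from models.image import Image

    @pytest.fixture
    def n1_calibrator_cleanup():
        yield  # run the test body first
        with SmartSession() as session:
            cfs = ( session.query( CalibratorFile )
                    .filter( CalibratorFile.instrument == 'DECam' )
                    .filter( CalibratorFile.sensor_section == 'N1' )
                    .filter( CalibratorFile.image_id != None ) )
            imdel = [ c.image_id for c in cfs ]
            for im in session.query( Image ).filter( Image.id.in_( imdel ) ):
                im.delete_from_disk_and_database( session=session )
            session.commit()

A test would opt in by listing the fixture in its signature.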
