Commit whackamole

rknop committed Jul 18, 2024
1 parent 1cbf1b6 commit 5e75e26
Showing 3 changed files with 126 additions and 108 deletions.
167 changes: 83 additions & 84 deletions models/base.py
@@ -358,8 +358,8 @@ def get_downstreams(self, session=None, siblings=True):
         """
         raise NotImplementedError('get_downstreams not implemented for this class')
 
-    def delete_from_database(self, session=None, commit=True, remove_downstreams=False):
-        """Remove the object from the database.
+    def _delete_from_database(self, session=None, commit=True, remove_downstreams=False):
+        """Remove the object from the database -- don't call this, call delete_from_disk_and_database.
 
         This does not remove any associated files (if this is a FileOnDiskMixin)
         and does not remove the object from the archive.
@@ -392,8 +392,8 @@ def delete_from_database(self, session=None, commit=True, remove_downstreams=Fal
         try:
             downstreams = self.get_downstreams(session=session)
             for d in downstreams:
-                if hasattr(d, 'delete_from_database'):
-                    if d.delete_from_database(session=session, commit=False, remove_downstreams=True):
+                if hasattr(d, '_delete_from_database'):
+                    if d._delete_from_database(session=session, commit=False, remove_downstreams=True):
                         need_commit = True
                 if isinstance(d, list) and len(d) > 0 and hasattr(d[0], 'delete_list'):
                     d[0].delete_list(d, remove_local=False, archive=False, commit=False, session=session)
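The truthy return from _delete_from_database is how a recursive caller learns whether anything was actually deleted, so it can commit once at the top of the tree. An illustrative sketch of that contract (hypothetical caller code, not part of this commit):

    # obj is any SeeChangeBase subclass instance; session is a caller-managed
    # SQLAlchemy session.  Commit only if some recursive deletion touched the DB.
    need_commit = obj._delete_from_database(session=session, commit=False,
                                            remove_downstreams=True)
    if need_commit:
        session.commit()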
@@ -420,6 +420,85 @@ def delete_from_database(self, session=None, commit=True, remove_downstreams=Fal
 
         return need_commit  # to be able to recursively report back if there's a need to commit
 
+    def delete_from_disk_and_database(
+            self, session=None, commit=True, remove_folders=True, remove_downstreams=False, archive=True,
+    ):
+        """Delete any data from disk, archive and the database.
+
+        Use this to clean up an entry from all locations, as relevant
+        for the particular class. Will delete the object from the DB
+        using the given session (or using an internal session). If
+        using an internal session, commit must be True, to allow the
+        change to be committed before closing it.
+
+        This will silently continue if the file does not exist
+        (locally or on the archive), or if it isn't in the database,
+        and will attempt to delete from all locations regardless
+        of whether it existed elsewhere or not.
+
+        TODO : this is sometimes broken if you don't pass a session.
+
+        Parameters
+        ----------
+        session: sqlalchemy session
+            The session to use for the deletion. If None, will open a new session,
+            which will also close at the end of the call.
+        commit: bool
+            Whether to commit the deletion to the database.
+            Default is True. When session=None, commit must be True,
+            otherwise the session will exit without committing
+            (in this case the function will raise a RuntimeError).
+        remove_folders: bool
+            If True, will remove any folders on the path to the files
+            associated with this object, if they are empty.
+        remove_downstreams: bool
+            If True, will also remove any downstream data.
+            Will recursively call get_downstreams() and find any objects
+            that can have their data deleted from disk, archive and database.
+            Default is False.
+        archive: bool
+            If True, will also delete the file from the archive.
+            Default is True.
+        """
+        if session is None and not commit:
+            raise RuntimeError("When session=None, commit must be True!")
+
+        # Recursively remove downstreams first
+
+        if remove_downstreams:
+            downstreams = self.get_downstreams()
+            for d in downstreams:
+                if hasattr( d, 'delete_from_disk_and_database' ):
+                    d.delete_from_disk_and_database( session=session, commit=commit,
+                                                     remove_folders=remove_folders, archive=archive,
+                                                     remove_downstreams=True )
+
+        if archive and hasattr( self, "filepath" ):
+            if self.filepath is not None:
+                if self.filepath_extensions is None:
+                    self.archive.delete( self.filepath, okifmissing=True )
+                else:
+                    for ext in self.filepath_extensions:
+                        self.archive.delete( f"{self.filepath}{ext}", okifmissing=True )
+
+            # make sure these are set to null just in case we fail
+            # to commit later on, we will at least know something is wrong
+            self.md5sum = None
+            self.md5sum_extensions = None
+
+        if hasattr( self, "remove_data_from_disk" ):
+            self.remove_data_from_disk( remove_folders=remove_folders )
+            # make sure these are set to null just in case we fail
+            # to commit later on, we will at least know something is wrong
+            self.filepath_extensions = None
+            self.filepath = None
+
+        # Don't pass remove_downstreams here because we took care of downstreams above.
+        SeeChangeBase._delete_from_database( self, session=session, commit=commit, remove_downstreams=False )
+
     def to_dict(self):
         """Translate all the SQLAlchemy columns into a dictionary.
@@ -1316,86 +1395,6 @@ def remove_data_from_disk(self, remove_folders=True):
             else:
                 break
 
-    def delete_from_disk_and_database(
-            self, session=None, commit=True, remove_folders=True, remove_downstreams=False, archive=True,
-    ):
-        """
-        Delete the data from disk, archive and the database.
-        Use this to clean up an entry from all locations.
-        Will delete the object from the DB using the given session
-        (or using an internal session).
-        If using an internal session, commit must be True,
-        to allow the change to be committed before closing it.
-
-        This will silently continue if the file does not exist
-        (locally or on the archive), or if it isn't on the database,
-        and will attempt to delete from any locations regardless
-        of if it existed elsewhere or not.
-
-        TODO : this is sometimes broken if you don't pass a session.
-
-        Parameters
-        ----------
-        session: sqlalchemy session
-            The session to use for the deletion. If None, will open a new session,
-            which will also close at the end of the call.
-        commit: bool
-            Whether to commit the deletion to the database.
-            Default is True. When session=None then commit must be True,
-            otherwise the session will exit without committing
-            (in this case the function will raise a RuntimeException).
-        remove_folders: bool
-            If True, will remove any folders on the path to the files
-            associated to this object, if they are empty.
-        remove_downstreams: bool
-            If True, will also remove any downstream data.
-            Will recursively call get_downstreams() and find any objects
-            that can have their data deleted from disk, archive and database.
-            Default is False.
-        archive: bool
-            If True, will also delete the file from the archive.
-            Default is True.
-        """
-        if session is None and not commit:
-            raise RuntimeError("When session=None, commit must be True!")
-
-        # Recursively remove downstreams first
-
-        if remove_downstreams:
-            downstreams = self.get_downstreams()
-            for d in downstreams:
-                if hasattr( d, 'delete_from_disk_and_database' ):
-                    d.delete_from_disk_and_database( session=session, commit=commit,
-                                                     remove_folders=remove_folders, archive=archive,
-                                                     remove_downstreams=True )
-                else:
-                    raise RuntimeError( f"A {self.__class__} isn't able to remove a downstream {d.__class__}" )
-
-        if archive:
-            if self.filepath is not None:
-                if self.filepath_extensions is None:
-                    self.archive.delete( self.filepath, okifmissing=True )
-                else:
-                    for ext in self.filepath_extensions:
-                        self.archive.delete( f"{self.filepath}{ext}", okifmissing=True )
-
-            # make sure these are set to null just in case we fail
-            # to commit later on, we will at least know something is wrong
-            self.md5sum = None
-            self.md5sum_extensions = None
-
-        self.remove_data_from_disk( remove_folders=remove_folders )
-
-        # Don't pass remove_downstreams here because we took care of downstreams above.
-        SeeChangeBase.delete_from_database( self, session=session, commit=commit, remove_downstreams=False )
-
-        # make sure these are set to null just in case we fail
-        # to commit later on, we will at least know something is wrong
-        self.filepath_extensions = None
-        self.filepath = None
-
 # load the default paths from the config
 FileOnDiskMixin.configure_paths()
2 changes: 1 addition & 1 deletion models/measurements.py
@@ -614,7 +614,7 @@ def delete_list(cls, measurements_list, session=None, commit=True):
 
         with SmartSession(session) as session:
            for m in measurements_list:
-                m.delete_from_database(session=session, commit=False)
+                m.delete_from_disk_and_database(session=session, commit=False)
            if commit:
                session.commit()
 
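With this one-line change, delete_list now cleans up each measurement's files and archive copies along with its database row. A hedged sketch of a call site (the list variable is illustrative, not from the commit):

    from models.base import SmartSession
    from models.measurements import Measurements

    with SmartSession() as session:
        # Each measurement is removed from disk, archive, and database;
        # delete_list commits once at the end because commit=True.
        Measurements.delete_list(measurements_list, session=session, commit=True)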
65 changes: 42 additions & 23 deletions tests/fixtures/ptf.py
@@ -287,14 +287,19 @@ def ptf_reference_images(ptf_images_factory):
 
     yield images
 
-    with SmartSession() as session:
-        session.autoflush = False
+    for image in images:
+        image.exposure.delete_from_disk_and_database(commit=True)
+        image.delete_from_disk_and_database(commit=True, remove_downstreams=True)
 
-        for image in images:
-            image = session.merge(image)
-            image.exposure.delete_from_disk_and_database(session=session, commit=False)
-            image.delete_from_disk_and_database(session=session, commit=False, remove_downstreams=True)
-        session.commit()
+    # ROB REMOVE THIS COMMENT
+    # with SmartSession() as session:
+    #     session.autoflush = False
+
+    #     for image in images:
+    #         image = session.merge(image)
+    #         image.exposure.delete_from_disk_and_database(session=session, commit=False)
+    #         image.delete_from_disk_and_database(session=session, commit=False, remove_downstreams=True)
+    #     session.commit()
 
@pytest.fixture(scope='session')
@@ -303,17 +303,22 @@ def ptf_supernova_images(ptf_images_factory):
 
     yield images
 
-    with SmartSession() as session:
-        session.autoflush = False
+    for image in images:
+        image.delete_from_disk_and_database(commit=True, remove_downstreams=True)
+        image.exposure.delete_from_disk_and_database(session=session, commit=True)
 
-        for image in images:
-            image = session.merge(image)
-            # first delete the image and all it's products and the associated data (locally and on archive)
-            image.delete_from_disk_and_database(session=session, commit=False, remove_downstreams=True)
-            # only then delete the exposure, so it doesn't cascade delete the image and prevent deleting products
-            image.exposure.delete_from_disk_and_database(session=session, commit=False)
+    # ROB REMOVE THIS COMMENT
+    # with SmartSession() as session:
+    #     session.autoflush = False
 
-        session.commit()
+    # for image in images:
+    #     image = session.merge(image)
+    #     # first delete the image and all it's products and the associated data (locally and on archive)
+    #     image.delete_from_disk_and_database(session=session, commit=False, remove_downstreams=True)
+    #     # only then delete the exposure, so it doesn't cascade delete the image and prevent deleting products
+    #     image.exposure.delete_from_disk_and_database(session=session, commit=False)
+
+    # session.commit()
 

# conditionally call the ptf_reference_images fixture if cache is not there:
Expand Down Expand Up @@ -393,16 +403,26 @@ def ptf_aligned_images(request, ptf_cache_dir, data_dir, code_version):

# must delete these here, as the cleanup for the getfixturevalue() happens after pytest_sessionfinish!
if 'ptf_reference_images' in locals():
with SmartSession() as session, warnings.catch_warnings():

with warnings.catch_warnings():
warnings.filterwarnings(
action='ignore',
message=r'.*DELETE statement on table .* expected to delete \d* row\(s\).*',
)
for image in ptf_reference_images:
# image = merge( session, image )
image.exposure.delete_from_disk_and_database(commit=False, session=session, remove_downstreams=True)
# image.delete_from_disk_and_database(commit=False, session=session, remove_downstreams=True)
session.commit()
image.exposure.delete_from_disk_and_database( commit=True, remove_downstreams=True )

# ROB REMOVE THIS COMMENT
# with SmartSession() as session, warnings.catch_warnings():
# warnings.filterwarnings(
# action='ignore',
# message=r'.*DELETE statement on table .* expected to delete \d* row\(s\).*',
# )
# for image in ptf_reference_images:
# image = merge( session, image )
# image.exposure.delete_from_disk_and_database(commit=False, session=session, remove_downstreams=True)
# # image.delete_from_disk_and_database(commit=False, session=session, remove_downstreams=True)
# session.commit()


@pytest.fixture
@@ -523,9 +543,8 @@ def ptf_ref(
 
     yield ref
 
+    coadd_image.delete_from_disk_and_database(commit=True, remove_downstreams=True)
     with SmartSession() as session:
-        coadd_image = coadd_image.merge_all(session=session)
-        coadd_image.delete_from_disk_and_database(commit=True, session=session, remove_downstreams=True)
         ref_in_db = session.scalars(sa.select(Reference).where(Reference.id == ref.id)).first()
         assert ref_in_db is None  # should have been deleted by cascade when image is deleted
