From f813ad2d5fee4ceecf147b15a38f96a19ed4a139 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 25 Jan 2024 11:56:54 -0800 Subject: [PATCH 1/7] Move MWI test environment patches from setUpClass to setUp. This change makes it possible for the patches to refer to individual test cases' environments, in particular their temporary directories. --- tests/test_middleware_interface.py | 46 +++++++++--------------------- 1 file changed, 14 insertions(+), 32 deletions(-) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 08f6ccc0..c5af5908 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -116,23 +116,14 @@ def fake_file_data(filename, dimensions, instrument, visit): class MiddlewareInterfaceTest(unittest.TestCase): """Test the MiddlewareInterface class with faked data. """ - @classmethod - def setUpClass(cls): - cls.env_patcher = unittest.mock.patch.dict(os.environ, - {"URL_APDB": "postgresql://localhost/postgres", - "K_REVISION": "prompt-proto-service-042", - }) - cls.env_patcher.start() - - super().setUpClass() - - @classmethod - def tearDownClass(cls): - super().tearDownClass() - - cls.env_patcher.stop() - def setUp(self): + env_patcher = unittest.mock.patch.dict(os.environ, + {"URL_APDB": "postgresql://localhost/postgres", + "K_REVISION": "prompt-proto-service-042", + }) + env_patcher.start() + self.addCleanup(env_patcher.stop) + self.data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data") self.central_repo = os.path.join(self.data_dir, "central_repo") self.umbrella = f"{instname}/defaults" @@ -853,22 +844,6 @@ class MiddlewareInterfaceWriteableTest(unittest.TestCase): setup takes longer than for MiddlewareInterfaceTest, so it should be used sparingly. 
""" - @classmethod - def setUpClass(cls): - super().setUpClass() - - cls.env_patcher = unittest.mock.patch.dict(os.environ, - {"URL_APDB": "postgresql://localhost/postgres", - "K_REVISION": "prompt-proto-service-042", - }) - cls.env_patcher.start() - - @classmethod - def tearDownClass(cls): - super().tearDownClass() - - cls.env_patcher.stop() - def _create_copied_repo(self): """Create a fresh repository that's a copy of the test data. @@ -894,6 +869,13 @@ def _create_copied_repo(self): central_butler.import_(directory=data_repo, filename=export_file.name, transfer="auto") def setUp(self): + env_patcher = unittest.mock.patch.dict(os.environ, + {"URL_APDB": "postgresql://localhost/postgres", + "K_REVISION": "prompt-proto-service-042", + }) + env_patcher.start() + self.addCleanup(env_patcher.stop) + self._create_copied_repo() central_butler = Butler(self.central_repo.name, instrument=instname, From 804ecf5dc44de9fae901144675c1a026dff6c8c5 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 25 Jan 2024 12:03:53 -0800 Subject: [PATCH 2/7] Set mock URL_APDB to sqlite, not postgres. Some execution paths construct a real ApdbSql object, and the object may attempt to contact the database it represents. Since neither our development nor our build environment has a local PostgreSQL server, use a temporary sqlite DB instead. --- tests/test_middleware_interface.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index c5af5908..251b4700 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -117,13 +117,6 @@ class MiddlewareInterfaceTest(unittest.TestCase): """Test the MiddlewareInterface class with faked data. 
""" def setUp(self): - env_patcher = unittest.mock.patch.dict(os.environ, - {"URL_APDB": "postgresql://localhost/postgres", - "K_REVISION": "prompt-proto-service-042", - }) - env_patcher.start() - self.addCleanup(env_patcher.stop) - self.data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data") self.central_repo = os.path.join(self.data_dir, "central_repo") self.umbrella = f"{instname}/defaults" @@ -134,6 +127,13 @@ def setUp(self): self.input_data = os.path.join(self.data_dir, "input_data") self.local_repo = make_local_repo(tempfile.gettempdir(), self.central_butler, instname) + env_patcher = unittest.mock.patch.dict(os.environ, + {"URL_APDB": f"sqlite:///{self.local_repo.name}/apdb.db", + "K_REVISION": "prompt-proto-service-042", + }) + env_patcher.start() + self.addCleanup(env_patcher.stop) + # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371 ra = 155.4702849608958 dec = -4.950050405424033 @@ -869,13 +869,6 @@ def _create_copied_repo(self): central_butler.import_(directory=data_repo, filename=export_file.name, transfer="auto") def setUp(self): - env_patcher = unittest.mock.patch.dict(os.environ, - {"URL_APDB": "postgresql://localhost/postgres", - "K_REVISION": "prompt-proto-service-042", - }) - env_patcher.start() - self.addCleanup(env_patcher.stop) - self._create_copied_repo() central_butler = Butler(self.central_repo.name, instrument=instname, @@ -892,6 +885,13 @@ def setUp(self): self.addCleanup(tempfile.TemporaryDirectory.cleanup, local_repo) self.addCleanup(tempfile.TemporaryDirectory.cleanup, second_local_repo) + env_patcher = unittest.mock.patch.dict(os.environ, + {"URL_APDB": f"sqlite:///{local_repo.name}/apdb.db", + "K_REVISION": "prompt-proto-service-042", + }) + env_patcher.start() + self.addCleanup(env_patcher.stop) + # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371 ra = 155.4702849608958 dec = -4.950050405424033 From 249038026b5947e199d5dcec2b8de1666909a7d6 Mon Sep 17 00:00:00 2001 From: 
Krzysztof Findeisen Date: Wed, 24 Jan 2024 15:48:41 -0800 Subject: [PATCH 3/7] Use astropy.time instead of datetime in calib lookups. astropy.time lets us handle TAI natively, and there's no risk of accidentally giving an unaware datetime. This will also give us more precision when Visit.startTime arrives. --- python/activator/middleware_interface.py | 18 +++++++----------- tests/test_middleware_interface.py | 11 ++++++----- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index e8c0a73d..66c72043 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -523,9 +523,7 @@ def _export_calibs(self, detector_id, filter): # supported in queryDatasets yet. calib_where = f"detector={detector_id} and physical_filter='{filter}'" # TAI observation start time should be used for calib validity range. - # private_sndStamp is the visit publication time in TAI, not UTC, - # but difference shouldn't matter. - calib_date = datetime.datetime.fromtimestamp(self.visit.private_sndStamp, tz=datetime.timezone.utc) + calib_date = astropy.time.Time(self.visit.private_sndStamp, format="unix_tai") # TODO: we can't use findFirst=True yet because findFirst query # in CALIBRATION-type collection is not supported currently. calibs = set(_filter_datasets( @@ -1062,7 +1060,7 @@ class _MissingDatasetError(RuntimeError): def _filter_datasets(src_repo: Butler, dest_repo: Butler, *args, - calib_date: datetime.datetime | None = None, + calib_date: astropy.time.Time | None = None, **kwargs) -> collections.abc.Iterable[lsst.daf.butler.DatasetRef]: """Identify datasets in a source repository, filtering out those already present in a destination. @@ -1076,10 +1074,9 @@ def _filter_datasets(src_repo: Butler, The repository in which a dataset must be present. dest_repo : `lsst.daf.butler.Butler` The repository in which a dataset must not be present. 
- calib_date : `datetime.datetime`, optional + calib_date : `astropy.time.Time`, optional If provided, also filter anything other than calibs valid at ``calib_date`` and check that at least one valid calib was found. - Any ``datetime`` object must be aware. *args, **kwargs Parameters for describing the dataset query. They have the same meanings as the parameters of `lsst.daf.butler.Registry.queryDatasets`. @@ -1176,7 +1173,7 @@ def _remove_from_chain(butler: Butler, chain: str, old_collections: collections. def _filter_calibs_by_date(butler: Butler, collections: typing.Any, unfiltered_calibs: collections.abc.Collection[lsst.daf.butler.DatasetRef], - date: datetime.datetime + date: astropy.time.Time ) -> collections.abc.Iterable[lsst.daf.butler.DatasetRef]: """Trim a set of calib datasets to those that are valid at a particular time. @@ -1189,9 +1186,8 @@ def _filter_calibs_by_date(butler: Butler, collections, to query for validity data. unfiltered_calibs : collection [`lsst.daf.butler.DatasetRef`] The calibs to be filtered by validity. May be empty. - date : `datetime.datetime` - The time at which the calibs must be valid. Must be an - aware ``datetime``. + date : `astropy.time.Time` + The time at which the calibs must be valid. Returns ------- @@ -1203,7 +1199,7 @@ def _filter_calibs_by_date(butler: Butler, # Unfiltered_calibs can have up to one copy of each calib per certify cycle. # Minimize redundant queries to find_dataset. 
unique_ids = {(ref.datasetType, ref.dataId) for ref in unfiltered_calibs} - t = Timespan.fromInstant(astropy.time.Time(date, scale='utc')) + t = Timespan.fromInstant(date) _log_trace.debug("Looking up calibs for %s in %s.", t, collections) filtered_calibs = [] for dataset_type, data_id in unique_ids: diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 251b4700..88ad5f27 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -29,6 +29,7 @@ import warnings import astropy.coordinates +import astropy.time import astropy.units as u import psycopg2 @@ -791,7 +792,7 @@ def test_filter_calibs_by_date_early(self): all_calibs = list(self.central_butler.registry.queryDatasets("cpBias")) early_calibs = list(_filter_calibs_by_date( self.central_butler, "DECam/calib", all_calibs, - datetime.datetime(2015, 2, 26, tzinfo=datetime.timezone.utc) + astropy.time.Time("2015-02-26 00:00:00", scale="utc") )) self.assertEqual(len(early_calibs), 4) for calib in early_calibs: @@ -802,7 +803,7 @@ def test_filter_calibs_by_date_late(self): all_calibs = list(self.central_butler.registry.queryDatasets("cpFlat")) late_calibs = list(_filter_calibs_by_date( self.central_butler, "DECam/calib", all_calibs, - datetime.datetime(2015, 3, 16, tzinfo=datetime.timezone.utc) + astropy.time.Time("2015-03-16 00:00:00", scale="utc") )) self.assertEqual(len(late_calibs), 4) for calib in late_calibs: @@ -816,7 +817,7 @@ def test_filter_calibs_by_date_never(self): warnings.simplefilter("ignore", category=astropy.utils.exceptions.ErfaWarning) future_calibs = list(_filter_calibs_by_date( self.central_butler, "DECam/calib", all_calibs, - datetime.datetime(2050, 1, 1, tzinfo=datetime.timezone.utc) + astropy.time.Time("2050-01-01 00:00:00", scale="utc") )) self.assertEqual(len(future_calibs), 0) @@ -825,14 +826,14 @@ def test_filter_calibs_by_date_unbounded(self): all_calibs = set(self.central_butler.registry.queryDatasets(["camera", 
"crosstalk"])) valid_calibs = set(_filter_calibs_by_date( self.central_butler, "DECam/calib", all_calibs, - datetime.datetime(2015, 3, 15, tzinfo=datetime.timezone.utc) + astropy.time.Time("2015-03-15 00:00:00", scale="utc") )) self.assertEqual(valid_calibs, all_calibs) def test_filter_calibs_by_date_empty(self): valid_calibs = set(_filter_calibs_by_date( self.central_butler, "DECam/calib", [], - datetime.datetime(2015, 3, 15, tzinfo=datetime.timezone.utc) + astropy.time.Time("2015-03-15 00:00:00", scale="utc") )) self.assertEqual(len(valid_calibs), 0) From f25ea1c1c75125ffb77d76fab013943bae27bb1e Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 24 Jan 2024 16:19:12 -0800 Subject: [PATCH 4/7] Disallow default day_obs in _get_output_run. The default is not used by any production code, and both raises the risk of inconsistent collection handling and confuses the program's decision flow. --- python/activator/middleware_interface.py | 18 ++++++------------ tests/test_middleware_interface.py | 22 +--------------------- 2 files changed, 7 insertions(+), 33 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 66c72043..8b2a6cfa 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -625,7 +625,7 @@ def get_key(ref): def _get_init_output_run(self, pipeline_file: str, - date: datetime.date | None = None) -> str: + date: datetime.date) -> str: """Generate a deterministic init-output collection name that avoids configuration conflicts. @@ -633,24 +633,21 @@ def _get_init_output_run(self, ---------- pipeline_file : `str` The pipeline file that the run will be used for. - date : `datetime.date`, optional - Date of the processing run (not observation!), defaults to the - day_obs this method was called. + date : `datetime.date` + Date of the processing run (not observation!). Returns ------- run : `str` The run in which to place pipeline init-outputs. 
""" - if date is None: - date = datetime.datetime.now(_DAY_OBS_TZ) # Current executor requires that init-outputs be in the same run as # outputs. This can be changed once DM-36162 is done. return self._get_output_run(pipeline_file, date) def _get_output_run(self, pipeline_file: str, - date: datetime.date | None = None) -> str: + date: datetime.date) -> str: """Generate a deterministic collection name that avoids version or provenance conflicts. @@ -658,17 +655,14 @@ def _get_output_run(self, ---------- pipeline_file : `str` The pipeline file that the run will be used for. - date : `datetime.date`, optional - Date of the processing run (not observation!), defaults to the - day_obs this method was called. + date : `datetime.date` + Date of the processing run (not observation!). Returns ------- run : `str` The run in which to place processing outputs. """ - if date is None: - date = datetime.datetime.now(_DAY_OBS_TZ) pipeline_name, _ = os.path.splitext(os.path.basename(pipeline_file)) # Order optimized for S3 bucket -- filter out as many files as soon as possible. return self.instrument.makeCollectionName( diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 88ad5f27..fed96354 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -610,26 +610,6 @@ def test_get_output_run(self): "/ApPipe/prompt-proto-service-042" ) - def test_get_output_run_default(self): - # Workaround for mocking builtin class; see - # https://williambert.online/2011/07/how-to-unit-testing-in-django-with-mocking-and-patching/ - class MockDatetime(datetime.datetime): - @classmethod - def now(cls, tz=None): - # This time will be the same day in CLT/CLST, but the previous day in day_obs. 
- utc = datetime.datetime(2023, 3, 15, 5, 42, 3, tzinfo=datetime.timezone.utc) - if tz: - return utc.astimezone(tz) - else: - return utc.replace(tzinfo=None) - - filename = "ApPipe.yaml" - with unittest.mock.patch("datetime.datetime", MockDatetime): - out_run = self.interface._get_output_run(filename) - self.assertIn("output-2023-03-14", out_run) - init_run = self.interface._get_init_output_run(filename) - self.assertIn("output-2023-03-14", init_run) - def _assert_in_collection(self, butler, collection, dataset_type, data_id): # Pass iff any dataset matches the query, no need to check them all. for dataset in butler.registry.queryDatasets(dataset_type, collections=collection, dataId=data_id): @@ -662,7 +642,7 @@ def test_clean_local_repo(self): cat = lsst.afw.table.SourceCatalog() raw_collection = self.interface.instrument.makeDefaultRawIngestRunName() butler.registry.registerCollection(raw_collection, CollectionType.RUN) - out_collection = self.interface._get_output_run("ApPipe.yaml") + out_collection = self.interface._get_output_run("ApPipe.yaml", self.interface._day_obs) butler.registry.registerCollection(out_collection, CollectionType.RUN) chain = "generic-chain" butler.registry.registerCollection(chain, CollectionType.CHAINED) From 4f840cbfcc44d56ac22bef3414c2003049986b40 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 25 Jan 2024 12:23:01 -0800 Subject: [PATCH 5/7] Centralize day_obs calculation in MWI constructor. This prevents the output run code from depending on which time library we use to calculate day_obs; it only needs to know its value. 
--- python/activator/middleware_interface.py | 13 ++++++------- tests/test_middleware_interface.py | 16 +++++----------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 8b2a6cfa..e5bc5af2 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -221,8 +221,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, visit: FannedOutVi self.instrument = lsst.obs.base.Instrument.from_string(visit.instrument, central_butler.registry) self.pipelines = pipelines - # Guard against a processing run starting on one day and ending the next. - self._day_obs = datetime.datetime.now(_DAY_OBS_TZ) + self._day_obs = datetime.datetime.now(_DAY_OBS_TZ).strftime("%Y-%m-%d") self._init_local_butler(local_repo, [self.instrument.makeUmbrellaCollectionName()], None) self._prep_collections() @@ -625,7 +624,7 @@ def get_key(ref): def _get_init_output_run(self, pipeline_file: str, - date: datetime.date) -> str: + date: str) -> str: """Generate a deterministic init-output collection name that avoids configuration conflicts. @@ -633,7 +632,7 @@ def _get_init_output_run(self, ---------- pipeline_file : `str` The pipeline file that the run will be used for. - date : `datetime.date` + date : `str` Date of the processing run (not observation!). Returns @@ -647,7 +646,7 @@ def _get_init_output_run(self, def _get_output_run(self, pipeline_file: str, - date: datetime.date) -> str: + date: str) -> str: """Generate a deterministic collection name that avoids version or provenance conflicts. @@ -655,7 +654,7 @@ def _get_output_run(self, ---------- pipeline_file : `str` The pipeline file that the run will be used for. - date : `datetime.date` + date : `str` Date of the processing run (not observation!). 
Returns @@ -666,7 +665,7 @@ def _get_output_run(self, pipeline_name, _ = os.path.splitext(os.path.basename(pipeline_file)) # Order optimized for S3 bucket -- filter out as many files as soon as possible. return self.instrument.makeCollectionName( - "prompt", f"output-{date:%Y-%m-%d}", pipeline_name, self._deployment) + "prompt", f"output-{date}", pipeline_name, self._deployment) def _prep_collections(self): """Pre-register output collections in advance of running the pipeline. diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index fed96354..a5c50df9 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -598,17 +598,11 @@ def test_run_pipeline_cascading_exception(self): def test_get_output_run(self): filename = "ApPipe.yaml" - for date in [datetime.date.today(), datetime.datetime.today()]: - out_run = self.interface._get_output_run(filename, date) - self.assertEqual(out_run, - f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" - "/ApPipe/prompt-proto-service-042" - ) - init_run = self.interface._get_init_output_run(filename, date) - self.assertEqual(init_run, - f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" - "/ApPipe/prompt-proto-service-042" - ) + date = "2023-01-22" + out_run = self.interface._get_output_run(filename, date) + self.assertEqual(out_run, f"{instname}/prompt/output-2023-01-22/ApPipe/prompt-proto-service-042") + init_run = self.interface._get_init_output_run(filename, date) + self.assertEqual(init_run, f"{instname}/prompt/output-2023-01-22/ApPipe/prompt-proto-service-042") def _assert_in_collection(self, butler, collection, dataset_type, data_id): # Pass iff any dataset matches the query, no need to check them all. 
From 76ae49c20c4e3962883ca56eb1a75562c64772fb Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 26 Jan 2024 11:58:46 -0800 Subject: [PATCH 6/7] Use astropy.time instead of datetime in day_obs calculation. For consistency with the observing day_obs, the value is defined based on the TAI-12 date of processing start, not the UTC-12 date. --- python/activator/middleware_interface.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index e5bc5af2..99472cbe 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -22,7 +22,6 @@ __all__ = ["get_central_butler", "make_local_repo", "MiddlewareInterface"] import collections.abc -import datetime import hashlib import itertools import logging @@ -128,8 +127,8 @@ def make_local_repo(local_storage: str, central_butler: Butler, instrument: str) return repo_dir -# Time zone used to define exposures' day_obs value. -_DAY_OBS_TZ = datetime.timezone(datetime.timedelta(hours=-12), name="day_obs") +# Offset used to define exposures' day_obs value. +_DAY_OBS_DELTA = astropy.time.TimeDelta(-12.0 * astropy.units.hour, scale="tai") class MiddlewareInterface: @@ -221,7 +220,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, visit: FannedOutVi self.instrument = lsst.obs.base.Instrument.from_string(visit.instrument, central_butler.registry) self.pipelines = pipelines - self._day_obs = datetime.datetime.now(_DAY_OBS_TZ).strftime("%Y-%m-%d") + self._day_obs = (astropy.time.Time.now() + _DAY_OBS_DELTA).tai.to_value("iso", "date") self._init_local_butler(local_repo, [self.instrument.makeUmbrellaCollectionName()], None) self._prep_collections() From 584c3814c9d136f41bbf6a4b0d120f46915cf0f3 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 26 Jan 2024 14:24:11 -0800 Subject: [PATCH 7/7] Use astropy.time instead of datetime in upload.py LATISS. 
The private_sndStamp field is populated by converting an ISO 8601 string to a Unix epoch. To avoid any complications in the future, we should do the conversion in a TAI-aware way. --- python/tester/upload.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/tester/upload.py b/python/tester/upload.py index 1be5f2fc..9f3dabfb 100644 --- a/python/tester/upload.py +++ b/python/tester/upload.py @@ -20,7 +20,6 @@ # along with this program. If not, see <https://www.gnu.org/licenses/>. import dataclasses -import datetime import itertools import json import logging @@ -30,6 +29,7 @@ import tempfile import time +import astropy.time import boto3 from botocore.handlers import validate_bucket_name @@ -290,7 +290,7 @@ def get_samples_lsst(bucket, instrument): dome=FannedOutVisit.Dome.OPEN, duration=float(EXPOSURE_INTERVAL+SLEW_INTERVAL), totalCheckpoints=1, - private_sndStamp=datetime.datetime.fromisoformat(md["DATE-BEG"]).timestamp(), + private_sndStamp=astropy.time.Time(md["DATE-BEG"], format="isot", scale="tai").unix_tai, ) _log.debug(f"File {blob.key} parsed as visit {visit} and registered as group {md['GROUPID']}.") result[md["GROUPID"]] = {0: {visit: blob}}