diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index e8c0a73d..99472cbe 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -22,7 +22,6 @@ __all__ = ["get_central_butler", "make_local_repo", "MiddlewareInterface"]
 
 import collections.abc
-import datetime
 import hashlib
 import itertools
 import logging
 
@@ -128,8 +127,8 @@ def make_local_repo(local_storage: str, central_butler: Butler, instrument: str)
     return repo_dir
 
 
-# Time zone used to define exposures' day_obs value.
-_DAY_OBS_TZ = datetime.timezone(datetime.timedelta(hours=-12), name="day_obs")
+# Offset used to define exposures' day_obs value.
+_DAY_OBS_DELTA = astropy.time.TimeDelta(-12.0 * astropy.units.hour, scale="tai")
 
 
 class MiddlewareInterface:
@@ -221,8 +220,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, visit: FannedOutVi
 
         self.instrument = lsst.obs.base.Instrument.from_string(visit.instrument, central_butler.registry)
         self.pipelines = pipelines
-        # Guard against a processing run starting on one day and ending the next.
-        self._day_obs = datetime.datetime.now(_DAY_OBS_TZ)
+        self._day_obs = (astropy.time.Time.now() + _DAY_OBS_DELTA).tai.to_value("iso", "date")
 
         self._init_local_butler(local_repo, [self.instrument.makeUmbrellaCollectionName()], None)
         self._prep_collections()
@@ -523,9 +521,7 @@ def _export_calibs(self, detector_id, filter):
         # supported in queryDatasets yet.
         calib_where = f"detector={detector_id} and physical_filter='{filter}'"
         # TAI observation start time should be used for calib validity range.
-        # private_sndStamp is the visit publication time in TAI, not UTC,
-        # but difference shouldn't matter.
-        calib_date = datetime.datetime.fromtimestamp(self.visit.private_sndStamp, tz=datetime.timezone.utc)
+        calib_date = astropy.time.Time(self.visit.private_sndStamp, format="unix_tai")
         # TODO: we can't use findFirst=True yet because findFirst query
         # in CALIBRATION-type collection is not supported currently.
         calibs = set(_filter_datasets(
@@ -627,7 +623,7 @@ def get_key(ref):
 
     def _get_init_output_run(self,
                              pipeline_file: str,
-                             date: datetime.date | None = None) -> str:
+                             date: str) -> str:
         """Generate a deterministic init-output collection name that avoids
         configuration conflicts.
 
@@ -635,24 +631,21 @@ def _get_init_output_run(self,
         ----------
         pipeline_file : `str`
             The pipeline file that the run will be used for.
-        date : `datetime.date`, optional
-            Date of the processing run (not observation!), defaults to the
-            day_obs this method was called.
+        date : `str`
+            Date of the processing run (not observation!).
 
         Returns
         -------
         run : `str`
             The run in which to place pipeline init-outputs.
         """
-        if date is None:
-            date = datetime.datetime.now(_DAY_OBS_TZ)
         # Current executor requires that init-outputs be in the same run as
         # outputs. This can be changed once DM-36162 is done.
         return self._get_output_run(pipeline_file, date)
 
     def _get_output_run(self,
                         pipeline_file: str,
-                        date: datetime.date | None = None) -> str:
+                        date: str) -> str:
         """Generate a deterministic collection name that avoids version or
         provenance conflicts.
 
@@ -660,21 +653,18 @@ def _get_output_run(self,
         ----------
         pipeline_file : `str`
             The pipeline file that the run will be used for.
-        date : `datetime.date`, optional
-            Date of the processing run (not observation!), defaults to the
-            day_obs this method was called.
+        date : `str`
+            Date of the processing run (not observation!).
 
         Returns
         -------
         run : `str`
             The run in which to place processing outputs.
         """
-        if date is None:
-            date = datetime.datetime.now(_DAY_OBS_TZ)
         pipeline_name, _ = os.path.splitext(os.path.basename(pipeline_file))
         # Order optimized for S3 bucket -- filter out as many files as soon as possible.
         return self.instrument.makeCollectionName(
-            "prompt", f"output-{date:%Y-%m-%d}", pipeline_name, self._deployment)
+            "prompt", f"output-{date}", pipeline_name, self._deployment)
 
     def _prep_collections(self):
         """Pre-register output collections in advance of running the pipeline.
@@ -1062,7 +1052,7 @@ class _MissingDatasetError(RuntimeError):
 def _filter_datasets(src_repo: Butler,
                      dest_repo: Butler,
                      *args,
-                     calib_date: datetime.datetime | None = None,
+                     calib_date: astropy.time.Time | None = None,
                      **kwargs) -> collections.abc.Iterable[lsst.daf.butler.DatasetRef]:
     """Identify datasets in a source repository, filtering out those already
     present in a destination.
@@ -1076,10 +1066,9 @@ def _filter_datasets(src_repo: Butler,
         The repository in which a dataset must be present.
     dest_repo : `lsst.daf.butler.Butler`
         The repository in which a dataset must not be present.
-    calib_date : `datetime.datetime`, optional
+    calib_date : `astropy.time.Time`, optional
         If provided, also filter anything other than calibs valid at
         ``calib_date`` and check that at least one valid calib was found.
-        Any ``datetime`` object must be aware.
     *args, **kwargs
         Parameters for describing the dataset query. They have the same
         meanings as the parameters of `lsst.daf.butler.Registry.queryDatasets`.
@@ -1176,7 +1165,7 @@ def _remove_from_chain(butler: Butler, chain: str, old_collections: collections.
 def _filter_calibs_by_date(butler: Butler,
                            collections: typing.Any,
                            unfiltered_calibs: collections.abc.Collection[lsst.daf.butler.DatasetRef],
-                           date: datetime.datetime
+                           date: astropy.time.Time
                            ) -> collections.abc.Iterable[lsst.daf.butler.DatasetRef]:
     """Trim a set of calib datasets to those that are valid at a
     particular time.
@@ -1189,9 +1178,8 @@ def _filter_calibs_by_date(butler,
         collections, to query for validity data.
     unfiltered_calibs : collection [`lsst.daf.butler.DatasetRef`]
         The calibs to be filtered by validity. May be empty.
-    date : `datetime.datetime`
-        The time at which the calibs must be valid. Must be an
-        aware ``datetime``.
+    date : `astropy.time.Time`
+        The time at which the calibs must be valid.
 
     Returns
     -------
@@ -1203,7 +1191,7 @@ def _filter_calibs_by_date(butler,
     # Unfiltered_calibs can have up to one copy of each calib per certify cycle.
     # Minimize redundant queries to find_dataset.
     unique_ids = {(ref.datasetType, ref.dataId) for ref in unfiltered_calibs}
-    t = Timespan.fromInstant(astropy.time.Time(date, scale='utc'))
+    t = Timespan.fromInstant(date)
     _log_trace.debug("Looking up calibs for %s in %s.", t, collections)
     filtered_calibs = []
     for dataset_type, data_id in unique_ids:
diff --git a/python/tester/upload.py b/python/tester/upload.py
index 1be5f2fc..9f3dabfb 100644
--- a/python/tester/upload.py
+++ b/python/tester/upload.py
@@ -20,7 +20,6 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 import dataclasses
-import datetime
 import itertools
 import json
 import logging
@@ -30,6 +29,7 @@
 import tempfile
 import time
 
+import astropy.time
 import boto3
 from botocore.handlers import validate_bucket_name
 
@@ -290,7 +290,7 @@ def get_samples_lsst(bucket, instrument):
             dome=FannedOutVisit.Dome.OPEN,
             duration=float(EXPOSURE_INTERVAL+SLEW_INTERVAL),
             totalCheckpoints=1,
-            private_sndStamp=datetime.datetime.fromisoformat(md["DATE-BEG"]).timestamp(),
+            private_sndStamp=astropy.time.Time(md["DATE-BEG"], format="isot", scale="tai").unix_tai,
         )
         _log.debug(f"File {blob.key} parsed as visit {visit} and registered as group {md['GROUPID']}.")
         result[md["GROUPID"]] = {0: {visit: blob}}
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index 08f6ccc0..a5c50df9 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -29,6 +29,7 @@
 import warnings
 
 import astropy.coordinates
+import astropy.time
 import astropy.units as u
 import psycopg2
 
@@ -116,22 +117,6 @@ def fake_file_data(filename, dimensions, instrument, visit):
 class MiddlewareInterfaceTest(unittest.TestCase):
     """Test the MiddlewareInterface class with faked data.
     """
-    @classmethod
-    def setUpClass(cls):
-        cls.env_patcher = unittest.mock.patch.dict(os.environ,
-                                                   {"URL_APDB": "postgresql://localhost/postgres",
-                                                    "K_REVISION": "prompt-proto-service-042",
-                                                    })
-        cls.env_patcher.start()
-
-        super().setUpClass()
-
-    @classmethod
-    def tearDownClass(cls):
-        super().tearDownClass()
-
-        cls.env_patcher.stop()
-
     def setUp(self):
         self.data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
         self.central_repo = os.path.join(self.data_dir, "central_repo")
@@ -143,6 +128,13 @@ def setUp(self):
         self.input_data = os.path.join(self.data_dir, "input_data")
         self.local_repo = make_local_repo(tempfile.gettempdir(), self.central_butler, instname)
 
+        env_patcher = unittest.mock.patch.dict(os.environ,
+                                               {"URL_APDB": f"sqlite:///{self.local_repo.name}/apdb.db",
+                                                "K_REVISION": "prompt-proto-service-042",
+                                                })
+        env_patcher.start()
+        self.addCleanup(env_patcher.stop)
+
         # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
         ra = 155.4702849608958
         dec = -4.950050405424033
@@ -606,37 +598,11 @@ def test_run_pipeline_cascading_exception(self):
 
     def test_get_output_run(self):
         filename = "ApPipe.yaml"
-        for date in [datetime.date.today(), datetime.datetime.today()]:
-            out_run = self.interface._get_output_run(filename, date)
-            self.assertEqual(out_run,
-                             f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}"
-                             "/ApPipe/prompt-proto-service-042"
-                             )
-            init_run = self.interface._get_init_output_run(filename, date)
-            self.assertEqual(init_run,
-                             f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}"
-                             "/ApPipe/prompt-proto-service-042"
-                             )
-
-    def test_get_output_run_default(self):
-        # Workaround for mocking builtin class; see
-        # https://williambert.online/2011/07/how-to-unit-testing-in-django-with-mocking-and-patching/
-        class MockDatetime(datetime.datetime):
-            @classmethod
-            def now(cls, tz=None):
-                # This time will be the same day in CLT/CLST, but the previous day in day_obs.
-                utc = datetime.datetime(2023, 3, 15, 5, 42, 3, tzinfo=datetime.timezone.utc)
-                if tz:
-                    return utc.astimezone(tz)
-                else:
-                    return utc.replace(tzinfo=None)
-
-        filename = "ApPipe.yaml"
-        with unittest.mock.patch("datetime.datetime", MockDatetime):
-            out_run = self.interface._get_output_run(filename)
-            self.assertIn("output-2023-03-14", out_run)
-            init_run = self.interface._get_init_output_run(filename)
-            self.assertIn("output-2023-03-14", init_run)
+        date = "2023-01-22"
+        out_run = self.interface._get_output_run(filename, date)
+        self.assertEqual(out_run, f"{instname}/prompt/output-2023-01-22/ApPipe/prompt-proto-service-042")
+        init_run = self.interface._get_init_output_run(filename, date)
+        self.assertEqual(init_run, f"{instname}/prompt/output-2023-01-22/ApPipe/prompt-proto-service-042")
 
     def _assert_in_collection(self, butler, collection, dataset_type, data_id):
         # Pass iff any dataset matches the query, no need to check them all.
@@ -670,7 +636,7 @@ def test_clean_local_repo(self):
         cat = lsst.afw.table.SourceCatalog()
         raw_collection = self.interface.instrument.makeDefaultRawIngestRunName()
         butler.registry.registerCollection(raw_collection, CollectionType.RUN)
-        out_collection = self.interface._get_output_run("ApPipe.yaml")
+        out_collection = self.interface._get_output_run("ApPipe.yaml", self.interface._day_obs)
         butler.registry.registerCollection(out_collection, CollectionType.RUN)
         chain = "generic-chain"
         butler.registry.registerCollection(chain, CollectionType.CHAINED)
@@ -800,7 +766,7 @@ def test_filter_calibs_by_date_early(self):
         all_calibs = list(self.central_butler.registry.queryDatasets("cpBias"))
         early_calibs = list(_filter_calibs_by_date(
             self.central_butler, "DECam/calib", all_calibs,
-            datetime.datetime(2015, 2, 26, tzinfo=datetime.timezone.utc)
+            astropy.time.Time("2015-02-26 00:00:00", scale="utc")
         ))
         self.assertEqual(len(early_calibs), 4)
         for calib in early_calibs:
@@ -811,7 +777,7 @@ def test_filter_calibs_by_date_late(self):
         all_calibs = list(self.central_butler.registry.queryDatasets("cpFlat"))
         late_calibs = list(_filter_calibs_by_date(
             self.central_butler, "DECam/calib", all_calibs,
-            datetime.datetime(2015, 3, 16, tzinfo=datetime.timezone.utc)
+            astropy.time.Time("2015-03-16 00:00:00", scale="utc")
         ))
         self.assertEqual(len(late_calibs), 4)
         for calib in late_calibs:
@@ -825,7 +791,7 @@ def test_filter_calibs_by_date_never(self):
             warnings.simplefilter("ignore", category=astropy.utils.exceptions.ErfaWarning)
             future_calibs = list(_filter_calibs_by_date(
                 self.central_butler, "DECam/calib", all_calibs,
-                datetime.datetime(2050, 1, 1, tzinfo=datetime.timezone.utc)
+                astropy.time.Time("2050-01-01 00:00:00", scale="utc")
             ))
         self.assertEqual(len(future_calibs), 0)
 
@@ -834,14 +800,14 @@ def test_filter_calibs_by_date_unbounded(self):
         all_calibs = set(self.central_butler.registry.queryDatasets(["camera", "crosstalk"]))
         valid_calibs = set(_filter_calibs_by_date(
             self.central_butler, "DECam/calib", all_calibs,
-            datetime.datetime(2015, 3, 15, tzinfo=datetime.timezone.utc)
+            astropy.time.Time("2015-03-15 00:00:00", scale="utc")
         ))
         self.assertEqual(valid_calibs, all_calibs)
 
     def test_filter_calibs_by_date_empty(self):
         valid_calibs = set(_filter_calibs_by_date(
             self.central_butler, "DECam/calib", [],
-            datetime.datetime(2015, 3, 15, tzinfo=datetime.timezone.utc)
+            astropy.time.Time("2015-03-15 00:00:00", scale="utc")
         ))
         self.assertEqual(len(valid_calibs), 0)
 
@@ -853,22 +819,6 @@ class MiddlewareInterfaceWriteableTest(unittest.TestCase):
     setup takes longer than for MiddlewareInterfaceTest,
     so it should be used sparingly.
     """
-    @classmethod
-    def setUpClass(cls):
-        super().setUpClass()
-
-        cls.env_patcher = unittest.mock.patch.dict(os.environ,
-                                                   {"URL_APDB": "postgresql://localhost/postgres",
-                                                    "K_REVISION": "prompt-proto-service-042",
-                                                    })
-        cls.env_patcher.start()
-
-    @classmethod
-    def tearDownClass(cls):
-        super().tearDownClass()
-
-        cls.env_patcher.stop()
-
     def _create_copied_repo(self):
         """Create a fresh repository that's a copy of the test data.
 
@@ -910,6 +860,13 @@ def setUp(self):
         self.addCleanup(tempfile.TemporaryDirectory.cleanup, local_repo)
         self.addCleanup(tempfile.TemporaryDirectory.cleanup, second_local_repo)
 
+        env_patcher = unittest.mock.patch.dict(os.environ,
+                                               {"URL_APDB": f"sqlite:///{local_repo.name}/apdb.db",
+                                                "K_REVISION": "prompt-proto-service-042",
+                                                })
+        env_patcher.start()
+        self.addCleanup(env_patcher.stop)
+
         # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
         ra = 155.4702849608958
         dec = -4.950050405424033
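
Reviewer note, not part of the patch: a minimal runnable sketch of the two astropy.time conversions this change leans on. The sample timestamp is the one the removed MockDatetime test used; the printed values in the comments assume astropy >= 4.1 (for unix_tai support).

    import astropy.time
    import astropy.units

    # day_obs convention: shift a timestamp back 12 hours so that every
    # exposure from a single observing night shares one date string.
    _DAY_OBS_DELTA = astropy.time.TimeDelta(-12.0 * astropy.units.hour, scale="tai")
    t = astropy.time.Time("2023-03-15 05:42:03", scale="utc")
    print((t + _DAY_OBS_DELTA).tai.to_value("iso", "date"))  # 2023-03-14

    # private_sndStamp handling: a POSIX-style float interpreted on the TAI
    # scale round-trips through unix_tai without the ~37 s UTC/TAI offset
    # leaking into the value, unlike the old fromtimestamp(..., tz=utc) path.
    stamp = t.unix_tai
    print(astropy.time.Time(stamp, format="unix_tai").utc.isot)  # 2023-03-15T05:42:03.000

The first conversion is also why _get_output_run can now take day_obs as a plain string: to_value("iso", "date") already yields the zero-padded YYYY-MM-DD form that the output collection name embeds.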