Merge branch 'tickets/DM-42227'
kfindeisen committed Jan 26, 2024
2 parents 217c987 + 584c381 commit 8ab3583
Showing 3 changed files with 45 additions and 100 deletions.
46 changes: 17 additions & 29 deletions python/activator/middleware_interface.py
@@ -22,7 +22,6 @@
__all__ = ["get_central_butler", "make_local_repo", "MiddlewareInterface"]

import collections.abc
import datetime
import hashlib
import itertools
import logging
@@ -128,8 +127,8 @@ def make_local_repo(local_storage: str, central_butler: Butler, instrument: str)
return repo_dir


# Time zone used to define exposures' day_obs value.
_DAY_OBS_TZ = datetime.timezone(datetime.timedelta(hours=-12), name="day_obs")
# Offset used to define exposures' day_obs value.
_DAY_OBS_DELTA = astropy.time.TimeDelta(-12.0 * astropy.units.hour, scale="tai")


class MiddlewareInterface:
@@ -221,8 +220,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, visit: FannedOutVi
self.instrument = lsst.obs.base.Instrument.from_string(visit.instrument, central_butler.registry)
self.pipelines = pipelines

# Guard against a processing run starting on one day and ending the next.
self._day_obs = datetime.datetime.now(_DAY_OBS_TZ)
self._day_obs = (astropy.time.Time.now() + _DAY_OBS_DELTA).tai.to_value("iso", "date")

self._init_local_butler(local_repo, [self.instrument.makeUmbrellaCollectionName()], None)
self._prep_collections()
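
A minimal sketch of the boundary behavior of the new offset-based day_obs, with an illustrative timestamp (the -12 hour TAI offset rolls anything before local noon back to the previous date):

import astropy.time
import astropy.units

_DAY_OBS_DELTA = astropy.time.TimeDelta(-12.0 * astropy.units.hour, scale="tai")

# 03:00 UTC on Jan 26 is still day_obs 2024-01-25: subtracting 12 hours
# lands at ~15:00 TAI on the previous day, and only the date is kept.
now = astropy.time.Time("2024-01-26T03:00:00", scale="utc")
day_obs = (now + _DAY_OBS_DELTA).tai.to_value("iso", "date")
assert day_obs == "2024-01-25"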
@@ -523,9 +521,7 @@ def _export_calibs(self, detector_id, filter):
# supported in queryDatasets yet.
calib_where = f"detector={detector_id} and physical_filter='{filter}'"
# TAI observation start time should be used for calib validity range.
# private_sndStamp is the visit publication time in TAI, not UTC,
# but difference shouldn't matter.
calib_date = datetime.datetime.fromtimestamp(self.visit.private_sndStamp, tz=datetime.timezone.utc)
calib_date = astropy.time.Time(self.visit.private_sndStamp, format="unix_tai")
# TODO: we can't use findFirst=True yet because findFirst query
# in CALIBRATION-type collection is not supported currently.
calibs = set(_filter_datasets(
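
A small sketch of the new calib_date construction, assuming a made-up private_sndStamp value (the field carries Unix-epoch seconds on the TAI scale, as set in tester/upload.py below):

import astropy.time

snd_stamp = 1706238000.0  # hypothetical private_sndStamp value
calib_date = astropy.time.Time(snd_stamp, format="unix_tai")
# The instant is already TAI; no UTC conversion or leap-second handling needed.
assert calib_date.scale == "tai"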
@@ -627,54 +623,48 @@ def get_key(ref):

def _get_init_output_run(self,
pipeline_file: str,
date: datetime.date | None = None) -> str:
date: str) -> str:
"""Generate a deterministic init-output collection name that avoids
configuration conflicts.
Parameters
----------
pipeline_file : `str`
The pipeline file that the run will be used for.
date : `datetime.date`, optional
Date of the processing run (not observation!), defaults to the
day_obs this method was called.
date : `str`
Date of the processing run (not observation!).
Returns
-------
run : `str`
The run in which to place pipeline init-outputs.
"""
if date is None:
date = datetime.datetime.now(_DAY_OBS_TZ)
# Current executor requires that init-outputs be in the same run as
# outputs. This can be changed once DM-36162 is done.
return self._get_output_run(pipeline_file, date)

def _get_output_run(self,
pipeline_file: str,
date: datetime.date | None = None) -> str:
date: str) -> str:
"""Generate a deterministic collection name that avoids version or
provenance conflicts.
Parameters
----------
pipeline_file : `str`
The pipeline file that the run will be used for.
date : `datetime.date`, optional
Date of the processing run (not observation!), defaults to the
day_obs this method was called.
date : `str`
Date of the processing run (not observation!).
Returns
-------
run : `str`
The run in which to place processing outputs.
"""
if date is None:
date = datetime.datetime.now(_DAY_OBS_TZ)
pipeline_name, _ = os.path.splitext(os.path.basename(pipeline_file))
# Order optimized for S3 bucket -- filter out as many files as possible, as early as possible.
return self.instrument.makeCollectionName(
"prompt", f"output-{date:%Y-%m-%d}", pipeline_name, self._deployment)
"prompt", f"output-{date}", pipeline_name, self._deployment)

def _prep_collections(self):
"""Pre-register output collections in advance of running the pipeline.
@@ -1062,7 +1052,7 @@ class _MissingDatasetError(RuntimeError):
def _filter_datasets(src_repo: Butler,
dest_repo: Butler,
*args,
calib_date: datetime.datetime | None = None,
calib_date: astropy.time.Time | None = None,
**kwargs) -> collections.abc.Iterable[lsst.daf.butler.DatasetRef]:
"""Identify datasets in a source repository, filtering out those already
present in a destination.
@@ -1076,10 +1066,9 @@ def _filter_datasets(src_repo: Butler,
The repository in which a dataset must be present.
dest_repo : `lsst.daf.butler.Butler`
The repository in which a dataset must not be present.
calib_date : `datetime.datetime`, optional
calib_date : `astropy.time.Time`, optional
If provided, also filter anything other than calibs valid at
``calib_date`` and check that at least one valid calib was found.
Any ``datetime`` object must be aware.
*args, **kwargs
Parameters for describing the dataset query. They have the same
meanings as the parameters of `lsst.daf.butler.Registry.queryDatasets`.
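
A hypothetical call site in the spirit of _export_calibs above; the repository paths, dataset type, and data ID values are all placeholders:

import astropy.time
from lsst.daf.butler import Butler
from activator.middleware_interface import _filter_datasets

src = Butler("/path/to/central_repo")    # hypothetical repositories
dest = Butler("/path/to/local_repo")
calibs = set(_filter_datasets(
    src, dest,
    "cpBias",                            # forwarded to Registry.queryDatasets
    collections="DECam/calib",
    where="detector=25 and physical_filter='r'",  # placeholder data ID values
    calib_date=astropy.time.Time(1706238000.0, format="unix_tai"),
))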
@@ -1176,7 +1165,7 @@ def _remove_from_chain(butler: Butler, chain: str, old_collections: collections.
def _filter_calibs_by_date(butler: Butler,
collections: typing.Any,
unfiltered_calibs: collections.abc.Collection[lsst.daf.butler.DatasetRef],
date: datetime.datetime
date: astropy.time.Time
) -> collections.abc.Iterable[lsst.daf.butler.DatasetRef]:
"""Trim a set of calib datasets to those that are valid at a particular time.
@@ -1189,9 +1178,8 @@ def _filter_calibs_by_date(butler: Butler,
collections, to query for validity data.
unfiltered_calibs : collection [`lsst.daf.butler.DatasetRef`]
The calibs to be filtered by validity. May be empty.
date : `datetime.datetime`
The time at which the calibs must be valid. Must be an
aware ``datetime``.
date : `astropy.time.Time`
The time at which the calibs must be valid.
Returns
-------
@@ -1203,7 +1191,7 @@
# Unfiltered_calibs can have up to one copy of each calib per certify cycle.
# Minimize redundant queries to find_dataset.
unique_ids = {(ref.datasetType, ref.dataId) for ref in unfiltered_calibs}
t = Timespan.fromInstant(astropy.time.Time(date, scale='utc'))
t = Timespan.fromInstant(date)
_log_trace.debug("Looking up calibs for %s in %s.", t, collections)
filtered_calibs = []
for dataset_type, data_id in unique_ids:
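Since date now arrives in _filter_calibs_by_date as an astropy.time.Time, it can be wrapped in a Timespan directly. A minimal sketch of the overlap semantics, with an invented certification range:

import astropy.time
from lsst.daf.butler import Timespan

date = astropy.time.Time("2015-03-15 00:00:00", scale="utc")
t = Timespan.fromInstant(date)
# A calib passes the filter iff its validity range overlaps the instant,
# e.g. one certified for all of March 2015:
validity = Timespan(astropy.time.Time("2015-03-01", scale="utc"),
                    astropy.time.Time("2015-04-01", scale="utc"))
assert validity.overlaps(t)
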
4 changes: 2 additions & 2 deletions python/tester/upload.py
@@ -20,7 +20,6 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import dataclasses
import datetime
import itertools
import json
import logging
@@ -30,6 +29,7 @@
import tempfile
import time

import astropy.time
import boto3
from botocore.handlers import validate_bucket_name

@@ -290,7 +290,7 @@ def get_samples_lsst(bucket, instrument):
dome=FannedOutVisit.Dome.OPEN,
duration=float(EXPOSURE_INTERVAL+SLEW_INTERVAL),
totalCheckpoints=1,
private_sndStamp=datetime.datetime.fromisoformat(md["DATE-BEG"]).timestamp(),
private_sndStamp=astropy.time.Time(md["DATE-BEG"], format="isot", scale="tai").unix_tai,
)
_log.debug(f"File {blob.key} parsed as visit {visit} and registered as group {md['GROUPID']}.")
result[md["GROUPID"]] = {0: {visit: blob}}
95 changes: 26 additions & 69 deletions tests/test_middleware_interface.py
@@ -29,6 +29,7 @@
import warnings

import astropy.coordinates
import astropy.time
import astropy.units as u
import psycopg2

@@ -116,22 +117,6 @@ def fake_file_data(filename, dimensions, instrument, visit):
class MiddlewareInterfaceTest(unittest.TestCase):
"""Test the MiddlewareInterface class with faked data.
"""
@classmethod
def setUpClass(cls):
cls.env_patcher = unittest.mock.patch.dict(os.environ,
{"URL_APDB": "postgresql://localhost/postgres",
"K_REVISION": "prompt-proto-service-042",
})
cls.env_patcher.start()

super().setUpClass()

@classmethod
def tearDownClass(cls):
super().tearDownClass()

cls.env_patcher.stop()

def setUp(self):
self.data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
self.central_repo = os.path.join(self.data_dir, "central_repo")
@@ -143,6 +128,13 @@ def setUp(self):
self.input_data = os.path.join(self.data_dir, "input_data")
self.local_repo = make_local_repo(tempfile.gettempdir(), self.central_butler, instname)

env_patcher = unittest.mock.patch.dict(os.environ,
{"URL_APDB": f"sqlite:///{self.local_repo.name}/apdb.db",
"K_REVISION": "prompt-proto-service-042",
})
env_patcher.start()
self.addCleanup(env_patcher.stop)

# coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
ra = 155.4702849608958
dec = -4.950050405424033
@@ -606,37 +598,11 @@ def test_run_pipeline_cascading_exception(self):

def test_get_output_run(self):
filename = "ApPipe.yaml"
for date in [datetime.date.today(), datetime.datetime.today()]:
out_run = self.interface._get_output_run(filename, date)
self.assertEqual(out_run,
f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}"
"/ApPipe/prompt-proto-service-042"
)
init_run = self.interface._get_init_output_run(filename, date)
self.assertEqual(init_run,
f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}"
"/ApPipe/prompt-proto-service-042"
)

def test_get_output_run_default(self):
# Workaround for mocking builtin class; see
# https://williambert.online/2011/07/how-to-unit-testing-in-django-with-mocking-and-patching/
class MockDatetime(datetime.datetime):
@classmethod
def now(cls, tz=None):
# This time will be the same day in CLT/CLST, but the previous day in day_obs.
utc = datetime.datetime(2023, 3, 15, 5, 42, 3, tzinfo=datetime.timezone.utc)
if tz:
return utc.astimezone(tz)
else:
return utc.replace(tzinfo=None)

filename = "ApPipe.yaml"
with unittest.mock.patch("datetime.datetime", MockDatetime):
out_run = self.interface._get_output_run(filename)
self.assertIn("output-2023-03-14", out_run)
init_run = self.interface._get_init_output_run(filename)
self.assertIn("output-2023-03-14", init_run)
date = "2023-01-22"
out_run = self.interface._get_output_run(filename, date)
self.assertEqual(out_run, f"{instname}/prompt/output-2023-01-22/ApPipe/prompt-proto-service-042")
init_run = self.interface._get_init_output_run(filename, date)
self.assertEqual(init_run, f"{instname}/prompt/output-2023-01-22/ApPipe/prompt-proto-service-042")

def _assert_in_collection(self, butler, collection, dataset_type, data_id):
# Pass iff any dataset matches the query, no need to check them all.
@@ -670,7 +636,7 @@ def test_clean_local_repo(self):
cat = lsst.afw.table.SourceCatalog()
raw_collection = self.interface.instrument.makeDefaultRawIngestRunName()
butler.registry.registerCollection(raw_collection, CollectionType.RUN)
out_collection = self.interface._get_output_run("ApPipe.yaml")
out_collection = self.interface._get_output_run("ApPipe.yaml", self.interface._day_obs)
butler.registry.registerCollection(out_collection, CollectionType.RUN)
chain = "generic-chain"
butler.registry.registerCollection(chain, CollectionType.CHAINED)
@@ -800,7 +766,7 @@ def test_filter_calibs_by_date_early(self):
all_calibs = list(self.central_butler.registry.queryDatasets("cpBias"))
early_calibs = list(_filter_calibs_by_date(
self.central_butler, "DECam/calib", all_calibs,
datetime.datetime(2015, 2, 26, tzinfo=datetime.timezone.utc)
astropy.time.Time("2015-02-26 00:00:00", scale="utc")
))
self.assertEqual(len(early_calibs), 4)
for calib in early_calibs:
@@ -811,7 +777,7 @@ def test_filter_calibs_by_date_late(self):
all_calibs = list(self.central_butler.registry.queryDatasets("cpFlat"))
late_calibs = list(_filter_calibs_by_date(
self.central_butler, "DECam/calib", all_calibs,
datetime.datetime(2015, 3, 16, tzinfo=datetime.timezone.utc)
astropy.time.Time("2015-03-16 00:00:00", scale="utc")
))
self.assertEqual(len(late_calibs), 4)
for calib in late_calibs:
@@ -825,7 +791,7 @@ def test_filter_calibs_by_date_never(self):
warnings.simplefilter("ignore", category=astropy.utils.exceptions.ErfaWarning)
future_calibs = list(_filter_calibs_by_date(
self.central_butler, "DECam/calib", all_calibs,
datetime.datetime(2050, 1, 1, tzinfo=datetime.timezone.utc)
astropy.time.Time("2050-01-01 00:00:00", scale="utc")
))
self.assertEqual(len(future_calibs), 0)

@@ -834,14 +800,14 @@ def test_filter_calibs_by_date_unbounded(self):
all_calibs = set(self.central_butler.registry.queryDatasets(["camera", "crosstalk"]))
valid_calibs = set(_filter_calibs_by_date(
self.central_butler, "DECam/calib", all_calibs,
datetime.datetime(2015, 3, 15, tzinfo=datetime.timezone.utc)
astropy.time.Time("2015-03-15 00:00:00", scale="utc")
))
self.assertEqual(valid_calibs, all_calibs)

def test_filter_calibs_by_date_empty(self):
valid_calibs = set(_filter_calibs_by_date(
self.central_butler, "DECam/calib", [],
datetime.datetime(2015, 3, 15, tzinfo=datetime.timezone.utc)
astropy.time.Time("2015-03-15 00:00:00", scale="utc")
))
self.assertEqual(len(valid_calibs), 0)

@@ -853,22 +819,6 @@ class MiddlewareInterfaceWriteableTest(unittest.TestCase):
setup takes longer than for MiddlewareInterfaceTest, so it should be
used sparingly.
"""
@classmethod
def setUpClass(cls):
super().setUpClass()

cls.env_patcher = unittest.mock.patch.dict(os.environ,
{"URL_APDB": "postgresql://localhost/postgres",
"K_REVISION": "prompt-proto-service-042",
})
cls.env_patcher.start()

@classmethod
def tearDownClass(cls):
super().tearDownClass()

cls.env_patcher.stop()

def _create_copied_repo(self):
"""Create a fresh repository that's a copy of the test data.
@@ -910,6 +860,13 @@ def setUp(self):
self.addCleanup(tempfile.TemporaryDirectory.cleanup, local_repo)
self.addCleanup(tempfile.TemporaryDirectory.cleanup, second_local_repo)

env_patcher = unittest.mock.patch.dict(os.environ,
{"URL_APDB": f"sqlite:///{local_repo.name}/apdb.db",
"K_REVISION": "prompt-proto-service-042",
})
env_patcher.start()
self.addCleanup(env_patcher.stop)

# coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
ra = 155.4702849608958
dec = -4.950050405424033
