Skip to content

Commit

Permalink
Merge pull request #114 from lsst-dm/tickets/DM-42469
Browse files Browse the repository at this point in the history
DM-42469: Update testers and fan-out for the nextVisit schema change
  • Loading branch information
hsinfang authored Jan 24, 2024
2 parents 1c6b97a + c8311b0 commit 217c987
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 9 deletions.
12 changes: 10 additions & 2 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import lsst.ctrl.mpexec
from lsst.ctrl.mpexec import SeparablePipelineExecutor, SingleQuantumExecutor, MPGraphExecutor
from lsst.daf.butler import Butler, CollectionType, Timespan
from lsst.daf.butler.registry import MissingDatasetTypeError
import lsst.dax.apdb
import lsst.geom
from lsst.meas.algorithms.htmIndexer import HtmIndexer
Expand Down Expand Up @@ -862,10 +863,14 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
skip_existing_in=None,
task_factory=factory,
)
qgraph = executor.make_quantum_graph(pipeline, where=where)
try:
qgraph = executor.make_quantum_graph(pipeline, where=where)
except MissingDatasetTypeError as e:
_log.error(f"Building quantum graph for {pipeline_file} failed ", exc_info=e)
continue
if len(qgraph) == 0:
# Diagnostic logs are the responsibility of GraphBuilder.
_log.error(f"Could not build quantum graph for {pipeline_file}; "
_log.error(f"Empty quantum graph for {pipeline_file}; "
"see previous logs for details.")
continue
# Past this point, partial execution creates datasets.
Expand Down Expand Up @@ -904,6 +909,9 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
raise NonRetriableError("APDB modified") from e
else:
raise
finally:
# Refresh so that registry queries know the processed products.
self.butler.registry.refresh()
break
else:
# TODO: a good place for a custom exception?
Expand Down
8 changes: 4 additions & 4 deletions python/activator/visit.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,34 +71,34 @@ class Dome(enum.IntEnum):
dome: Dome
duration: float # script execution, not exposure
nimages: int # number of snaps expected, 0 if unknown
instrument: str # short name
survey: str # survey name
totalCheckpoints: int

def __str__(self):
"""Return a short string that represents the visit but does not
include complete metadata.
"""
return f"(groupId={self.groupId}, survey={self.survey}, salIndex={self.salIndex})"
return f"(groupId={self.groupId}, survey={self.survey}, " \
f"salIndex={self.salIndex}, instrument={self.instrument})"


@dataclass(frozen=True, kw_only=True)
class FannedOutVisit(BareVisit):
# Extra information is added by the fan-out service at USDF.
instrument: str # short name
detector: int
private_sndStamp: float # time of visit publication; TAI in unix seconds

def __str__(self):
"""Return a short string that disambiguates the visit but does not
include "metadata" fields.
"""
return f"(instrument={self.instrument}, groupId={self.groupId}, survey={self.survey} " \
return f"(groupId={self.groupId}, survey={self.survey}, " \
f"detector={self.detector})"

def get_bare_visit(self):
"""Return visit-level info as a dict"""
info = asdict(self)
info.pop("instrument")
info.pop("detector")
info.pop("private_sndStamp")
return info
Expand Down
1 change: 1 addition & 0 deletions python/tester/upload_hsc_rc2.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def prepare_one_visit(kafka_url, group_id, butler, visit_id):
# all items in refs share the same visit info and one event is to be sent
for data_id in refs.dataIds.limit(1).expanded():
visit = SummitVisit(
instrument="HSC",
groupId=group_id,
nimages=1,
filters=data_id.records["physical_filter"].name,
Expand Down
4 changes: 2 additions & 2 deletions python/tester/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ def send_next_visit(url, group, visit_infos):
for info in visit_infos:
_log.debug(f"Sending next_visit for group: {info.groupId} "
f"filters: {info.filters} ra: {info.position[0]} dec: {info.position[1]} "
f"survey: {info.survey}")
f"instrument: {info.instrument} survey: {info.survey}")
records_level = dict(value=asdict(info))
value_schema_level = dict(value_schema_id=1, records=[records_level])
value_schema_level = dict(value_schema_id=97, records=[records_level])

r = requests.post(url, data=json.dumps(value_schema_level), headers=header)
_log.debug(r.content)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_visit.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def setUp(self):
super().setUp()

visit_info = dict(
instrument="NotACam",
groupId="2023-01-23T23:33:14.762",
nimages=2,
filters="k2022",
Expand All @@ -98,7 +99,6 @@ def setUp(self):
)
self.visit = BareVisit(**visit_info)
self.fannedOutVisit = FannedOutVisit(
instrument="NotACam",
detector=42,
private_sndStamp=1_674_516_794.0,
**visit_info
Expand Down

0 comments on commit 217c987

Please sign in to comment.