Skip to content

Commit

Permalink
Restore group-by-metadata visit definition with group dimension
Browse files Browse the repository at this point in the history
This requires calling a method on the Instrument class to
derive the integer group_id from the exposure's group name.
  • Loading branch information
timj committed Feb 1, 2024
1 parent e499c15 commit 5a12215
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 18 deletions.
57 changes: 42 additions & 15 deletions python/lsst/obs/base/defineVisits.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,19 @@ def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[Di
raise NotImplementedError()

@abstractmethod
def group(self, exposures: list[DimensionRecord]) -> Iterable[VisitDefinitionData]:
def group(
self, exposures: list[DimensionRecord], instrument: Instrument
) -> Iterable[VisitDefinitionData]:
"""Group the given exposures into visits.
Parameters
----------
exposures : `list` [ `DimensionRecord` ]
DimensionRecords (for the 'exposure' dimension) describing the
exposures to group.
instrument : `~lsst.pipe.base.Instrument`
Instrument specification that can be used to optionally support
some visit ID definitions.
Returns
-------
Expand Down Expand Up @@ -669,6 +674,7 @@ def run(
# instrument in play, and check for non-science exposures.
exposures = []
instruments = set()
instrument_cls_name: str | None = None
for dataId in data_id_set:
record = dataId.records["exposure"]
assert record is not None, "Guaranteed by expandDataIds call earlier."
Expand All @@ -681,6 +687,9 @@ def run(
f"{record.observation_type}, but is not on sky."
)
instruments.add(dataId["instrument"])
instrument_record = dataId.records["instrument"]
if instrument_record is not None:
instrument_cls_name = instrument_record.class_name
exposures.append(record)
if not exposures:
self.log.info("No on-sky exposures found after filtering.")
Expand All @@ -691,6 +700,12 @@ def run(
f"from the same instrument; got {instruments}."
)
(instrument,) = instruments

# Might need the instrument class for later depending on universe
# and grouping scheme.
assert instrument_cls_name is not None, "Instrument must be defined in this dataId"
instrument_helper = Instrument.from_string(instrument_cls_name)

# Ensure the visit_system our grouping algorithm uses is in the
# registry, if it wasn't already.
visitSystems = self.groupExposures.getVisitSystems()
Expand All @@ -710,7 +725,7 @@ def run(

# Group exposures into visits, delegating to subtask.
self.log.info("Grouping %d exposure(s) into visits.", len(exposures))
definitions = list(self.groupExposures.group(exposures))
definitions = list(self.groupExposures.group(exposures, instrument_helper))
# Iterate over visits, compute regions, and insert dimension data, one
# transaction per visit. If a visit already exists, we skip all other
# inserts.
Expand Down Expand Up @@ -857,7 +872,9 @@ def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[Di
# No grouping.
return {exposure.id: [exposure] for exposure in exposures}

def group(self, exposures: list[DimensionRecord]) -> Iterable[VisitDefinitionData]:
def group(
self, exposures: list[DimensionRecord], instrument: Instrument
) -> Iterable[VisitDefinitionData]:
# Docstring inherited from GroupExposuresTask.
visit_systems = {VisitSystem.from_name("one-to-one")}
for exposure in exposures:
Expand Down Expand Up @@ -940,19 +957,27 @@ def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[Di
groups[getattr(exposure, group_key)].append(exposure)
return groups

def group(self, exposures: list[DimensionRecord]) -> Iterable[VisitDefinitionData]:
def group(
self, exposures: list[DimensionRecord], instrument: Instrument
) -> Iterable[VisitDefinitionData]:
# Docstring inherited from GroupExposuresTask.
visit_systems = {VisitSystem.from_name("by-group-metadata")}
groups = self.group_exposures(exposures)
has_group_dimension: bool | None = None
for visitName, exposuresInGroup in groups.items():
instrument = exposuresInGroup[0].instrument
visitId = exposuresInGroup[0].group_id
assert all(
e.group_id == visitId for e in exposuresInGroup
), "Grouping by exposure.group_name does not yield consistent group IDs"
instrument_name = exposuresInGroup[0].instrument
assert instrument_name == instrument.getName(), "Inconsistency in instrument name"
visit_ids: set[int] = set()
if has_group_dimension is None:
has_group_dimension = hasattr(exposuresInGroup[0], "group")
if has_group_dimension:
visit_ids = {instrument.group_name_to_group_id(e.group) for e in exposuresInGroup}
else:
visit_ids = {e.group_id for e in exposuresInGroup}
assert len(visit_ids) == 1, "Grouping by exposure group does not yield consistent group IDs"
yield VisitDefinitionData(
instrument=instrument,
id=visitId,
instrument=instrument_name,
id=visit_ids.pop(),
name=visitName,
exposures=exposuresInGroup,
visit_systems=visit_systems,
Expand Down Expand Up @@ -1028,14 +1053,16 @@ def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[Di
groups[exposure.day_obs, exposure.seq_start, exposure.seq_end].append(exposure)
return groups

def group(self, exposures: list[DimensionRecord]) -> Iterable[VisitDefinitionData]:
def group(
self, exposures: list[DimensionRecord], instrument: Instrument
) -> Iterable[VisitDefinitionData]:
# Docstring inherited from GroupExposuresTask.
system_one_to_one = VisitSystem.from_name("one-to-one")
system_seq_start_end = VisitSystem.from_name("by-seq-start-end")

groups = self.group_exposures(exposures)
for visit_key, exposures_in_group in groups.items():
instrument = exposures_in_group[0].instrument
instrument_name = exposures_in_group[0].instrument

# It is possible that the first exposure in a visit has not
# been ingested. This can be determined and if that is the case
Expand Down Expand Up @@ -1082,7 +1109,7 @@ def group(self, exposures: list[DimensionRecord]) -> Iterable[VisitDefinitionDat
visit_id = int(f"9{visit_id}")

yield VisitDefinitionData(
instrument=instrument,
instrument=instrument_name,
id=visit_id,
name=visit_name,
exposures=[exposure],
Expand All @@ -1096,7 +1123,7 @@ def group(self, exposures: list[DimensionRecord]) -> Iterable[VisitDefinitionDat
visit_id = first.id

yield VisitDefinitionData(
instrument=instrument,
instrument=instrument_name,
id=visit_id,
name=visit_name,
exposures=exposures_in_group,
Expand Down
13 changes: 10 additions & 3 deletions tests/test_defineVisits.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,14 @@ def assertVisits(self):
"""Check that the visits were registered as expected."""
visits = list(self.butler.registry.queryDimensionRecords("visit"))
self.assertEqual(len(visits), 2)
self.assertEqual({visit.id for visit in visits}, {2291434132550000, 2291434871810000})

# The visit ID itself depends on which universe we are using.
# It is either calculated or comes from the JSON record.
if "group" in self.butler.dimensions["exposure"].implied:
visit_ids = [20220406025653255, 20220406025807181]
else:
visit_ids = [2291434132550000, 2291434871810000]
self.assertEqual({visit.id for visit in visits}, set(visit_ids))

# Ensure that the definitions are correct (ignoring order).
defmap = defaultdict(set)
Expand All @@ -221,8 +228,8 @@ def assertVisits(self):
self.assertEqual(
dict(defmap),
{
2291434132550000: {2022040500347},
2291434871810000: {2022040500348, 2022040500349},
visit_ids[0]: {2022040500347},
visit_ids[1]: {2022040500348, 2022040500349},
},
)

Expand Down

0 comments on commit 5a12215

Please sign in to comment.