Skip to content

Commit

Permalink
Add pipeline subsetting control option.
Browse files Browse the repository at this point in the history
Add an option to pipeline subsetting which controls how named
subsets are handled when they contain a task label which is not
contained in the new subsetted pipeline.
  • Loading branch information
natelust committed Nov 30, 2023
1 parent cdd3437 commit d166f9b
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 10 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-41203.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added an option to the interface for creating subsets of whole pipelines which allows control over how named subsets within the pipeline are modified when labels are missing from the new subsetted pipeline. The previous behavior is the new default, that is to drop any named subsets within the pipeline that contain a task label for which there is no task with that label defined. The new option is to to edit each named subset to remove the extra label from the named subset, but otherwise leaving it in the new subsetted pipeline. The interface has been modified in `Pipeline` and also the lower level `PipelineIR`, though the latter should rarely be used directly. The new argument is implemented as an enum option, and can be most easily accessed from the `Pipeline` class as `Pipeline.PipelineSubsetCtrl.(DROP/EDIT)`
17 changes: 15 additions & 2 deletions python/lsst/pipe/base/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ class Pipeline:
A description of that this pipeline does.
"""

PipelineSubsetCtrl = pipelineIR.PipelineSubsetCtrl

def __init__(self, description: str):
pipeline_dict = {"description": description, "tasks": {}}
self._pipelineIR = pipelineIR.PipelineIR(pipeline_dict)
Expand Down Expand Up @@ -330,7 +332,11 @@ def from_uri(cls, uri: ResourcePathExpression) -> Pipeline:
pipeline = pipeline.subsetFromLabels(label_specifier)
return pipeline

def subsetFromLabels(self, labelSpecifier: LabelSpecifier) -> Pipeline:
def subsetFromLabels(
self,
labelSpecifier: LabelSpecifier,
subsetCtrl: PipelineSubsetCtrl = PipelineSubsetCtrl.DROP,
) -> Pipeline:
"""Subset a pipeline to contain only labels specified in labelSpecifier
Parameters
Expand All @@ -342,6 +348,13 @@ def subsetFromLabels(self, labelSpecifier: LabelSpecifier) -> Pipeline:
-------
pipeline : `Pipeline`
A new pipeline object that is a subset of the old pipeline
subsetCtrl : `PipelineSubsetCtrl`
Control object which decides how subsets with missing labels are
handled. Setting to `PipelineSubsetCtrl.DROP` (the default) will
cause any subsets that have labels which are not in the set of all
task labels to be dropped. Setting to `PipelineSubsetCtrl.EDIT` will

Check failure on line 355 in python/lsst/pipe/base/pipeline.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

W505

doc line too long (80 > 79 characters)
cause the subset to instead be edited to remove the nonexistent
label.
Raises
------
Expand Down Expand Up @@ -394,7 +407,7 @@ def subsetFromLabels(self, labelSpecifier: LabelSpecifier) -> Pipeline:
labelSet.add(label)
if labelSpecifier.end is not None and label == labelSpecifier.end:
break
return Pipeline.fromIR(self._pipelineIR.subset_from_labels(labelSet))
return Pipeline.fromIR(self._pipelineIR.subset_from_labels(labelSet, subsetCtrl))

@staticmethod
def _parse_file_specifier(uri: ResourcePathExpression) -> tuple[ResourcePath, LabelSpecifier | None]:
Expand Down
38 changes: 31 additions & 7 deletions python/lsst/pipe/base/pipelineIR.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@
from lsst.utils.introspection import find_outside_stacklevel


class PipelineSubsetCtrl(enum.Enum):
DROP = enum.auto()
"""Drop any subsets that contain labels which are no longer in the set of
task labels when subsetting an entire pipeline
"""
EDIT = enum.auto()
"""Edit any subsets that contain labels which are no longer in the set of
task labels to remove the missing label, but leave the subset when
subsetting a pipeline.
"""


class _Tags(enum.Enum):
KeepInstrument = enum.auto()

Expand Down Expand Up @@ -807,14 +819,23 @@ def _remove_contracts(self, label: str) -> None:
new_contracts.append(contract)
self.contracts = new_contracts

def subset_from_labels(self, labelSpecifier: set[str]) -> PipelineIR:
def subset_from_labels(
self, labelSpecifier: set[str], subsetCtrl: PipelineSubsetCtrl = PipelineSubsetCtrl.DROP
) -> PipelineIR:
"""Subset a pipelineIR to contain only labels specified in
labelSpecifier.
Parameters
----------
labelSpecifier : `set` of `str`
set containing labels that describes how to subset a pipeline.
Set containing labels that describes how to subset a pipeline.
subsetCtrl : `PipelineSubsetCtrl`
Control object which decides how subsets with missing labels are
handled. Setting to `PipelineSubsetCtrl.DROP` (the default) will
cause any subsets that have labels which are not in the set of all
task labels to be dropped. Setting to `PipelineSubsetCtrl.EDIT` will

Check failure on line 836 in python/lsst/pipe/base/pipelineIR.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

W505

doc line too long (80 > 79 characters)
cause the subset to instead be edited to remove the nonexistent
label.
Returns
-------
Expand All @@ -831,9 +852,7 @@ def subset_from_labels(self, labelSpecifier: set[str]) -> PipelineIR:
This method attempts to prune any contracts that contain labels which
are not in the declared subset of labels. This pruning is done using a
string based matching due to the nature of contracts and may prune more
than it should. Any labeled subsets defined that no longer have all
members of the subset present in the pipeline will be removed from the
resulting pipeline.
than it should.
"""
pipeline = copy.deepcopy(self)

Expand Down Expand Up @@ -867,8 +886,13 @@ def subset_from_labels(self, labelSpecifier: set[str]) -> PipelineIR:
labeled_subsets = copy.copy(pipeline.labeled_subsets)
# remove any labeled subsets that no longer have a complete set
for label, labeled_subset in labeled_subsets.items():
if labeled_subset.subset - pipeline.tasks.keys():
pipeline.labeled_subsets.pop(label)
if extraTaskLabels := (labeled_subset.subset - pipeline.tasks.keys()):
match subsetCtrl:
case PipelineSubsetCtrl.DROP:
pipeline.labeled_subsets.pop(label)
case PipelineSubsetCtrl.EDIT:
for extra in extraTaskLabels:
labeled_subset.subset.discard(extra)

return pipeline

Expand Down
36 changes: 35 additions & 1 deletion tests/test_pipelineIR.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import unittest

import lsst.utils.tests
from lsst.pipe.base.pipelineIR import ConfigIR, PipelineIR
from lsst.pipe.base.pipelineIR import ConfigIR, PipelineIR, PipelineSubsetCtrl

# Find where the test pipelines exist and store it in an environment variable.
os.environ["TESTDIR"] = os.path.dirname(__file__)
Expand Down Expand Up @@ -529,6 +529,40 @@ def testReadNamedSubsets(self):
with self.assertRaises(ValueError):
PipelineIR.from_string(pipeline_str)

def testSubsetttingPipeline(self):
pipeline_str = textwrap.dedent(
"""
description: Test Pipeline
tasks:
modA: test.modA
modB:
class: test.modB
modC: test.modC
modD: test.modD
subsets:
subset1:
- modA
- modB
subset2:
subset:
- modC
- modD
description: "A test named subset"
"""
)
pipeline = PipelineIR.from_string(pipeline_str)
# verify that creating a pipeline subset with the default drop behavior
# removes any labeled subset that contains a label not in the set of
# all task labels.
pipelineSubset1 = pipeline.subset_from_labels({"modA", "modB", "modC"})
self.assertEqual(pipelineSubset1.labeled_subsets.keys(), {"subset1"})
# verify that creating a pipeline subset with the edit behavior
# edits any labeled subset that contains a label not in the set of
# all task labels.
pipelineSubset2 = pipeline.subset_from_labels({"modA", "modB", "modC"}, PipelineSubsetCtrl.EDIT)
self.assertEqual(pipelineSubset2.labeled_subsets.keys(), {"subset1", "subset2"})
self.assertEqual(pipelineSubset2.labeled_subsets["subset2"].subset, {"modC"})

def testInstrument(self):
# Verify that if instrument is defined it is parsed out
pipeline_str = textwrap.dedent(
Expand Down

0 comments on commit d166f9b

Please sign in to comment.