Skip to content

Commit

Permalink
Merge pull request #438 from lsst/tickets/DM-44488
Browse files Browse the repository at this point in the history
DM-44488: support code for error-handling in ctrl_mpexec
  • Loading branch information
TallJimbo authored Aug 28, 2024
2 parents fe85a52 + 80c412e commit be41127
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
continue
item.metadata.set_dict("failure", failure_info) # type: ignore

log.exception(
log.debug(
"Task failed with only partial outputs; see exception message for details.",
exc_info=error,
)
Expand Down
28 changes: 24 additions & 4 deletions python/lsst/pipe/base/tests/mocks/_pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@
from typing import TYPE_CHECKING, Any, ClassVar, TypeVar

from astropy.units import Quantity
from lsst.daf.butler import DataCoordinate, DatasetRef, DeferredDatasetHandle, SerializedDatasetType
from lsst.daf.butler import DataCoordinate, DatasetRef, DeferredDatasetHandle, Quantum, SerializedDatasetType
from lsst.pex.config import Config, ConfigDictField, ConfigurableField, Field, ListField
from lsst.utils.doImport import doImportType
from lsst.utils.introspection import get_full_type_name
from lsst.utils.iteration import ensure_iterable

from ... import connectionTypes as cT
from ..._status import AnnotatedPartialOutputsError, RepeatableQuantumError
from ...config import PipelineTaskConfig
from ...connections import InputQuantizedConnection, OutputQuantizedConnection, PipelineTaskConnections
from ...pipeline_graph import PipelineGraph
Expand Down Expand Up @@ -291,18 +292,18 @@ def runQuantum(
# Possibly raise an exception.
if self.data_id_match is not None and self.data_id_match.match(quantum.dataId):
assert self.fail_exception is not None, "Exception type must be defined"
message = f"Simulated failure: task={self.getName()} dataId={quantum.dataId}"

if self.memory_required is not None:
if butlerQC.resources.max_mem < self.memory_required:
_LOG.info(
"Simulating out-of-memory failure for task '%s' on quantum %s",
self.getName(),
quantum.dataId,
)
raise self.fail_exception(message)
self._fail(quantum)
else:
_LOG.info("Simulating failure of task '%s' on quantum %s", self.getName(), quantum.dataId)
raise self.fail_exception(message)
self._fail(quantum)

# Populate the bit of provenance we store in all outputs.
_LOG.info("Reading input data for task '%s' on quantum %s", self.getName(), quantum.dataId)
Expand Down Expand Up @@ -351,6 +352,25 @@ def runQuantum(

_LOG.info("Finished mocking task '%s' on quantum %s", self.getName(), quantum.dataId)

def _fail(self, quantum: Quantum) -> None:
"""Raise the configured exception.
Parameters
----------
quantum : `lsst.daf.butler.Quantum`
Quantum producing the error.
"""
message = f"Simulated failure: task={self.getName()} dataId={quantum.dataId}"
if self.fail_exception is AnnotatedPartialOutputsError:
# This exception is expected to always chain another.
try:
raise RepeatableQuantumError(message)
except RepeatableQuantumError as err:
raise AnnotatedPartialOutputsError() from err
else:
assert self.fail_exception is not None, "Method should not be called."
raise self.fail_exception(message)


class MockPipelineDefaultTargetConnections(PipelineTaskConnections, dimensions=()):
pass
Expand Down
4 changes: 2 additions & 2 deletions tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ def test_annotate_exception(self):
task = AddMultTask()
msg = "something failed!"
error = ValueError(msg)
with self.assertLogs("addMult", level="ERROR") as cm:
with self.assertLogs("addMult", level="DEBUG") as cm:
pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
self.assertIn(msg, "\n".join(cm.output))
self.assertEqual(task.metadata["failure"]["message"], msg)
Expand All @@ -346,7 +346,7 @@ def metadata(self):
task = AddMultTask()
msg = "something failed!"
error = TestError(msg)
with self.assertLogs("addMult", level="ERROR") as cm:
with self.assertLogs("addMult", level="DEBUG") as cm:
pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
self.assertIn(msg, "\n".join(cm.output))
self.assertEqual(task.metadata["failure"]["message"], msg)
Expand Down

0 comments on commit be41127

Please sign in to comment.