Skip to content

Commit

Permalink
🚸 Terminate early on failed computation (#14)
Browse files Browse the repository at this point in the history
* 🚸 Terminate early on failed computation

* ✅ Add test for failing imap

* ✅ Add test for failing map
  • Loading branch information
ddelange authored Oct 8, 2021
1 parent 64f7576 commit 3cc8cf7
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 0 deletions.
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ ignore = D10,E203,E501,W503
max-line-length = 88
inline-quotes = double
docstring-convention = google
max-cognitive-complexity = 10

[coverage:run]
branch = True
Expand Down
9 changes: 9 additions & 0 deletions src/mapply/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ def multiprocessing_imap(
Returns:
Results in same order as input iterable.
Raises:
Exception: Any error occurred during computation (will terminate the pool early).
"""
n_chunks: Optional[int] = tqdm(iterable, disable=True).__len__() # doesn't exhaust
func = partial(func, *args, **kwargs)
Expand All @@ -97,6 +100,12 @@ def multiprocessing_imap(

try:
return list(stage)
except Exception:
if pool:
logger.debug("Terminating ProcessPool")
pool.close()
pool.terminate()
raise
finally:
if pool:
logger.debug("Closing ProcessPool")
Expand Down
14 changes: 14 additions & 0 deletions tests/test_parallel.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import pytest

from mapply.parallel import multiprocessing_imap


def foo(x, power):
if not isinstance(power, float):
raise ValueError("To check we reraise errors from subprocesses")
return pow(x, power)


Expand All @@ -19,3 +23,13 @@ def test_multiprocessing_imap(size=100, power=1.1):
assert multicore_list1 == multicore_list2
assert multicore_list1 == multicore_list3
assert multicore_list1 == [foo(x, power=power) for x in range(size)]
with pytest.raises(ValueError, match="reraise"):
# hit with ProcessPool
multiprocessing_imap(
foo, range(size), power=None, progressbar=False, n_workers=2
)
with pytest.raises(ValueError, match="reraise"):
# hit without ProcessPool
multiprocessing_imap(
foo, range(size), power=None, progressbar=False, n_workers=1
)

0 comments on commit 3cc8cf7

Please sign in to comment.