Commit c8105f7: FIX linter
tomMoral committed Apr 9, 2024
1 parent aaf6100
Showing 6 changed files with 36 additions and 34 deletions.
1 change: 1 addition & 0 deletions loky/__init__.py
@@ -3,6 +3,7 @@
 :class:`ProcessPoolExecutor` and a function :func:`get_reusable_executor` which
 hide the pool management under the hood.
 """
+
 from concurrent.futures import (
     ALL_COMPLETED,
     FIRST_COMPLETED,
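For readers landing on this commit, the docstring above refers to loky's reusable process pool; a minimal usage sketch (illustrative, not part of this diff):

# Minimal usage sketch of the API described in the docstring above:
# get_reusable_executor() returns a process pool whose creation, reuse and
# shutdown are handled by loky itself.
from loky import get_reusable_executor


def square(x):
    return x * x


if __name__ == "__main__":
    executor = get_reusable_executor(max_workers=2)
    print(list(executor.map(square, range(4))))  # [0, 1, 4, 9]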
16 changes: 7 additions & 9 deletions loky/process_executor.py
@@ -136,12 +136,11 @@ def get_worker_rank():


 def set_worker_rank(pid, rank_mapper):
-    """Set worker's rank and world size from the process pid and an rank_mapper.
-    """
+    """Set worker's rank and world size from the process pid and an rank_mapper."""
     global _WORKER_RANK, _WORKER_WORLD
     if pid in rank_mapper:
         _WORKER_RANK = rank_mapper[pid]
-        _WORKER_WORLD = rank_mapper['world']
+        _WORKER_WORLD = rank_mapper["world"]


 class _ThreadWakeup:
@@ -413,7 +412,7 @@ def _process_worker(
     timeout,
     worker_exit_lock,
     current_depth,
-    rank_mapper
+    rank_mapper,
 ):
     """Evaluates calls from call_queue and places the results in result_queue.
@@ -486,7 +485,7 @@ def _process_worker(
         if call_item is None:
             # Notify queue management thread about worker shutdown
             result_queue.put(pid)

             is_clean = worker_exit_lock.acquire(True, timeout=30)

             # Early notify any loky executor running in this worker process
@@ -1064,7 +1063,6 @@ class TerminatedWorkerError(BrokenProcessPool):


 class ShutdownExecutorError(RuntimeError):
-
     """
     Raised when a ProcessPoolExecutor is shutdown while a future was in the
     running or pending state.
@@ -1240,11 +1238,11 @@ def _start_executor_manager_thread(self):
     def _adjust_process_count(self):
         # Compute available worker ranks for newly spawned workers
         given_ranks = set(
-            v for k, v in self._rank_mapper.items() if k != 'world'
+            v for k, v in self._rank_mapper.items() if k != "world"
         )
         all_ranks = set(range(self._max_workers))
         available_ranks = all_ranks - given_ranks

         while len(self._processes) < self._max_workers:
             worker_exit_lock = self._context.BoundedSemaphore(1)
             rank = available_ranks.pop()
@@ -1277,7 +1275,7 @@ def _adjust_process_count(self):
         # They will be passed to the workers when sending the tasks with
         # the CallItem.
         for pid, rank in list(self._rank_mapper.items()):
-            if pid != 'world' and rank >= self._max_workers:
+            if pid != "world" and rank >= self._max_workers:
                 self._rank_mapper[pid] = available_ranks.pop()
                 mp.util.debug(
                     f"Adjusted process count to {self._max_workers}: "
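To make the bookkeeping in _adjust_process_count easier to follow, here is a small standalone paraphrase of the rank-allocation logic in the hunks above (illustrative only; the example pids and pool size are made up):

# Standalone paraphrase of the rank allocation above: _rank_mapper maps worker
# pids to ranks and stores the pool size under the "world" key.
def available_worker_ranks(rank_mapper, max_workers):
    # Ranks already held by live workers (every key except "world").
    given_ranks = {v for k, v in rank_mapper.items() if k != "world"}
    # Any rank in [0, max_workers) not taken yet can go to a new worker.
    return set(range(max_workers)) - given_ranks


rank_mapper = {"world": 3, 101: 0, 205: 2}  # hypothetical pids
print(available_worker_ranks(rank_mapper, max_workers=3))  # {1}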
4 changes: 2 additions & 2 deletions loky/reusable_executor.py
@@ -236,13 +236,13 @@ def _resize(self, max_workers):
                 # then no processes have been spawned and we can just
                 # update _max_workers and return
                 self._max_workers = max_workers
-                self._rank_mapper['world'] = max_workers
+                self._rank_mapper["world"] = max_workers
                 return

             self._wait_job_completion()

             # Set the new size to be broadcasted to the workers
-            self._rank_mapper['world'] = max_workers
+            self._rank_mapper["world"] = max_workers

             # Some process might have returned due to timeout so check how many
             # children are still alive. Use the _process_management_lock to
26 changes: 14 additions & 12 deletions tests/_test_process_executor.py
@@ -1008,7 +1008,6 @@ def test_no_failure_on_large_data_send(self):
     @pytest.mark.skipif(
         sys.version_info >= (3, 8),
         reason="Python version supports pickling objects of size > 2 ** 31GB",
     )
     def test_expected_failure_on_large_data_send(self):
         data = b"\x00" * int(2.2e9)
@@ -1130,25 +1129,28 @@ def test_child_env_executor(self):

     @staticmethod
     def _worker_rank(x):
-        time.sleep(.2)
+        time.sleep(0.2)
         rank, world = loky.get_worker_rank()
-        return dict(pid=os.getpid(), name=mp.current_process().name,
-                    rank=rank, world=world)
+        return dict(
+            pid=os.getpid(),
+            name=mp.current_process().name,
+            rank=rank,
+            world=world,
+        )

-    @pytest.mark.parametrize('max_workers', [1, 5, 13])
-    @pytest.mark.parametrize('timeout', [None, 0.01])
+    @pytest.mark.parametrize("max_workers", [1, 5, 13])
+    @pytest.mark.parametrize("timeout", [None, 0.01])
     def test_workers_rank(self, max_workers, timeout):
         executor = self.executor_type(max_workers, timeout=timeout)
         results = executor.map(self._worker_rank, range(max_workers * 5))
         workers_rank = {}
         for f in results:
-            assert f['world'] == max_workers
-            rank = workers_rank.get(f['pid'], None)
-            assert rank is None or rank == f['rank']
-            workers_rank[f['pid']] = f['rank']
+            assert f["world"] == max_workers
+            rank = workers_rank.get(f["pid"], None)
+            assert rank is None or rank == f["rank"]
+            workers_rank[f["pid"]] = f["rank"]
         assert set(workers_rank.values()) == set(range(max_workers)), (
-            ', '.join('{}: {}'.format(k, v)
-                      for k, v in executor._rank_mapper.items())
+            ", ".join(f"{k}: {v}" for k, v in executor._rank_mapper.items())
         )
         executor.shutdown(wait=True, kill_workers=True)

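test_workers_rank above exercises the worker-rank API present on this branch; a condensed usage sketch (assumes loky.get_worker_rank() and get_reusable_executor() from this branch; not part of the commit):

# Hypothetical end-to-end use of the rank API exercised by test_workers_rank:
# each worker reports its (rank, world) pair; ranks should cover
# 0..max_workers-1 across the pool and world should equal max_workers.
import os

import loky


def report(_):
    rank, world = loky.get_worker_rank()  # API introduced on this branch
    return os.getpid(), rank, world


if __name__ == "__main__":
    executor = loky.get_reusable_executor(max_workers=3)
    for pid, rank, world in executor.map(report, range(6)):
        print(f"pid={pid} rank={rank} world={world}")  # world == 3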
2 changes: 0 additions & 2 deletions tests/test_loky_module.py
@@ -14,7 +14,6 @@
 from loky.backend.context import _cpu_count_user, _MAX_WINDOWS_WORKERS


-
 def test_version():
     assert hasattr(
         loky, "__version__"
@@ -61,7 +60,6 @@ def test_cpu_count_os_sched_getaffinity():
     except NotImplementedError:
         pytest.skip()

-
     res = check_output(
         [
             taskset_bin,
21 changes: 12 additions & 9 deletions tests/test_reusable_executor.py
@@ -685,10 +685,14 @@ def test_resize_after_timeout(self):

     @staticmethod
     def _worker_rank(x):
-        time.sleep(.2)
+        time.sleep(0.2)
         rank, world = get_worker_rank()
-        return dict(pid=os.getpid(), name=mp.current_process().name,
-                    rank=rank, world=world)
+        return dict(
+            pid=os.getpid(),
+            name=mp.current_process().name,
+            rank=rank,
+            world=world
+        )

     def test_workers_rank_resize(self):

@@ -703,13 +707,12 @@ def test_workers_rank_resize(self):
         executor.map(sleep, [0.01] * 6)
         workers_rank = {}
         for f in results:
-            assert f['world'] == size
-            rank = workers_rank.get(f['pid'], None)
-            assert rank is None or rank == f['rank']
-            workers_rank[f['pid']] = f['rank']
+            assert f["world"] == size
+            rank = workers_rank.get(f["pid"], None)
+            assert rank is None or rank == f["rank"]
+            workers_rank[f["pid"]] = f["rank"]
         assert set(workers_rank.values()) == set(range(size)), (
-            ', '.join('{}: {}'.format(k, v)
-                      for k, v in executor._rank_mapper.items())
+            ", ".join(f"{k}: {v}" for k, v in executor._rank_mapper.items())
         )

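test_workers_rank_resize checks that a resize is reflected in the world size reported by the workers; a hedged sketch of that behaviour (sizes are illustrative; assumes the branch's get_worker_rank()):

# Illustrative resize scenario (not part of this commit): calling
# get_reusable_executor() again with a different max_workers resizes the
# shared pool, after which workers should report the new world size.
import loky


def world_size(_):
    _rank, world = loky.get_worker_rank()  # API from this branch
    return world


if __name__ == "__main__":
    executor = loky.get_reusable_executor(max_workers=2)
    print(set(executor.map(world_size, range(4))))  # expected: {2}

    executor = loky.get_reusable_executor(max_workers=4)  # triggers _resize
    print(set(executor.map(world_size, range(8))))  # expected: {4}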
