From ca55275bd7ca289b654e02aa452811affa4015cf Mon Sep 17 00:00:00 2001 From: Pierre Delaunay Date: Wed, 18 Jan 2023 10:49:41 -0500 Subject: [PATCH 01/15] Allow to spawn workers inside daemon --- src/orion/executor/multiprocess_backend.py | 16 ++++++-- .../client/test_experiment_client.py | 2 +- tests/unittests/executor/test_executor.py | 37 ++++++++++++++----- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/orion/executor/multiprocess_backend.py b/src/orion/executor/multiprocess_backend.py index 03cb557e7..cc772ba05 100644 --- a/src/orion/executor/multiprocess_backend.py +++ b/src/orion/executor/multiprocess_backend.py @@ -83,8 +83,8 @@ def Process(*args, **kwds): if v.major == 3 and v.minor >= 8: args = args[1:] - if Pool.ALLOW_DAEMON: - return Process(*args, **kwds) + if not Pool.ALLOW_DAEMON: + return PyPool.Process(*args, **kwds) return _Process(*args, **kwds) @@ -167,13 +167,21 @@ def __init__(self, n_workers=-1, backend="multiprocess", **kwargs): if n_workers <= 0: n_workers = multiprocessing.cpu_count() + self.pool_config = dict(n_workers=n_workers, backend=backend) self.pool = PoolExecutor.BACKENDS.get(backend, ThreadPool)(n_workers) def __setstate__(self, state): - self.pool = state["pool"] + log.warning("Nesting executor") + + self.pool_config = state["pool_config"] + + backend = self.pool_config.get("backend", ThreadPool) + n_workers = self.pool_config.get("n_workers", -1) + + self.pool = PoolExecutor.BACKENDS.get(backend, ThreadPool)(n_workers) def __getstate__(self): - return dict(pool=self.pool) + return dict(pool_config=self.pool_config) def __enter__(self): return self diff --git a/tests/unittests/client/test_experiment_client.py b/tests/unittests/client/test_experiment_client.py index 77aff7fad..448d348e0 100644 --- a/tests/unittests/client/test_experiment_client.py +++ b/tests/unittests/client/test_experiment_client.py @@ -1000,7 +1000,7 @@ def main(*args, **kwargs): def test_run_experiment_twice(): - """""" + """Makes sure the executor is not freed after workon""" with create_experiment(config, base_trial) as (cfg, experiment, client): client.workon(main, max_trials=10) diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index 855f61383..072514c3e 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -1,3 +1,4 @@ +import multiprocessing import time import pytest @@ -235,7 +236,7 @@ def nested(executor): return sum(f.get() for f in futures) -@pytest.mark.parametrize("backend", [xfail_dask_if_not_installed(Dask), SingleExecutor]) +@pytest.mark.parametrize("backend", backends) def test_nested_submit(backend): with backend(5) as executor: futures = [executor.submit(nested, executor) for i in range(5)] @@ -246,17 +247,35 @@ def test_nested_submit(backend): assert r.value == 35 -@pytest.mark.parametrize("backend", [multiprocess, thread]) -def test_nested_submit_failure(backend): +def inc(a): + return a + 1 + + +def nested_pool(): + data = [1, 2, 3, 4, 5, 6] + with multiprocessing.Pool(5) as p: + result = p.map_async(inc, data) + result.wait() + data = result.get() + + return sum(data) + + +@pytest.mark.parametrize("backend", backends) +def test_nested_submit_pool(backend): with backend(5) as executor: + futures = [executor.submit(nested_pool) for i in range(5)] + + results = executor.async_get(futures, timeout=2) + + for r in results: + assert r.value == 27 - if backend == multiprocess: - exception = NotImplementedError - elif backend == thread: - exception = TypeError - with pytest.raises(exception): - [executor.submit(nested, executor) for i in range(5)] +@pytest.mark.parametrize("backend", [multiprocess, thread]) +def test_nested_submit_failure(backend): + with backend(5) as executor: + [executor.submit(nested, executor) for i in range(5)] @pytest.mark.parametrize("executor", executors) From e4891a339c6ea6c2c0bc6af3e670bdd33fffbd2f Mon Sep 17 00:00:00 2001 From: Pierre Delaunay Date: Wed, 18 Jan 2023 10:58:29 -0500 Subject: [PATCH 02/15] - --- src/orion/executor/multiprocess_backend.py | 9 +++------ tests/unittests/executor/test_executor.py | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/orion/executor/multiprocess_backend.py b/src/orion/executor/multiprocess_backend.py index cc772ba05..d4c115b9e 100644 --- a/src/orion/executor/multiprocess_backend.py +++ b/src/orion/executor/multiprocess_backend.py @@ -167,21 +167,18 @@ def __init__(self, n_workers=-1, backend="multiprocess", **kwargs): if n_workers <= 0: n_workers = multiprocessing.cpu_count() - self.pool_config = dict(n_workers=n_workers, backend=backend) + self.pool_config = {"n_workers": n_workers, "backend": backend} self.pool = PoolExecutor.BACKENDS.get(backend, ThreadPool)(n_workers) def __setstate__(self, state): - log.warning("Nesting executor") - + log.warning("Nesting multiprocess executor") self.pool_config = state["pool_config"] - backend = self.pool_config.get("backend", ThreadPool) n_workers = self.pool_config.get("n_workers", -1) - self.pool = PoolExecutor.BACKENDS.get(backend, ThreadPool)(n_workers) def __getstate__(self): - return dict(pool_config=self.pool_config) + return {"pool_config": self.pool_config} def __enter__(self): return self diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index 072514c3e..aaad58d94 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -273,7 +273,7 @@ def test_nested_submit_pool(backend): @pytest.mark.parametrize("backend", [multiprocess, thread]) -def test_nested_submit_failure(backend): +def test_nested_submit_works(backend): with backend(5) as executor: [executor.submit(nested, executor) for i in range(5)] From a218223ed817ec954f672af21cf1218070008bb9 Mon Sep 17 00:00:00 2001 From: Pierre Delaunay Date: Wed, 18 Jan 2023 16:13:59 -0500 Subject: [PATCH 03/15] Disable dask in nested multiproc test --- tests/unittests/executor/test_executor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index aaad58d94..7e9339444 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -263,6 +263,9 @@ def nested_pool(): @pytest.mark.parametrize("backend", backends) def test_nested_submit_pool(backend): + if backend is Dask: + pytest.skip("Dask does not support nesting") + with backend(5) as executor: futures = [executor.submit(nested_pool) for i in range(5)] From 1307ae806ee683f0d7923ac411840a4940dcaa7c Mon Sep 17 00:00:00 2001 From: Setepenre Date: Thu, 19 Jan 2023 13:55:41 -0500 Subject: [PATCH 04/15] Update multiprocess_backend.py --- src/orion/executor/multiprocess_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/orion/executor/multiprocess_backend.py b/src/orion/executor/multiprocess_backend.py index d4c115b9e..9c0f50c9b 100644 --- a/src/orion/executor/multiprocess_backend.py +++ b/src/orion/executor/multiprocess_backend.py @@ -173,7 +173,7 @@ def __init__(self, n_workers=-1, backend="multiprocess", **kwargs): def __setstate__(self, state): log.warning("Nesting multiprocess executor") self.pool_config = state["pool_config"] - backend = self.pool_config.get("backend", ThreadPool) + backend = self.pool_config.get("backend") n_workers = self.pool_config.get("n_workers", -1) self.pool = PoolExecutor.BACKENDS.get(backend, ThreadPool)(n_workers) From ad8ab820a7886bf331f318dead9812319900279d Mon Sep 17 00:00:00 2001 From: Setepenre Date: Thu, 19 Jan 2023 13:59:08 -0500 Subject: [PATCH 05/15] Update test_executor.py --- tests/unittests/executor/test_executor.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index 7e9339444..2c2c5cbe4 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -275,12 +275,6 @@ def test_nested_submit_pool(backend): assert r.value == 27 -@pytest.mark.parametrize("backend", [multiprocess, thread]) -def test_nested_submit_works(backend): - with backend(5) as executor: - [executor.submit(nested, executor) for i in range(5)] - - @pytest.mark.parametrize("executor", executors) def test_executors_have_default_args(executor): From 1b94f7b8291211eb39614e1b18382c962ed36e6c Mon Sep 17 00:00:00 2001 From: Pierre Delaunay Date: Thu, 19 Jan 2023 14:16:14 -0500 Subject: [PATCH 06/15] Add pytorch manual test --- tests/unittests/executor/test_executor.py | 60 +++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index 2c2c5cbe4..a0f5af72d 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -296,3 +296,63 @@ def test_executors_del_does_not_raise(backend): del executor.client del executor + + +def pytorch_workon(pid): + import torch + from torchvision import datasets, transforms + + transform = transforms.Compose( + [ + transforms.ToTensor(), + ] + ) + + dataset = datasets.MNIST( + f"../data/{pid}/", + train=True, + download=True, + transform=transform, + ) + + loader = torch.utils.data.DataLoader(dataset, num_workers=10, batch_size=64) + + for i in loader: + pass + + return i + + +def check_pytorch_dataloader(): + import sys + + failures = 0 + + backends = [ + thread, + multiprocess, + SingleExecutor, + Dask, + ] + + for executor_ctor in backends: + try: + with executor_ctor(10) as executor: + futures = [executor.submit(pytorch_workon, i) for i in range(5)] + + results = executor.async_get(futures, timeout=2) + + for r in results: + assert r.value == 27 + + print("[ OK]", executor_ctor) + + except Exception as err: + print("[ FAIL]", executor_ctor, err) + failures += 1 + + sys.exit(failures) + + +if __name__ == "__main__": + check_pytorch_dataloader() From 23ed92fd6a1effce5696a230047dba4cf5728011 Mon Sep 17 00:00:00 2001 From: Pierre Delaunay Date: Thu, 19 Jan 2023 14:42:33 -0500 Subject: [PATCH 07/15] Add pytorch just-in-case test --- src/orion/executor/multiprocess_backend.py | 2 +- tests/unittests/executor/test_executor.py | 30 +++++++++++++--------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/orion/executor/multiprocess_backend.py b/src/orion/executor/multiprocess_backend.py index 9c0f50c9b..93046d017 100644 --- a/src/orion/executor/multiprocess_backend.py +++ b/src/orion/executor/multiprocess_backend.py @@ -86,7 +86,7 @@ def Process(*args, **kwds): if not Pool.ALLOW_DAEMON: return PyPool.Process(*args, **kwds) - return _Process(*args, **kwds) + return _Process(*args, **kwds, daemon=False) def shutdown(self): # NB: https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index a0f5af72d..36beabb6d 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -252,6 +252,10 @@ def inc(a): def nested_pool(): + import multiprocessing.process as proc + + assert not proc._current_process._config.get("daemon") + data = [1, 2, 3, 4, 5, 6] with multiprocessing.Pool(5) as p: result = p.map_async(inc, data) @@ -299,6 +303,10 @@ def test_executors_del_does_not_raise(backend): def pytorch_workon(pid): + import multiprocessing.process as proc + + assert not proc._current_process._config.get("daemon") + import torch from torchvision import datasets, transforms @@ -308,16 +316,11 @@ def pytorch_workon(pid): ] ) - dataset = datasets.MNIST( - f"../data/{pid}/", - train=True, - download=True, - transform=transform, - ) + dataset = datasets.FakeData(128, transform=transform) - loader = torch.utils.data.DataLoader(dataset, num_workers=10, batch_size=64) + loader = torch.utils.data.DataLoader(dataset, num_workers=2, batch_size=64) - for i in loader: + for i, _ in enumerate(loader): pass return i @@ -325,29 +328,32 @@ def pytorch_workon(pid): def check_pytorch_dataloader(): import sys + import traceback failures = 0 backends = [ thread, - multiprocess, SingleExecutor, + multiprocess, + # Dask fails 100% Dask, ] for executor_ctor in backends: try: - with executor_ctor(10) as executor: - futures = [executor.submit(pytorch_workon, i) for i in range(5)] + with executor_ctor(2) as executor: + futures = [executor.submit(pytorch_workon, i) for i in range(2)] results = executor.async_get(futures, timeout=2) for r in results: - assert r.value == 27 + assert r.value == 1 print("[ OK]", executor_ctor) except Exception as err: + traceback.print_exc() print("[ FAIL]", executor_ctor, err) failures += 1 From 1db1e182ab1004dbc44b987f977673d91aa77551 Mon Sep 17 00:00:00 2001 From: Pierre Delaunay Date: Fri, 20 Jan 2023 12:36:23 -0500 Subject: [PATCH 08/15] Add pytorch test --- tests/unittests/executor/test_executor.py | 49 +++++++++-------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index 36beabb6d..7a8af8822 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -268,7 +268,7 @@ def nested_pool(): @pytest.mark.parametrize("backend", backends) def test_nested_submit_pool(backend): if backend is Dask: - pytest.skip("Dask does not support nesting") + pytest.xfail("Dask does not support nesting") with backend(5) as executor: futures = [executor.submit(nested_pool) for i in range(5)] @@ -326,39 +326,28 @@ def pytorch_workon(pid): return i -def check_pytorch_dataloader(): - import sys - import traceback - - failures = 0 - - backends = [ - thread, - SingleExecutor, - multiprocess, - # Dask fails 100% - Dask, - ] - - for executor_ctor in backends: - try: - with executor_ctor(2) as executor: - futures = [executor.submit(pytorch_workon, i) for i in range(2)] +def has_pytorch(): + try: + pass - results = executor.async_get(futures, timeout=2) + return True + except: + return False - for r in results: - assert r.value == 1 - print("[ OK]", executor_ctor) +@pytest.mark.parametrize("backend", backends) +def test_pytorch_dataloader(backend): + if backend is Dask: + pytest.xfail("Dask does not support nesting") - except Exception as err: - traceback.print_exc() - print("[ FAIL]", executor_ctor, err) - failures += 1 + if not has_pytorch(): + pytest.skip("Pytorch is not installed skipping") + return - sys.exit(failures) + with backend(2) as executor: + futures = [executor.submit(pytorch_workon, i) for i in range(2)] + results = executor.async_get(futures, timeout=2) -if __name__ == "__main__": - check_pytorch_dataloader() + for r in results: + assert r.value == 1 From fccccda7cae9988aecb4efb1415f5b9ffafa665f Mon Sep 17 00:00:00 2001 From: Pierre Delaunay Date: Fri, 20 Jan 2023 12:37:29 -0500 Subject: [PATCH 09/15] - --- tests/unittests/executor/test_executor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index 7a8af8822..19bdfa650 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -328,8 +328,6 @@ def pytorch_workon(pid): def has_pytorch(): try: - pass - return True except: return False From 3fcd5bf8aa7af9adbba51c5cb93682733d6bc819 Mon Sep 17 00:00:00 2001 From: Pierre Delaunay Date: Mon, 23 Jan 2023 09:19:52 -0500 Subject: [PATCH 10/15] - --- tests/unittests/executor/test_executor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index 19bdfa650..7a8af8822 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -328,6 +328,8 @@ def pytorch_workon(pid): def has_pytorch(): try: + pass + return True except: return False From d7ffefaa148df49c41d588537dc6966ab96a8887 Mon Sep 17 00:00:00 2001 From: Setepenre Date: Tue, 15 Aug 2023 17:43:47 +0000 Subject: [PATCH 11/15] - --- tests/unittests/executor/test_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index 7a8af8822..d441ceae5 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -328,7 +328,7 @@ def pytorch_workon(pid): def has_pytorch(): try: - pass + import torch return True except: From 7efdfbccefd77ed320fdf04458c344cd13e3a654 Mon Sep 17 00:00:00 2001 From: Setepenre Date: Tue, 15 Aug 2023 22:35:18 +0000 Subject: [PATCH 12/15] - --- tests/unittests/executor/test_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index d441ceae5..7a8af8822 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -328,7 +328,7 @@ def pytorch_workon(pid): def has_pytorch(): try: - import torch + pass return True except: From efaeb17e2c31889a0cfdaa357bef70ef3d6bfc93 Mon Sep 17 00:00:00 2001 From: Setepenre Date: Thu, 17 Aug 2023 14:05:08 +0000 Subject: [PATCH 13/15] - --- tests/unittests/executor/test_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index 7a8af8822..d441ceae5 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -328,7 +328,7 @@ def pytorch_workon(pid): def has_pytorch(): try: - pass + import torch return True except: From f03d2fde423915064990912144f36e3fa42d7cb6 Mon Sep 17 00:00:00 2001 From: Setepenre Date: Thu, 17 Aug 2023 14:05:33 +0000 Subject: [PATCH 14/15] - --- tests/unittests/executor/test_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index d441ceae5..7a8af8822 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -328,7 +328,7 @@ def pytorch_workon(pid): def has_pytorch(): try: - import torch + pass return True except: From ec93ddb696902ede081badb430530a5b54c75d8e Mon Sep 17 00:00:00 2001 From: Setepenre Date: Thu, 17 Aug 2023 14:08:35 +0000 Subject: [PATCH 15/15] - --- tests/unittests/executor/test_executor.py | 25 +++++++++-------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/tests/unittests/executor/test_executor.py b/tests/unittests/executor/test_executor.py index 7a8af8822..42b71be15 100644 --- a/tests/unittests/executor/test_executor.py +++ b/tests/unittests/executor/test_executor.py @@ -1,4 +1,5 @@ import multiprocessing +import multiprocessing.process as proc import time import pytest @@ -8,6 +9,14 @@ from orion.executor.multiprocess_backend import PoolExecutor from orion.executor.single_backend import SingleExecutor +try: + import torch + from torchvision import datasets, transforms + + HAS_PYTORCH = True +except: + HAS_PYTORCH = False + def multiprocess(n): return PoolExecutor(n, "multiprocess") @@ -303,13 +312,8 @@ def test_executors_del_does_not_raise(backend): def pytorch_workon(pid): - import multiprocessing.process as proc - assert not proc._current_process._config.get("daemon") - import torch - from torchvision import datasets, transforms - transform = transforms.Compose( [ transforms.ToTensor(), @@ -326,21 +330,12 @@ def pytorch_workon(pid): return i -def has_pytorch(): - try: - pass - - return True - except: - return False - - @pytest.mark.parametrize("backend", backends) def test_pytorch_dataloader(backend): if backend is Dask: pytest.xfail("Dask does not support nesting") - if not has_pytorch(): + if not HAS_PYTORCH: pytest.skip("Pytorch is not installed skipping") return