diff --git a/niworkflows/engine/plugin.py b/niworkflows/engine/plugin.py index 8c074e9cbe8..14c2547f0c3 100644 --- a/niworkflows/engine/plugin.py +++ b/niworkflows/engine/plugin.py @@ -401,6 +401,16 @@ def __init__(self, pool=None, plugin_args=None): """ Initialize the plugin. + If passed a nipreps-style configuration object in `plugin_args["app_config"]`, + the following fields must be present: + + app_config.environment.total_memory : :obj:`float` + Memory available to the workflow in gigabytes. + app_config._process_initializer : :obj:`callable` + A function that accepts a file path and returns None, to be run in each worker. + app_config.file_path : :obj:`str` + The path to a file that will be passed to the initializer. + Arguments --------- pool : :obj:`~concurrent.futures.Executor` diff --git a/niworkflows/engine/tests/test_plugin.py b/niworkflows/engine/tests/test_plugin.py index e17ecd3f903..6956ba19c6f 100644 --- a/niworkflows/engine/tests/test_plugin.py +++ b/niworkflows/engine/tests/test_plugin.py @@ -72,28 +72,28 @@ def test_plugin_defaults(workflow, caplog): def test_plugin_args_noconfig(workflow, caplog): """Test the plugin works with typical nipype arguments.""" caplog.set_level(logging.CRITICAL, logger="nipype.workflow") - workflow.run( - plugin=MultiProcPlugin(), - plugin_args={"n_procs": 2, "memory_gb": 0.1}, - ) + workflow.run(plugin=MultiProcPlugin(plugin_args={"n_procs": 2, "memory_gb": 0.1})) + +def touch_file(file_path: str) -> None: + """Module-level functions play more nicely with multiprocessing.""" + with open(file_path, "w") as f: + f.write("flag") -def test_plugin_app_config(workflow, caplog, capsys): + +def test_plugin_app_config(tmp_path, workflow, caplog): """Test the plugin works with a nipreps-style configuration.""" - def init_print(): - print("Custom init") + init_flag = tmp_path / "init_flag.txt" app_config = SimpleNamespace( - environment=SimpleNamespace(total_memory_gb=1), - _process_initializer=init_print(), - file_path='/does/not/need/to/exist/for/testing', + environment=SimpleNamespace(total_memory=1), + _process_initializer=touch_file, + file_path=str(init_flag), ) - caplog.set_level(logging.CRITICAL, logger="nipype.workflow") + caplog.set_level(logging.INFO, logger="nipype.workflow") workflow.run( - plugin=MultiProcPlugin(), - plugin_args={"n_procs": 2, "app_config": app_config}, + plugin=MultiProcPlugin(plugin_args={"n_procs": 2, "app_config": app_config}) ) - captured = capsys.readouterr() - assert "Custom init" in captured.out + assert init_flag.exists() and init_flag.read_text() == "flag"