diff --git a/niworkflows/engine/plugin.py b/niworkflows/engine/plugin.py index 354d3bf5046..8c074e9cbe8 100644 --- a/niworkflows/engine/plugin.py +++ b/niworkflows/engine/plugin.py @@ -22,7 +22,6 @@ # """A lightweight NiPype MultiProc execution plugin.""" -# Import packages import os import sys from copy import deepcopy @@ -32,6 +31,8 @@ from traceback import format_exception import gc +from nipype.utils.misc import str2bool + # Run node def run_node(node, updatehash, taskid): @@ -239,8 +240,6 @@ def _clear_task(self, taskid): raise NotImplementedError def _clean_queue(self, jobid, graph, result=None): - from mriqc import config - if self._status_callback: self._status_callback(self.procs[jobid], "exception") if result is None: @@ -250,7 +249,7 @@ def _clean_queue(self, jobid, graph, result=None): } crashfile = self._report_crash(self.procs[jobid], result=result) - if config.nipype.stop_on_first_crash: + if str2bool(self._config["execution"]["stop_on_first_crash"]): raise RuntimeError("".join(result["traceback"])) if jobid in self.mapnodesubids: # remove current jobid @@ -292,9 +291,7 @@ def _submit_mapnode(self, jobid): return False def _local_hash_check(self, jobid, graph): - from mriqc import config - - if not config.nipype.local_hash_check: + if not str2bool(self.procs[jobid].config["execution"]["local_hash_check"]): return False try: @@ -368,9 +365,8 @@ def _remove_node_dirs(self): """Remove directories whose outputs have already been used up.""" import numpy as np from shutil import rmtree - from mriqc import config - if config.nipype.remove_node_directories: + if str2bool(self._config["execution"]["remove_node_directories"]): indices = np.nonzero((self.refidx.sum(axis=1) == 0).__array__())[0] for idx in indices: if idx in self.mapnodesubids: @@ -413,8 +409,6 @@ def __init__(self, pool=None, plugin_args=None): A Nipype-compatible dictionary of settings. """ - from mriqc import config - super().__init__(plugin_args=plugin_args) self._taskresult = {} self._task_obj = {} @@ -424,6 +418,24 @@ def __init__(self, pool=None, plugin_args=None): # change to it when workers are set up self._cwd = os.getcwd() + # Retrieve a nipreps-style configuration object + try: + config = plugin_args["app_config"] + except (KeyError, TypeError): + from types import SimpleNamespace + from nipype.utils.profiler import get_system_total_memory_gb + + config = SimpleNamespace( + environment=SimpleNamespace( + # Nipype default + total_memory=get_system_total_memory_gb() + ), + # concurrent.futures default + _process_initializer=None, + # Just needs to exist + file_path=None, + ) + # Read in options or set defaults. self.processors = self.plugin_args.get("n_procs", mp.cpu_count()) self.memory_gb = self.plugin_args.get( diff --git a/niworkflows/engine/tests/test_plugin.py b/niworkflows/engine/tests/test_plugin.py new file mode 100644 index 00000000000..e17ecd3f903 --- /dev/null +++ b/niworkflows/engine/tests/test_plugin.py @@ -0,0 +1,99 @@ +import logging +from types import SimpleNamespace + +import pytest +from nipype.pipeline import engine as pe +from nipype.interfaces import utility as niu + +from ..plugin import MultiProcPlugin + + +def add(x, y): + return x + y + + +def addall(inlist): + import time + + time.sleep(0.2) # Simulate some work + return sum(inlist) + + +@pytest.fixture +def workflow(tmp_path): + workflow = pe.Workflow(name="test_wf", base_dir=tmp_path) + + inputnode = pe.Node(niu.IdentityInterface(fields=["x", "y"]), name="inputnode") + outputnode = pe.Node(niu.IdentityInterface(fields=["z"]), name="outputnode") + + # Generate many nodes and claim a lot of memory + add_nd = pe.MapNode( + niu.Function(function=add, input_names=["x", "y"], output_names=["z"]), + name="add", + iterfield=["x"], + mem_gb=0.8, + ) + + # Regular node + sum_nd = pe.Node(niu.Function(function=addall, input_names=["inlist"]), name="sum") + + # Run without submitting is another code path + add_more_nd = pe.Node( + niu.Function(function=add, input_names=["x", "y"], output_names=["z"]), + name="add_more", + run_without_submitting=True, + ) + + workflow.connect( + [ + (inputnode, add_nd, [("x", "x"), ("y", "y")]), + (add_nd, sum_nd, [("z", "inlist")]), + (sum_nd, add_more_nd, [("out", "x")]), + (inputnode, add_more_nd, [("y", "y")]), + (add_more_nd, outputnode, [("z", "z")]), + ] + ) + + inputnode.inputs.x = list(range(30)) + inputnode.inputs.y = 4 + + # Avoid unnecessary sleeps + workflow.config["execution"]["poll_sleep_duration"] = 0 + + return workflow + + +def test_plugin_defaults(workflow, caplog): + """Test the plugin works without any arguments.""" + caplog.set_level(logging.CRITICAL, logger="nipype.workflow") + workflow.run(plugin=MultiProcPlugin()) + + +def test_plugin_args_noconfig(workflow, caplog): + """Test the plugin works with typical nipype arguments.""" + caplog.set_level(logging.CRITICAL, logger="nipype.workflow") + workflow.run( + plugin=MultiProcPlugin(), + plugin_args={"n_procs": 2, "memory_gb": 0.1}, + ) + + +def test_plugin_app_config(workflow, caplog, capsys): + """Test the plugin works with a nipreps-style configuration.""" + + def init_print(): + print("Custom init") + + app_config = SimpleNamespace( + environment=SimpleNamespace(total_memory_gb=1), + _process_initializer=init_print(), + file_path='/does/not/need/to/exist/for/testing', + ) + caplog.set_level(logging.CRITICAL, logger="nipype.workflow") + workflow.run( + plugin=MultiProcPlugin(), + plugin_args={"n_procs": 2, "app_config": app_config}, + ) + + captured = capsys.readouterr() + assert "Custom init" in captured.out