Skip to content

Commit

Permalink
Merge pull request #876 from effigies/fix/nipype-plugin
Browse files Browse the repository at this point in the history
FIX: Remove accidental MRIQC dependency, allow app config to be passed to workflow plugin
  • Loading branch information
mgxd authored Jun 21, 2024
2 parents 699fa75 + 841dcf8 commit 44814ec
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 11 deletions.
34 changes: 23 additions & 11 deletions niworkflows/engine/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#
"""A lightweight NiPype MultiProc execution plugin."""

# Import packages
import os
import sys
from copy import deepcopy
Expand All @@ -32,6 +31,8 @@
from traceback import format_exception
import gc

from nipype.utils.misc import str2bool


# Run node
def run_node(node, updatehash, taskid):
Expand Down Expand Up @@ -239,8 +240,6 @@ def _clear_task(self, taskid):
raise NotImplementedError

def _clean_queue(self, jobid, graph, result=None):
from mriqc import config

if self._status_callback:
self._status_callback(self.procs[jobid], "exception")
if result is None:
Expand All @@ -250,7 +249,7 @@ def _clean_queue(self, jobid, graph, result=None):
}

crashfile = self._report_crash(self.procs[jobid], result=result)
if config.nipype.stop_on_first_crash:
if str2bool(self._config["execution"]["stop_on_first_crash"]):
raise RuntimeError("".join(result["traceback"]))
if jobid in self.mapnodesubids:
# remove current jobid
Expand Down Expand Up @@ -292,9 +291,7 @@ def _submit_mapnode(self, jobid):
return False

def _local_hash_check(self, jobid, graph):
from mriqc import config

if not config.nipype.local_hash_check:
if not str2bool(self.procs[jobid].config["execution"]["local_hash_check"]):
return False

try:
Expand Down Expand Up @@ -368,9 +365,8 @@ def _remove_node_dirs(self):
"""Remove directories whose outputs have already been used up."""
import numpy as np
from shutil import rmtree
from mriqc import config

if config.nipype.remove_node_directories:
if str2bool(self._config["execution"]["remove_node_directories"]):
indices = np.nonzero((self.refidx.sum(axis=1) == 0).__array__())[0]
for idx in indices:
if idx in self.mapnodesubids:
Expand Down Expand Up @@ -413,8 +409,6 @@ def __init__(self, pool=None, plugin_args=None):
A Nipype-compatible dictionary of settings.
"""
from mriqc import config

super().__init__(plugin_args=plugin_args)
self._taskresult = {}
self._task_obj = {}
Expand All @@ -424,6 +418,24 @@ def __init__(self, pool=None, plugin_args=None):
# change to it when workers are set up
self._cwd = os.getcwd()

# Retrieve a nipreps-style configuration object
try:
config = plugin_args["app_config"]
except (KeyError, TypeError):
from types import SimpleNamespace
from nipype.utils.profiler import get_system_total_memory_gb

config = SimpleNamespace(
environment=SimpleNamespace(
# Nipype default
total_memory=get_system_total_memory_gb()
),
# concurrent.futures default
_process_initializer=None,
# Just needs to exist
file_path=None,
)

# Read in options or set defaults.
self.processors = self.plugin_args.get("n_procs", mp.cpu_count())
self.memory_gb = self.plugin_args.get(
Expand Down
99 changes: 99 additions & 0 deletions niworkflows/engine/tests/test_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import logging
from types import SimpleNamespace

import pytest
from nipype.pipeline import engine as pe
from nipype.interfaces import utility as niu

from ..plugin import MultiProcPlugin


def add(x, y):
    """Return the sum of *x* and *y* (trivial payload for a workflow node)."""
    total = x + y
    return total


def addall(inlist):
    """Sum the values in *inlist*, pausing briefly to simulate real work."""
    from time import sleep

    sleep(0.2)  # Simulate some work
    return sum(inlist)


@pytest.fixture
def workflow(tmp_path):
    """Build a small fan-out/fan-in workflow exercising several node types."""
    wf = pe.Workflow(name="test_wf", base_dir=tmp_path)

    inputnode = pe.Node(niu.IdentityInterface(fields=["x", "y"]), name="inputnode")
    outputnode = pe.Node(niu.IdentityInterface(fields=["z"]), name="outputnode")

    # MapNode fans out over ``x``; a high mem_gb per subnode forces the
    # plugin to schedule around its memory limit.
    mapped_add = pe.MapNode(
        niu.Function(function=add, input_names=["x", "y"], output_names=["z"]),
        name="add",
        iterfield=["x"],
        mem_gb=0.8,
    )

    # Plain node collecting the fan-out (default output name is ``out``)
    collect = pe.Node(niu.Function(function=addall, input_names=["inlist"]), name="sum")

    # run_without_submitting exercises a separate scheduling code path
    inline_add = pe.Node(
        niu.Function(function=add, input_names=["x", "y"], output_names=["z"]),
        name="add_more",
        run_without_submitting=True,
    )

    wf.connect(inputnode, "x", mapped_add, "x")
    wf.connect(inputnode, "y", mapped_add, "y")
    wf.connect(mapped_add, "z", collect, "inlist")
    wf.connect(collect, "out", inline_add, "x")
    wf.connect(inputnode, "y", inline_add, "y")
    wf.connect(inline_add, "z", outputnode, "z")

    inputnode.inputs.x = list(range(30))
    inputnode.inputs.y = 4

    # Avoid unnecessary sleeps
    wf.config["execution"]["poll_sleep_duration"] = 0

    return wf


def test_plugin_defaults(workflow, caplog):
    """The plugin must run a workflow when constructed with no arguments."""
    caplog.set_level(logging.CRITICAL, logger="nipype.workflow")
    plugin = MultiProcPlugin()
    workflow.run(plugin=plugin)


def test_plugin_args_noconfig(workflow, caplog):
    """The plugin must honor typical nipype plugin_args without an app config."""
    caplog.set_level(logging.CRITICAL, logger="nipype.workflow")
    args = {"n_procs": 2, "memory_gb": 0.1}
    workflow.run(plugin=MultiProcPlugin(), plugin_args=args)


def test_plugin_app_config(workflow, caplog, capfd):
    """Test the plugin works with a nipreps-style configuration.

    The custom ``_process_initializer`` must actually be invoked by the
    plugin, and its output must be observed by the test.
    """

    def init_print(*args):
        # ``*args`` tolerates whatever initargs the plugin forwards to the
        # pool initializer (e.g. ``config.file_path``) — TODO confirm the
        # exact call signature against plugin.py.
        print("Custom init", flush=True)

    app_config = SimpleNamespace(
        # The plugin's fallback config defines ``environment.total_memory``
        # (see the SimpleNamespace default in plugin.py), so provide the
        # same attribute here — ``total_memory_gb`` would not be found.
        environment=SimpleNamespace(total_memory=1),
        # Pass the callable itself.  Calling it here (``init_print()``)
        # would hand the plugin ``None`` and print eagerly in the parent,
        # making the assertion below pass without testing anything.
        _process_initializer=init_print,
        file_path='/does/not/need/to/exist/for/testing',
    )
    caplog.set_level(logging.CRITICAL, logger="nipype.workflow")
    workflow.run(
        plugin=MultiProcPlugin(),
        plugin_args={"n_procs": 2, "app_config": app_config},
    )

    # capfd captures at the file-descriptor level, so prints emitted by
    # forked worker processes are seen; capsys only captures the parent's
    # in-process sys.stdout.
    captured = capfd.readouterr()
    assert "Custom init" in captured.out

0 comments on commit 44814ec

Please sign in to comment.