diff --git a/changes.d/5821.fix.md b/changes.d/5821.fix.md
new file mode 100644
index 00000000000..0c6c8b7918d
--- /dev/null
+++ b/changes.d/5821.fix.md
@@ -0,0 +1 @@
+Fixed issue where large uncommitted changes could cause `cylc install` to hang.
diff --git a/cylc/flow/install_plugins/log_vc_info.py b/cylc/flow/install_plugins/log_vc_info.py
index d2379c5cc0d..29d861f7654 100644
--- a/cylc/flow/install_plugins/log_vc_info.py
+++ b/cylc/flow/install_plugins/log_vc_info.py
@@ -63,12 +63,22 @@
from pathlib import Path
from subprocess import Popen, DEVNULL, PIPE
from typing import (
- Any, Dict, Iterable, List, Optional, TYPE_CHECKING, TextIO, Union, overload
+ Any,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ TYPE_CHECKING,
+ TextIO,
+ Union,
+ overload,
)
from cylc.flow import LOG as _LOG, LoggerAdaptor
from cylc.flow.exceptions import CylcError
import cylc.flow.flags
+from cylc.flow.pipe_poller import pipe_poller
+from cylc.flow.util import format_cmd
from cylc.flow.workflow_files import WorkflowFiles
if TYPE_CHECKING:
@@ -171,7 +181,7 @@ def get_vc_info(path: Union[Path, str]) -> Optional[Dict[str, Any]]:
):
LOG.debug(f"Source dir {path} is not a {vcs} repository")
elif cylc.flow.flags.verbosity > -1:
- LOG.warning(f"$ {vcs} {' '.join(args)}\n{exc}")
+ LOG.warning(f"$ {vcs} {format_cmd(args)}\n{exc}")
continue
info['version control system'] = vcs
@@ -217,9 +227,7 @@ def _run_cmd(
args: The args to pass to the version control command.
cwd: Directory to run the command in.
stdout: Where to redirect output (either PIPE or a
- text stream/file object). Note: only use PIPE for
- commands that will not generate a large output, otherwise
- the pipe might get blocked.
+ text stream/file object).
Returns:
Stdout output if stdout=PIPE, else None as the output has been
@@ -231,6 +239,7 @@ def _run_cmd(
OSError: Non-zero return code for VCS command.
"""
cmd = [vcs, *args]
+ LOG.debug(f'$ {format_cmd(cmd)}')
try:
proc = Popen( # nosec
cmd,
@@ -245,13 +254,15 @@ def _run_cmd(
# This will only be raised if the VCS command is not installed,
# otherwise Popen() will succeed with a non-zero return code
raise VCSNotInstalledError(vcs, exc)
- ret_code = proc.wait()
- out, err = proc.communicate()
- if ret_code:
+ if stdout == PIPE:
+ out, err = pipe_poller(proc, proc.stdout, proc.stderr)
+ else:
+ out, err = proc.communicate()
+ if proc.returncode:
if any(err.lower().startswith(msg) for msg in NO_BASE_ERRS[vcs]):
# No base commit in repo
raise VCSMissingBaseError(vcs, cwd)
- raise OSError(ret_code, err)
+ raise OSError(proc.returncode, err)
return out
diff --git a/cylc/flow/pipe_poller.py b/cylc/flow/pipe_poller.py
new file mode 100644
index 00000000000..d769f77c469
--- /dev/null
+++ b/cylc/flow/pipe_poller.py
@@ -0,0 +1,73 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Utility for preventing pipes from getting clogged up.
+
+If you're reading files from Popen (i.e. to extract command output) where the
+command output has the potential to be long-ish, then you should use this
+function to protect against the buffer filling up.
+
+Note, there is a more advanced version of this baked into the subprocpool.
+"""
+
+from select import select
+
+
+def pipe_poller(proc, *files, chunk_size=4096):
+ """Read from a process without hitting buffer issues.
+
+ Standin for subprocess.Popen.communicate.
+
+ When PIPE'ing from subprocesses, the output goes into a buffer. If the
+ buffer gets full, the subprocess will hang trying to write to it.
+
+ This function polls the process, reading output from the buffers into
+ memory to prevent them from filling up.
+
+ Args:
+ proc:
+ The process to poll.
+ files:
+ The files you want to read from, likely anything you've directed to
+ PIPE.
+ chunk_size:
+ The amount of text to read from the buffer on each pass.
+
+ Returns:
+ tuple - The text read from each of the files in the order they were
+ specified.
+
+ """
+ _files = {
+ file: b'' if 'b' in getattr(file, 'mode', 'r') else ''
+ for file in files
+ }
+
+ def _read(timeout=1.0):
+ # read any data from files
+ nonlocal chunk_size, files
+ for file in select(list(files), [], [], timeout)[0]:
+ buffer = file.read(chunk_size)
+ if len(buffer) > 0:
+ _files[file] += buffer
+
+ while proc.poll() is None:
+ # read from the buffers
+ _read()
+ # double check the buffers now that the process has finished
+ _read(timeout=0.01)
+
+ return tuple(_files.values())
diff --git a/cylc/flow/task_queues/independent.py b/cylc/flow/task_queues/independent.py
index d0e82995898..185edee4f2b 100644
--- a/cylc/flow/task_queues/independent.py
+++ b/cylc/flow/task_queues/independent.py
@@ -18,7 +18,7 @@
from collections import deque
from contextlib import suppress
-from typing import List, Set, Dict, Counter, Any, TYPE_CHECKING
+from typing import TYPE_CHECKING, List, Set, Dict, Counter, Any
from cylc.flow.task_queues import TaskQueueManagerBase
@@ -40,11 +40,11 @@ def push_task(self, itask: 'TaskProxy') -> None:
if itask.tdef.name in self.members:
self.deque.appendleft(itask)
- def release(self, active: Counter[str]) -> 'List[TaskProxy]':
+ def release(self, active: Counter[str]) -> List['TaskProxy']:
"""Release tasks if below the active limit."""
# The "active" argument counts active tasks by name.
- released: 'List[TaskProxy]' = []
- held: 'List[TaskProxy]' = []
+ released: List['TaskProxy'] = []
+ held: List['TaskProxy'] = []
n_active: int = 0
for mem in self.members:
n_active += active[mem]
@@ -113,20 +113,20 @@ def __init__(self,
config["limit"], config["members"]
)
- self.force_released: 'Set[TaskProxy]' = set()
+ self.force_released: Set['TaskProxy'] = set()
def push_task(self, itask: 'TaskProxy') -> None:
"""Push a task to the appropriate queue."""
for queue in self.queues.values():
queue.push_task(itask)
- def release_tasks(self, active: Counter[str]) -> 'List[TaskProxy]':
+ def release_tasks(self, active: Counter[str]) -> List['TaskProxy']:
"""Release tasks up to the queue limits."""
- released: 'List[TaskProxy]' = []
+ released: List['TaskProxy'] = []
for queue in self.queues.values():
released += queue.release(active)
if self.force_released:
- released += list(self.force_released)
+ released.extend(self.force_released)
self.force_released = set()
return released
diff --git a/tests/unit/test_indep_task_queues.py b/tests/unit/test_indep_task_queues.py
index bb11fbce463..d144f58eecf 100644
--- a/tests/unit/test_indep_task_queues.py
+++ b/tests/unit/test_indep_task_queues.py
@@ -21,8 +21,8 @@
import pytest
+from cylc.flow.task_proxy import TaskProxy
from cylc.flow.task_queues.independent import IndepQueueManager
-from cylc.flow.task_state import TASK_STATUS_PREPARING
MEMBERS = {"a", "b", "c", "d", "e", "f"}
@@ -61,9 +61,7 @@
@pytest.mark.parametrize(
- "active,"
- "expected_released,"
- "expected_foo_groups",
+ "active, expected_released, expected_foo_groups",
[
(
Counter(["b1", "b2", "s1", "o1"]),
@@ -73,29 +71,24 @@
]
)
def test_queue_and_release(
- active,
- expected_released,
- expected_foo_groups):
+ active,
+ expected_released,
+ expected_foo_groups
+):
"""Test task queue and release."""
# configure the queue
queue_mgr = IndepQueueManager(QCONFIG, ALL_TASK_NAMES, DESCENDANTS)
# add newly ready tasks to the queue
for name in READY_TASK_NAMES:
- itask = Mock()
+ itask = Mock(spec=TaskProxy)
itask.tdef.name = name
itask.state.is_held = False
queue_mgr.push_task(itask)
# release tasks, given current active task counter
released = queue_mgr.release_tasks(active)
- assert sorted([r.tdef.name for r in released]) == sorted(expected_released)
-
- # check released tasks change state to "preparing", and not is_queued
- # Commented out pending https://github.com/cylc/cylc-flow/issues/5812
- # for r in released:
- # assert r.state.reset.called_with(TASK_STATUS_PREPARING)
- # assert r.state.reset.called_with(is_queued=False)
+ assert sorted(r.tdef.name for r in released) == sorted(expected_released)
# check that adopted orphans end up in the default queue
orphans = ["orphan1", "orphan2"]
diff --git a/tests/unit/test_pipe_poller.py b/tests/unit/test_pipe_poller.py
new file mode 100644
index 00000000000..a41e9635c6b
--- /dev/null
+++ b/tests/unit/test_pipe_poller.py
@@ -0,0 +1,33 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from subprocess import Popen, PIPE
+
+from cylc.flow.pipe_poller import pipe_poller
+
+
+def test_pipe_poller_str():
+ proc = Popen(['echo', 'Hello World!'], stdout=PIPE, text=True)
+ (stdout,) = pipe_poller(proc, proc.stdout)
+ assert proc.returncode == 0
+ assert stdout == 'Hello World!\n'
+
+
+def test_pipe_poller_bytes():
+ proc = Popen(['echo', 'Hello World!'], stdout=PIPE, text=False)
+ (stdout,) = pipe_poller(proc, proc.stdout)
+ assert proc.returncode == 0
+ assert stdout == b'Hello World!\n'