Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🤖 Merge 8.2.x-sync into master #5834

Merged
merged 5 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5821.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed issue where large uncommitted changes could cause `cylc install` to hang.
29 changes: 20 additions & 9 deletions cylc/flow/install_plugins/log_vc_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,22 @@
from pathlib import Path
from subprocess import Popen, DEVNULL, PIPE
from typing import (
Any, Dict, Iterable, List, Optional, TYPE_CHECKING, TextIO, Union, overload
Any,
Dict,
Iterable,
List,
Optional,
TYPE_CHECKING,
TextIO,
Union,
overload,
)

from cylc.flow import LOG as _LOG, LoggerAdaptor
from cylc.flow.exceptions import CylcError
import cylc.flow.flags
from cylc.flow.pipe_poller import pipe_poller
from cylc.flow.util import format_cmd
from cylc.flow.workflow_files import WorkflowFiles

if TYPE_CHECKING:
Expand Down Expand Up @@ -171,7 +181,7 @@ def get_vc_info(path: Union[Path, str]) -> Optional[Dict[str, Any]]:
):
LOG.debug(f"Source dir {path} is not a {vcs} repository")
elif cylc.flow.flags.verbosity > -1:
LOG.warning(f"$ {vcs} {' '.join(args)}\n{exc}")
LOG.warning(f"$ {vcs} {format_cmd(args)}\n{exc}")
continue

info['version control system'] = vcs
Expand Down Expand Up @@ -217,9 +227,7 @@ def _run_cmd(
args: The args to pass to the version control command.
cwd: Directory to run the command in.
stdout: Where to redirect output (either PIPE or a
text stream/file object). Note: only use PIPE for
commands that will not generate a large output, otherwise
the pipe might get blocked.
text stream/file object).

Returns:
Stdout output if stdout=PIPE, else None as the output has been
Expand All @@ -231,6 +239,7 @@ def _run_cmd(
OSError: Non-zero return code for VCS command.
"""
cmd = [vcs, *args]
LOG.debug(f'$ {format_cmd(cmd)}')
try:
proc = Popen( # nosec
cmd,
Expand All @@ -245,13 +254,15 @@ def _run_cmd(
# This will only be raised if the VCS command is not installed,
# otherwise Popen() will succeed with a non-zero return code
raise VCSNotInstalledError(vcs, exc)
ret_code = proc.wait()
out, err = proc.communicate()
if ret_code:
if stdout == PIPE:
out, err = pipe_poller(proc, proc.stdout, proc.stderr)
else:
out, err = proc.communicate()
if proc.returncode:
if any(err.lower().startswith(msg) for msg in NO_BASE_ERRS[vcs]):
# No base commit in repo
raise VCSMissingBaseError(vcs, cwd)
raise OSError(ret_code, err)
raise OSError(proc.returncode, err)
return out


Expand Down
73 changes: 73 additions & 0 deletions cylc/flow/pipe_poller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Utility for preventing pipes from getting clogged up.

If you're reading files from Popen (i.e. to extract command output) where the
command output has the potential to be long-ish, then you should use this
function to protect against the buffer filling up.

Note, there is a more advanced version of this baked into the subprocpool.
"""

from select import select


def pipe_poller(proc, *files, chunk_size=4096):
"""Read from a process without hitting buffer issues.

Standin for subprocess.Popen.communicate.

When PIPE'ing from subprocesses, the output goes into a buffer. If the
buffer gets full, the subprocess will hang trying to write to it.

This function polls the process, reading output from the buffers into
memory to prevent them from filling up.

Args:
proc:
The process to poll.
files:
The files you want to read from, likely anything you've directed to
PIPE.
chunk_size:
The amount of text to read from the buffer on each pass.

Returns:
tuple - The text read from each of the files in the order they were
specified.

"""
_files = {
file: b'' if 'b' in getattr(file, 'mode', 'r') else ''
for file in files
}

def _read(timeout=1.0):
# read any data from files
nonlocal chunk_size, files
for file in select(list(files), [], [], timeout)[0]:
buffer = file.read(chunk_size)
if len(buffer) > 0:
_files[file] += buffer

while proc.poll() is None:
# read from the buffers
_read()
# double check the buffers now that the process has finished
_read(timeout=0.01)

return tuple(_files.values())
16 changes: 8 additions & 8 deletions cylc/flow/task_queues/independent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from collections import deque
from contextlib import suppress
from typing import List, Set, Dict, Counter, Any, TYPE_CHECKING
from typing import TYPE_CHECKING, List, Set, Dict, Counter, Any

from cylc.flow.task_queues import TaskQueueManagerBase

Expand All @@ -40,11 +40,11 @@ def push_task(self, itask: 'TaskProxy') -> None:
if itask.tdef.name in self.members:
self.deque.appendleft(itask)

def release(self, active: Counter[str]) -> 'List[TaskProxy]':
def release(self, active: Counter[str]) -> List['TaskProxy']:
"""Release tasks if below the active limit."""
# The "active" argument counts active tasks by name.
released: 'List[TaskProxy]' = []
held: 'List[TaskProxy]' = []
released: List['TaskProxy'] = []
held: List['TaskProxy'] = []
n_active: int = 0
for mem in self.members:
n_active += active[mem]
Expand Down Expand Up @@ -113,20 +113,20 @@ def __init__(self,
config["limit"], config["members"]
)

self.force_released: 'Set[TaskProxy]' = set()
self.force_released: Set['TaskProxy'] = set()

def push_task(self, itask: 'TaskProxy') -> None:
"""Push a task to the appropriate queue."""
for queue in self.queues.values():
queue.push_task(itask)

def release_tasks(self, active: Counter[str]) -> 'List[TaskProxy]':
def release_tasks(self, active: Counter[str]) -> List['TaskProxy']:
"""Release tasks up to the queue limits."""
released: 'List[TaskProxy]' = []
released: List['TaskProxy'] = []
for queue in self.queues.values():
released += queue.release(active)
if self.force_released:
released += list(self.force_released)
released.extend(self.force_released)
self.force_released = set()
return released

Expand Down
23 changes: 8 additions & 15 deletions tests/unit/test_indep_task_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import pytest

from cylc.flow.task_proxy import TaskProxy
from cylc.flow.task_queues.independent import IndepQueueManager
from cylc.flow.task_state import TASK_STATUS_PREPARING


MEMBERS = {"a", "b", "c", "d", "e", "f"}
Expand Down Expand Up @@ -61,9 +61,7 @@


@pytest.mark.parametrize(
"active,"
"expected_released,"
"expected_foo_groups",
"active, expected_released, expected_foo_groups",
[
(
Counter(["b1", "b2", "s1", "o1"]),
Expand All @@ -73,29 +71,24 @@
]
)
def test_queue_and_release(
active,
expected_released,
expected_foo_groups):
active,
expected_released,
expected_foo_groups
):
"""Test task queue and release."""
# configure the queue
queue_mgr = IndepQueueManager(QCONFIG, ALL_TASK_NAMES, DESCENDANTS)

# add newly ready tasks to the queue
for name in READY_TASK_NAMES:
itask = Mock()
itask = Mock(spec=TaskProxy)
itask.tdef.name = name
itask.state.is_held = False
queue_mgr.push_task(itask)

# release tasks, given current active task counter
released = queue_mgr.release_tasks(active)
assert sorted([r.tdef.name for r in released]) == sorted(expected_released)

# check released tasks change state to "preparing", and not is_queued
# Commented out pending https://github.com/cylc/cylc-flow/issues/5812
# for r in released:
# assert r.state.reset.called_with(TASK_STATUS_PREPARING)
# assert r.state.reset.called_with(is_queued=False)
assert sorted(r.tdef.name for r in released) == sorted(expected_released)

# check that adopted orphans end up in the default queue
orphans = ["orphan1", "orphan2"]
Expand Down
33 changes: 33 additions & 0 deletions tests/unit/test_pipe_poller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from subprocess import Popen, PIPE

from cylc.flow.pipe_poller import pipe_poller


def test_pipe_poller_str():
proc = Popen(['echo', 'Hello World!'], stdout=PIPE, text=True)
(stdout,) = pipe_poller(proc, proc.stdout)
assert proc.returncode == 0
assert stdout == 'Hello World!\n'


def test_pipe_poller_bytes():
proc = Popen(['echo', 'Hello World!'], stdout=PIPE, text=False)
(stdout,) = pipe_poller(proc, proc.stdout)
assert proc.returncode == 0
assert stdout == b'Hello World!\n'
Loading