Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cylc play service #525

Merged
merged 5 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 20 additions & 53 deletions cylc/uiserver/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,6 @@ def snake_to_kebab(snake):
raise TypeError(type(snake))


def check_cylc_version(version):
"""Check the provided Cylc version is available on the CLI.

Sets CYLC_VERSION=version and tests the result of cylc --version
to make sure the requested version is installed and selectable via
the CYLC_VERSION environment variable.
"""
proc = Popen(
['cylc', '--version'],
env={**os.environ, 'CYLC_VERSION': version},
stdin=DEVNULL,
stdout=PIPE,
stderr=PIPE,
text=True
)
ret = proc.wait(timeout=5)
out, err = proc.communicate()
return ret or out.strip() == version


def _build_cmd(cmd: List, args: Dict) -> List:
"""Add args to command.

Expand Down Expand Up @@ -289,32 +269,19 @@ async def scan(
return cls._return("Scan requested")

@classmethod
async def play(cls, workflows, args, workflows_mgr, log):
async def play(
cls,
workflows: Iterable[Tokens],
args: Dict[str, Any],
workflows_mgr: 'WorkflowsManager',
log: 'Logger',
) -> List[Union[bool, str]]:
"""Calls `cylc play`."""
response = []
# get ready to run the command
try:
# check that the request cylc version is available
cylc_version = None
if 'cylc_version' in args:
cylc_version = args['cylc_version']
if not check_cylc_version(cylc_version):
return cls._error(
f'cylc version not available: {cylc_version}'
)
args = dict(args)
args.pop('cylc_version')

# build the command
cmd = ['cylc', 'play', '--color=never']
cmd = _build_cmd(cmd, args)

except Exception as exc:
# oh noes, something went wrong, send back confirmation
return cls._error(exc)
# start each requested flow
cylc_version = args.pop('cylc_version', None)
for tokens in workflows:
try:
cmd = _build_cmd(['cylc', 'play', '--color=never'], args)

if tokens['user'] and tokens['user'] != getuser():
return cls._error(
'Cannot start workflows for other users.'
Expand All @@ -329,9 +296,15 @@ async def play(cls, workflows, args, workflows_mgr, log):
cmd_repr = f'CYLC_VERSION={cylc_version} {cmd_repr}'
log.info(f'$ {cmd_repr}')

# run cylc run
env = os.environ.copy()
env.pop('CYLC_ENV_NAME', None)
if cylc_version:
env['CYLC_VERSION'] = cylc_version

# run cylc play
proc = Popen(
cmd,
env=env,
stdin=DEVNULL,
stdout=PIPE,
stderr=PIPE,
Expand All @@ -346,22 +319,16 @@ async def play(cls, workflows, args, workflows_mgr, log):
f'Could not start {tokens["workflow"]}'
f' - {cmd_repr}'
)
raise Exception(
msg
)
raise Exception(msg)

except Exception as exc:
# oh noes, something went wrong, send back confirmation
return cls._error(exc)

else:
# send a success message
return cls._return(
'Workflow started'
)
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved
# trigger a re-scan
await workflows_mgr.scan()
return response
# send a success message
return cls._return('Workflow(s) started')

@staticmethod
async def enqueue(stream, queue):
Expand Down
126 changes: 124 additions & 2 deletions cylc/uiserver/tests/test_resolvers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
from typing import Any, Dict, List, Optional
from async_timeout import timeout
import logging
import os
import pytest
from unittest import mock
from unittest.mock import MagicMock, Mock
from subprocess import Popen, TimeoutExpired

from cylc.flow import CYLC_LOG
from cylc.flow.id import Tokens
Expand All @@ -13,6 +16,7 @@
Services,
process_cat_log_stderr,
)
from cylc.uiserver.workflows_mgr import WorkflowsManager

services = Services()

Expand Down Expand Up @@ -47,6 +51,124 @@ def test_Services_anciliary_methods(func, message, expect):
assert func(message) == expect


@pytest.mark.parametrize(
'workflows, args, env, popen_ret_code, expected_ret, expected_env',
[
pytest.param(
[Tokens('wflow1'), Tokens('~murray/wflow2')],
{},
{},
0,
[True, "Workflow(s) started"],
{},
id="multiple"
),
pytest.param(
[Tokens('~feynman/wflow1')],
{},
{},
None,
[False, "Cannot start workflows for other users."],
{},
id="other user's wflow"
),
pytest.param(
[Tokens('wflow1')],
{},
{},
1,
[False, "strange"],
{},
id="command failed"
),
pytest.param(
[Tokens('wflow1')],
{'cylc_version': 'top'},
{'CYLC_VERSION': 'bottom', 'CYLC_ENV_NAME': 'quark'},
0,
[True, "Workflow(s) started"],
{'CYLC_VERSION': 'top'},
id="cylc version overrides env"
),
]
)
async def test_play(
monkeypatch: pytest.MonkeyPatch,
workflows: List[Tokens],
args: Dict[str, Any],
env: Dict[str, str],
popen_ret_code: Optional[int],
expected_ret: list,
expected_env: Dict[str, str],
):
"""It runs cylc play correctly.

Params:
workflows: list of workflow tokens
args: any args/options for cylc play
env: any environment variables
popen_ret_code: return code from cylc play
expected_ret: expected return value
expected_env: any expected environment variables
"""
for k, v in env.items():
monkeypatch.setenv(k, v)
monkeypatch.setattr('cylc.uiserver.resolvers.getuser', lambda: 'murray')
mock_popen = Mock(
spec=Popen,
return_value=Mock(
spec=Popen,
wait=Mock(return_value=popen_ret_code),
communicate=Mock(return_value=('charm', 'strange')),
)
)
monkeypatch.setattr('cylc.uiserver.resolvers.Popen', mock_popen)

ret = await Services.play(
workflows,
{'some': 'opt', **args},
workflows_mgr=Mock(spec=WorkflowsManager),
log=Mock(),
)

assert ret == expected_ret

expected_env = {**os.environ, **expected_env}
expected_env.pop('CYLC_ENV_NAME', None)

for i, call_args in enumerate(mock_popen.call_args_list):
cmd_str = ' '.join(call_args.args[0])
assert cmd_str.startswith('cylc play')
assert '--some opt' in cmd_str
assert workflows[i]['workflow'] in cmd_str

assert call_args.kwargs['env'] == expected_env


async def test_play_timeout(monkeypatch: pytest.MonkeyPatch):
"""It returns an error if cylc play times out."""
def timeout(*args, **kwargs):
raise TimeoutExpired('cylc play', 42)

mock_popen = Mock(
spec=Popen,
return_value=Mock(
spec=Popen,
wait=timeout,
)
)
monkeypatch.setattr('cylc.uiserver.resolvers.Popen', mock_popen)

ret = await Services.play(
[Tokens('wflow1')],
{},
workflows_mgr=Mock(spec=WorkflowsManager),
log=Mock(),
)

assert ret == [False, "Command 'cylc play' timed out after 42 seconds"]


async def test_cat_log(workflow_run_dir):
"""This is a functional test for cat_log subscription resolver.

Expand Down Expand Up @@ -80,7 +202,7 @@ async def test_cat_log(workflow_run_dir):
log_file = log_dir / '01-start-01.log'
log_file.write_text(log_file_content)
expected = log_file.read_text()
info = mock.MagicMock()
info = MagicMock()
info.root_value = 2
# mock the context
info.context = {'sub_statuses': {2: "start"}}
Expand Down