Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

made reinstall work on multiple workflows #5803

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5803.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Updated 'reinstall' functionality to support multiple workflows
46 changes: 28 additions & 18 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from cylc.flow.id import upgrade_legacy_ids
from cylc.flow.host_select import select_workflow_host
from cylc.flow.hostuserutil import is_remote_host
from cylc.flow.id_cli import parse_ids
from cylc.flow.id_cli import parse_ids_async
from cylc.flow.loggingutil import (
close_log,
RotatingLogFileHandler,
Expand Down Expand Up @@ -354,7 +354,11 @@ def _open_logs(id_: str, no_detach: bool, restart_num: int) -> None:
)


def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
async def scheduler_cli(
options: 'Values',
workflow_id_raw: str,
parse_workflow_id: bool = True
) -> None:
"""Run the workflow.

This function should contain all of the command line facing
Expand All @@ -368,15 +372,18 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
# Parse workflow name but delay Cylc 7 suite.rc deprecation warning
# until after the start-up splash is printed.
# TODO: singleton
(workflow_id,), _ = parse_ids(
workflow_id_raw,
constraint='workflows',
max_workflows=1,
# warn_depr=False, # TODO
)
if parse_workflow_id:
(workflow_id,), _ = await parse_ids_async(
workflow_id_raw,
constraint='workflows',
max_workflows=1,
# warn_depr=False, # TODO
)
else:
workflow_id = workflow_id_raw

# resume the workflow if it is already running
_resume(workflow_id, options)
await _resume(workflow_id, options)

# check the workflow can be safely restarted with this version of Cylc
db_file = Path(get_workflow_srv_dir(workflow_id), 'db')
Expand All @@ -400,9 +407,7 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
# NOTE: asyncio.run opens an event loop, runs your coro,
# then shutdown async generators and closes the event loop
scheduler = Scheduler(workflow_id, options)
asyncio.run(
_setup(scheduler)
)
await _setup(scheduler)

# daemonize if requested
# NOTE: asyncio event loops cannot persist across daemonization
Expand All @@ -419,9 +424,14 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
)

# run the workflow
ret = asyncio.run(
_run(scheduler)
)
if options.no_detach:
ret = await _run(scheduler)
else:
# Note: The daemonization messes with asyncio so we have to start a
# new event loop if detaching
ret = asyncio.run(
_run(scheduler)
)

# exit
# NOTE: we must clean up all asyncio / threading stuff before exiting
Expand All @@ -432,7 +442,7 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
sys.exit(ret)


def _resume(workflow_id, options):
async def _resume(workflow_id, options):
"""Resume the workflow if it is already running."""
try:
detect_old_contact_file(workflow_id)
Expand All @@ -448,7 +458,7 @@ def _resume(workflow_id, options):
'wFlows': [workflow_id]
}
}
pclient('graphql', mutation_kwargs)
await pclient.async_request('graphql', mutation_kwargs)
sys.exit(0)
except CylcError as exc:
LOG.error(exc)
Expand Down Expand Up @@ -651,4 +661,4 @@ def _play(parser: COP, options: 'Values', id_: str):
*options.starttask,
relative=True,
)
return scheduler_cli(options, id_)
return asyncio.run(scheduler_cli(options, id_))
35 changes: 21 additions & 14 deletions cylc/flow/scripts/reinstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
from pathlib import Path
import sys
from typing import Optional, TYPE_CHECKING, List, Callable
from functools import partial

from ansimarkup import parse as cparse

Expand All @@ -83,12 +84,13 @@
from cylc.flow.install import (
reinstall_workflow,
)
from cylc.flow.id_cli import parse_id
from cylc.flow.network.multi import call_multi
from cylc.flow.option_parsers import (
CylcOptionParser as COP,
OptionSettings,
WORKFLOW_ID_ARG_DOC,
ID_MULTI_ARG_DOC
)

from cylc.flow.pathutil import get_workflow_run_dir
from cylc.flow.workflow_files import (
get_workflow_source_dir,
Expand All @@ -101,7 +103,6 @@

_input = input # to enable testing


REINSTALL_CYLC_ROSE_OPTIONS = [
OptionSettings(
['--clear-rose-install-options'],
Expand All @@ -127,7 +128,12 @@

def get_option_parser() -> COP:
parser = COP(
__doc__, comms=True, argdoc=[WORKFLOW_ID_ARG_DOC]
__doc__,
comms=True,
multiworkflow=True,
argdoc=[
ID_MULTI_ARG_DOC,
],
)

try:
Expand All @@ -149,27 +155,28 @@ def get_option_parser() -> COP:
def main(
_parser: COP,
opts: 'Values',
args: Optional[str] = None
*ids: str
) -> None:
"""CLI wrapper."""
reinstall_cli(opts, args)
call_multi(
partial(reinstall_cli, opts),
*ids,
constraint='workflows',
report=lambda x: print('Done')
)


def reinstall_cli(
async def reinstall_cli(
opts: 'Values',
args: Optional[str] = None,
print_reload_tip: bool = True,
workflow_id,
*tokens_list,
print_reload_tip: bool = True
) -> bool:
"""Implement cylc reinstall.

This is the bit which contains all the CLI logic.
"""
run_dir: Optional[Path]
workflow_id: str
workflow_id, *_ = parse_id(
args,
constraint='workflows',
)
run_dir = Path(get_workflow_run_dir(workflow_id))
if not run_dir.is_dir():
raise WorkflowFilesError(
Expand Down
25 changes: 15 additions & 10 deletions cylc/flow/scripts/validate_reinstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
ContactFileExists,
CylcError,
)
from cylc.flow.id_cli import parse_id
from cylc.flow.id_cli import parse_id_async
from cylc.flow.loggingutil import set_timestamps
from cylc.flow.option_parsers import (
WORKFLOW_ID_ARG_DOC,
Expand All @@ -62,19 +62,20 @@
from cylc.flow.scheduler_cli import PLAY_OPTIONS, scheduler_cli
from cylc.flow.scripts.validate import (
VALIDATE_OPTIONS,
_main as cylc_validate
wrapped_main as cylc_validate
)
from cylc.flow.scripts.reinstall import (
REINSTALL_CYLC_ROSE_OPTIONS,
REINSTALL_OPTIONS,
reinstall_cli as cylc_reinstall,
)
from cylc.flow.scripts.reload import (
reload_cli as cylc_reload
run as cylc_reload
)
from cylc.flow.terminal import cli_function
from cylc.flow.workflow_files import detect_old_contact_file

import asyncio

CYLC_ROSE_OPTIONS = COP.get_cylc_rose_options()
VR_OPTIONS = combine_options(
Expand Down Expand Up @@ -124,16 +125,16 @@ def check_tvars_and_workflow_stopped(

@cli_function(get_option_parser)
def main(parser: COP, options: 'Values', workflow_id: str):
sys.exit(vro_cli(parser, options, workflow_id))
sys.exit(asyncio.run(vr_cli(parser, options, workflow_id)))


def vro_cli(parser: COP, options: 'Values', workflow_id: str):
async def vr_cli(parser: COP, options: 'Values', workflow_id: str):
"""Run Cylc (re)validate - reinstall - reload in sequence."""
# Attempt to work out whether the workflow is running.
# We are trying to avoid reinstalling then subsequently being
# unable to play or reload because we cannot identify workflow state.
unparsed_wid = workflow_id
workflow_id, *_ = parse_id(
workflow_id, *_ = await parse_id_async(
workflow_id,
constraint='workflows',
)
Expand Down Expand Up @@ -166,10 +167,14 @@ def vro_cli(parser: COP, options: 'Values', workflow_id: str):
# Force on the against_source option:
options.against_source = True # Make validate check against source.
log_subcommand('validate --against-source', workflow_id)
cylc_validate(parser, options, workflow_id)
await cylc_validate(parser, options, workflow_id)

log_subcommand('reinstall', workflow_id)
reinstall_ok = cylc_reinstall(options, workflow_id, print_reload_tip=False)
reinstall_ok = await cylc_reinstall(
options, workflow_id,
[],
print_reload_tip=False
)
if not reinstall_ok:
LOG.warning(
'No changes to source: No reinstall or'
Expand All @@ -180,7 +185,7 @@ def vro_cli(parser: COP, options: 'Values', workflow_id: str):
# Run reload if workflow is running or paused:
if workflow_running:
log_subcommand('reload', workflow_id)
cylc_reload(options, workflow_id)
await cylc_reload(options, workflow_id)

# run play anyway, to play a stopped workflow:
else:
Expand All @@ -197,4 +202,4 @@ def vro_cli(parser: COP, options: 'Values', workflow_id: str):
source='', # Intentionally blank
)
log_subcommand(*sys.argv[1:])
scheduler_cli(options, workflow_id)
await scheduler_cli(options, workflow_id, parse_workflow_id=False)
1 change: 1 addition & 0 deletions tests/functional/cylc-reinstall/00-simple.t
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ run_ok "${TEST_NAME}-reinstall" cylc reinstall "${RND_WORKFLOW_NAME}/run1"
cmp_ok "${TEST_NAME}-reinstall.stdout" <<__OUT__
REINSTALLED $RND_WORKFLOW_NAME/run1 from ${RND_WORKFLOW_SOURCE}
Successfully reinstalled.
Done
__OUT__
popd || exit 1
purge_rnd_workflow
Expand Down
Loading
Loading