Skip to content

Commit

Permalink
response to review 1
Browse files Browse the repository at this point in the history
remove ghost mode terminology
  • Loading branch information
wxtim committed May 9, 2024
1 parent 19faa47 commit d7ee653
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 46 deletions.
1 change: 0 additions & 1 deletion cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
)
from cylc.flow.print_tree import print_tree
from cylc.flow.task_qualifiers import ALT_QUALIFIERS
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.run_modes.nonlive import mode_validate_checks
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.task_events_mgr import (
Expand Down
25 changes: 20 additions & 5 deletions cylc/flow/prerequisite.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Prerequisite:
MESSAGE_TEMPLATE = r'%s/%s %s'

DEP_STATE_SATISFIED = 'satisfied naturally'
DEP_STATE_ARTIFICIAL = 'Artificially satisfied'
DEP_STATE_OVERRIDDEN = 'force satisfied'
DEP_STATE_UNSATISFIED = False

Expand Down Expand Up @@ -198,20 +199,26 @@ def _conditional_is_satisfied(self):
'"%s":\n%s' % (self.get_raw_conditional_expression(), err_msg))
return res

def satisfy_me(self, outputs: Iterable['Tokens']) -> 'Set[Tokens]':
def satisfy_me(
self, outputs: Iterable['Tokens'], mode: str = 'live'
) -> 'Set[Tokens]':
"""Attempt to satisfy me with given outputs.
Updates cache with the result.
Return outputs that match.
"""
if mode != 'live':
satisfied_message = self.DEP_STATE_ARTIFICIAL + f' by {mode} mode'
else:
satisfied_message = self.DEP_STATE_SATISFIED
valid = set()
for output in outputs:
prereq = (output['cycle'], output['task'], output['task_sel'])
if prereq not in self.satisfied:
continue
valid.add(output)
self.satisfied[prereq] = self.DEP_STATE_SATISFIED
self.satisfied[prereq] = satisfied_message
if self.conditional_expression is None:
self._all_satisfied = all(self.satisfied.values())
else:
Expand Down Expand Up @@ -292,6 +299,14 @@ def get_resolved_dependencies(self):
E.G: ['1/foo', '2/bar']
"""
return [f'{point}/{name}' for
(point, name, _), satisfied in self.satisfied.items() if
satisfied == self.DEP_STATE_SATISFIED]
return [
f'{point}/{name}' for
(point, name, _), satisfied in self.satisfied.items()
if (
satisfied == self.DEP_STATE_SATISFIED
or (
isinstance(satisfied, str)
and satisfied.startswith(self.DEP_STATE_ARTIFICIAL)
)
)
]
7 changes: 4 additions & 3 deletions cylc/flow/run_modes/skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,12 @@ def process_outputs(itask: 'TaskProxy') -> List[str]:
# Send the rest of our outputs, unless they are succeed or failed,
# which we hold back, to prevent warnings about pre-requisites being
# unmet being shown because a "finished" output happens to come first.
for output, message in itask.state.outputs._required.items():
for message in itask.state.outputs.iter_required_messages():
trigger = itask.state.outputs._message_to_trigger[message]
# Send message unless it be succeeded/failed.
if output in [TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED]:
if trigger in [TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED]:
continue
if not conf_outputs or output in conf_outputs:
if not conf_outputs or trigger in conf_outputs:
result.append(message)

# Send succeeded/failed last.
Expand Down
7 changes: 3 additions & 4 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1500,7 +1500,7 @@ def release_queued_tasks(self) -> bool:
pre_prep_tasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
run_mode=self.config.run_mode()
run_mode=self.get_run_mode()
):
if itask.flow_nums:
flow = ','.join(str(i) for i in itask.flow_nums)
Expand Down Expand Up @@ -1745,16 +1745,15 @@ async def _main_loop(self) -> None:

if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.clock_expire_tasks()
self.release_queued_tasks()
if sim_time_check(
self.task_events_mgr,
self.pool.get_tasks(),
self.workflow_db_mgr,
):
# A simulated task state change occurred.
self.reset_inactivity_timer()
self.pool.clock_expire_tasks()
self.release_queued_tasks()

self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
self.late_tasks_check()
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@
help=(
f"Run mode: {RunMode.WORKFLOW_MODES} (default live)."
" Live mode executes the tasks as defined in the runtime section."
" Simulation, Skip and Dummy partially or wholly ignore"
" the task defined in runtime configuration. Simulation and"
" dummy are designed for testing and Skip for flow control."
" Simulation, skip and dummy modes ignore part of tasks'"
" runtime configurations. Simulation and dummy modes are"
" designed for testing, and skip mode is for flow control."
),
metavar="STRING", action='store', dest="run_mode",
choices=list(RunMode.WORKFLOW_MODES),
Expand Down
14 changes: 7 additions & 7 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
Return (list): list of tasks that attempted submission.
"""
itasks, ghost_tasks = self._nonlive_submit_task_jobs(
itasks, jobless_tasks = self._nonlive_submit_task_jobs(
itasks, workflow, run_mode)

# Prepare tasks for job submission
Expand All @@ -278,18 +278,18 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
# Reset consumed host selection results
self.task_remote_mgr.subshell_eval_reset()

if not prepared_tasks and not ghost_tasks:
if not prepared_tasks and not jobless_tasks:
return bad_tasks
elif not prepared_tasks:
return ghost_tasks
return jobless_tasks
auth_itasks = {} # {platform: [itask, ...], ...}

for itask in prepared_tasks:
platform_name = itask.platform['name']
auth_itasks.setdefault(platform_name, [])
auth_itasks[platform_name].append(itask)
# Submit task jobs for each platform
done_tasks = bad_tasks + ghost_tasks
done_tasks = bad_tasks + jobless_tasks

for _, itasks in sorted(auth_itasks.items()):
# Find the first platform where >1 host has not been tried and
Expand Down Expand Up @@ -1022,7 +1022,7 @@ def _nonlive_submit_task_jobs(
the scheduler. (This includes skip and simulation mode tasks).
"""
lively_tasks: 'List[TaskProxy]' = []
ghost_tasks: 'List[TaskProxy]' = []
jobless_tasks: 'List[TaskProxy]' = []
now = time()
now = (now, get_time_string_from_unix_time(now))

Expand Down Expand Up @@ -1057,10 +1057,10 @@ def _nonlive_submit_task_jobs(
self, itask, rtconfig, workflow, now)
# Assign task to list:
if is_done:
ghost_tasks.append(itask)
jobless_tasks.append(itask)
else:
lively_tasks.append(itask)
return lively_tasks, ghost_tasks
return lively_tasks, jobless_tasks

def _submit_task_jobs_callback(self, ctx, workflow, itasks):
"""Callback when submit task jobs command exits."""
Expand Down
8 changes: 6 additions & 2 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,10 @@ def spawn_on_output(self, itask, output, forced=False):
else:
tasks = [c_task]
for t in tasks:
t.satisfy_me([itask.tokens.duplicate(task_sel=output)])
t.satisfy_me(
[itask.tokens.duplicate(task_sel=output)],
getattr(itask.tdef, 'run_mode', 'live')
)
self.data_store_mgr.delta_task_prerequisite(t)
self.add_to_pool(t)

Expand Down Expand Up @@ -1521,7 +1524,8 @@ def spawn_on_all_outputs(
continue
if completed_only:
c_task.satisfy_me(
[itask.tokens.duplicate(task_sel=message)]
[itask.tokens.duplicate(task_sel=message)],
itask.tdef.run_mode
)
self.data_store_mgr.delta_task_prerequisite(c_task)
self.add_to_pool(c_task)
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ def state_reset(
return False

def satisfy_me(
self, task_messages: 'List[Tokens]'
self, task_messages: 'List[Tokens]', mode='live'
) -> 'Set[Tokens]':
"""Try to satisfy my prerequisites with given output messages.
Expand All @@ -554,7 +554,7 @@ def satisfy_me(
Return a set of unmatched task messages.
"""
used = self.state.satisfy_me(task_messages)
used = self.state.satisfy_me(task_messages, mode)
return set(task_messages) - used

def clock_expire(self) -> bool:
Expand Down
21 changes: 10 additions & 11 deletions cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,17 @@ class RunMode:

MODES = {LIVE, SIMULATION, DUMMY, SKIP, WORKFLOW}

WORKFLOW_MODES = sorted(MODES - {WORKFLOW})
WORKFLOW_MODES = [LIVE, DUMMY, SIMULATION, SKIP]
"""Workflow mode not sensible mode for workflow.
n.b. converted to a list to ensure ordering doesn't change in
CLI
"""

LIVELY_MODES = {LIVE, DUMMY}
JOB_MODES = {LIVE, DUMMY}
"""Modes which need to have real jobs submitted."""

GHOSTLY_MODES = {SKIP, SIMULATION}
JOBLESS_MODES = {SKIP, SIMULATION}
"""Modes which completely ignore the standard submission path."""

@staticmethod
Expand All @@ -197,14 +197,14 @@ def get(options: 'Values') -> str:
def is_lively(mode: str) -> bool:
"""Task should be treated as live, mode setting mess with scripts only.
"""
return bool(mode in RunMode.LIVELY_MODES)
return bool(mode in RunMode.JOB_MODES)

Check warning on line 200 in cylc/flow/task_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_state.py#L200

Added line #L200 was not covered by tests

@staticmethod
def is_ghostly(mode: str) -> bool:
"""Task has no reality outside the scheduler and needs no further
processing after run_mode.submit_task_job method finishes.
"""
return bool(mode in RunMode.GHOSTLY_MODES)
return bool(mode in RunMode.JOBLESS_MODES)

@staticmethod
def disable_task_event_handlers(itask):
Expand All @@ -214,16 +214,14 @@ def disable_task_event_handlers(itask):
if we don't deliberately enable them:
"""
mode = itask.tdef.run_mode
if (
return (
mode == RunMode.SIMULATION
or (
mode == RunMode.SKIP
and itask.tdef.rtconfig['skip'][
'disable task event handlers'] is True
)
):
return True
return False
)


def status_leq(status_a, status_b):
Expand Down Expand Up @@ -384,15 +382,16 @@ def __call__(

def satisfy_me(
self,
outputs: Iterable['Tokens']
outputs: Iterable['Tokens'],
mode,
) -> Set['Tokens']:
"""Try to satisfy my prerequisites with given outputs.
Return which outputs I actually depend on.
"""
valid: Set[Tokens] = set()
for prereq in (*self.prerequisites, *self.suicide_prerequisites):
yep = prereq.satisfy_me(outputs)
yep = prereq.satisfy_me(outputs, mode)
if yep:
valid = valid.union(yep)
continue
Expand Down
18 changes: 11 additions & 7 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,8 @@ def list_tasks(schd):
('1', 'z', 'waiting'),
],
[
{('1', 'a', 'succeeded'): 'satisfied naturally'},
{('1', 'a', 'succeeded'):
'Artificially satisfied by simulation mode'},
{('1', 'b', 'succeeded'): False},
{('1', 'c', 'succeeded'): False},
],
Expand Down Expand Up @@ -669,7 +670,8 @@ def list_tasks(schd):
('1', 'z', 'waiting'),
],
[
{('1', 'a', 'succeeded'): 'satisfied naturally'},
{('1', 'a', 'succeeded'):
'Artificially satisfied by simulation mode'},
{('1', 'b', 'succeeded'): False},
],
id='removed'
Expand Down Expand Up @@ -764,7 +766,8 @@ async def test_restart_prereqs(
('1', 'z', 'waiting'),
],
[
{('1', 'a', 'succeeded'): 'satisfied naturally'},
{('1', 'a', 'succeeded'):
'Artificially satisfied by simulation mode'},
{('1', 'b', 'succeeded'): False},
{('1', 'c', 'succeeded'): False},
],
Expand Down Expand Up @@ -792,7 +795,8 @@ async def test_restart_prereqs(
('1', 'z', 'waiting'),
],
[
{('1', 'a', 'succeeded'): 'satisfied naturally'},
{('1', 'a', 'succeeded'):
'Artificially satisfied by simulation mode'},
{('1', 'b', 'succeeded'): False},
],
id='removed'
Expand Down Expand Up @@ -890,7 +894,7 @@ async def _test_restart_prereqs_sat():
for prereq in task_c.state.prerequisites
for key, satisfied in prereq.satisfied.items()
) == [
('1', 'a', 'succeeded', 'satisfied naturally'),
('1', 'a', 'succeeded', 'Artificially satisfied by simulation mode'),
('1', 'b', 'succeeded', 'satisfied from database')
]

Expand All @@ -907,7 +911,7 @@ async def _test_restart_prereqs_sat():
for prereq in task_c_prereqs
for condition in prereq.conditions
) == [
('1/a', True, 'satisfied naturally'),
('1/a', True, 'Artificially satisfied by simulation mode'),
('1/b', True, 'satisfied from database'),
]

Expand Down Expand Up @@ -1219,7 +1223,7 @@ async def test_detect_incomplete_tasks(
# the task should not have been removed
assert itask in schd.pool.get_tasks()


async def test_future_trigger_final_point(
flow,
scheduler,
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/run_modes/test_skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ def test_process_outputs(outputs, required, expect):
rtconfig={'skip': {'outputs': outputs}}),
state=SimpleNamespace(
outputs=SimpleNamespace(
_required={v: v for v in required}
iter_required_messages=lambda: iter(required),
_message_to_trigger={v: v for v in required}
)))

assert process_outputs(itask) == ['submitted'] + expect

0 comments on commit d7ee653

Please sign in to comment.