Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start-tasks: wait on xtriggers #6528

Merged
merged 5 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6528.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make start-tasks wait on xtriggers (see "cylc play --start-task").
4 changes: 3 additions & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,8 +837,10 @@ def _load_pool_from_tasks(self):
"""Load task pool with specified tasks, for a new run."""
LOG.info(f"Start task: {self.options.starttask}")
# flow number set in this call:
self.pool.force_trigger_tasks(
self.pool.set_prereqs_and_outputs(
self.options.starttask,
outputs=[],
prereqs=["all"],
flow=[FLOW_NEW],
flow_descr=f"original flow from {self.options.starttask}"
)
Expand Down
13 changes: 7 additions & 6 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,13 @@
OptionSettings(
["--start-task", "--starttask", "-t"],
help=(
"Start from this task instance, given by '<cycle>/<name>'."
" This can be used multiple times to start from multiple"
" tasks at once. Dependence on tasks with cycle points earlier"
" than the earliest start-task will be ignored. A"
" sub-graph of the workflow will run if selected tasks do"
" not lead on to the full graph."),
"Start from this task instance instead of the beginning of the"
" graph. Dependence on other tasks is ignored but clock and"
" xtriggers are respected. Reuse the option for multiple start"
" tasks. Any dependence on cycle points prior to the earliest"
" start task will be ignored throughout the graph. Check that"
" start tasks flow into the full downstream graph, if needed."
),
metavar="TASK_ID",
action='append',
dest="starttask",
Expand Down
30 changes: 0 additions & 30 deletions tests/functional/cylc-play/05-start-task.t

This file was deleted.

12 changes: 0 additions & 12 deletions tests/functional/cylc-play/05-start-task/flow.cylc

This file was deleted.

7 changes: 0 additions & 7 deletions tests/functional/cylc-play/05-start-task/reference.log

This file was deleted.

77 changes: 77 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2305,3 +2305,80 @@ async def test_job_insert_on_crash(one_conf, flow, scheduler, start):

# and a job entry should be added
assert len(task_1.jobs) == 1


async def test_start_tasks(
flow,
scheduler,
start,
log_filter,
capture_submission,
):
"""Check starting from "start-tasks" with and without clock-triggers.

"""
id_ = flow(
{
'scheduler': {
'cycle point format': '%Y',
},
'scheduling': {
'initial cycle point': '2040',
'runahead limit': 'P0Y',
'xtriggers': {
'wall_clock_satisfied': "wall_clock(offset='-P100Y')"
},
'graph': {
'P1Y': """
foo
@wall_clock => bar
@wall_clock_satisfied => baz
qux
"""
}
}
}
)
schd = scheduler(
id_,
starttask=['2050/foo', '2050/bar', '2050/baz'],
paused_start=False
)

async with start(schd) as log:
# capture any job submissions
submitted_tasks = capture_submission(schd)
assert submitted_tasks == set()

# It should start up with:
# - 2050/foo and 2051/foo (spawned to runahead limit)
# - 2050/bar waiting on its (unsatisfied) clock-trigger
# - 2050/baz waiting on its (satisfied) clock-trigger
# - no qux instances (not listed as a start-task)
itasks = schd.pool.get_tasks()
assert (
set(itask.identity for itask in itasks) == set([
"2050/foo",
"2051/foo",
"2050/bar",
"2050/baz",
])
)

# Check xtriggers
for itask in itasks:
schd.pool.xtrigger_mgr.call_xtriggers_async(itask)
schd.pool.rh_release_and_queue(itask)

# Release tasks that are ready to run.
schd.release_tasks_to_run()

# It should submit 2050/foo, 2051/foo, 2050/baz
# It should not submit 2050/bar (waiting on clock trigger)
assert (
set(itask.identity for itask in submitted_tasks) == set([
"2050/foo",
"2051/foo",
"2050/baz",
])
)
Loading