Skip to content

Commit

Permalink
cylc-set: glob in pool for now.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Dec 1, 2023
1 parent fb4addd commit aaafdc6
Show file tree
Hide file tree
Showing 60 changed files with 294 additions and 230 deletions.
4 changes: 4 additions & 0 deletions cylc/flow/prerequisite.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def __init__(self, point):
# * `False` (prerequisite unsatisfied).
self._all_satisfied = None

def __str__(self):
# TODO make this more useful
return f"{self.point}: {self.satisfied}, {self.conditional_expression}"

def instantaneous_hash(self):
"""Generate a hash of this prerequisite in its current state.
Expand Down
3 changes: 3 additions & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,9 @@ def log_start(self) -> None:

# Note that the following lines must be present at the top of
# the workflow log file for use in reference test runs.
LOG.info(
"Task log key: [<cycle>/<name>/<job>(<flows>):<status>]"
)
LOG.info(
f'Run mode: {self.config.run_mode()}',
extra=RotatingLogFileHandler.header_extra
Expand Down
20 changes: 10 additions & 10 deletions cylc/flow/task_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Task output message manager and constants."""

from typing import List
from typing import Set

# Standard task output strings, used for triggering.
TASK_OUTPUT_EXPIRED = "expired"
Expand Down Expand Up @@ -271,8 +271,8 @@ def _get_item(self, message, trigger):
else:
return self._by_message[message]

def add_implied_outputs(self, output: str) -> List[str]:
"""Return a list with implied outputs prepended.
def add_implied_outputs(self, output: str) -> Set[str]:
"""Return a set containing the given output plus its implied outputs.
- started implies submitted
- any custom output implies started
Expand All @@ -281,25 +281,25 @@ def add_implied_outputs(self, output: str) -> List[str]:
"""
if output == TASK_OUTPUT_STARTED:
return [TASK_OUTPUT_SUBMITTED, output]
return {TASK_OUTPUT_SUBMITTED, output}

elif output in self._get_custom_triggers():
return [TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, output]
return {TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, output}

elif output in [TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED]:
required_custom = [
required_custom = {
msg for msg in self._get_custom_triggers()
if msg in self._required.values()
]
return [
}
return {
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
*required_custom,
output
]
}

else:
return [output]
return {output}

@staticmethod
def is_valid_std_name(name):
Expand Down
249 changes: 147 additions & 102 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1426,7 +1426,7 @@ def spawn_on_all_outputs(
self.rh_release_and_queue(c_task)

def can_be_spawned(self, name: str, point: 'PointBase') -> bool:
"""Return True if a requested task is spawnable, else False."""
"""Return True if a point/name is within graph bounds."""

if name not in self.config.taskdefs:
LOG.debug('No task definition %s', name)
Expand Down Expand Up @@ -1576,10 +1576,21 @@ def set( # noqa: A003
):
"""Set prerequisites or outputs of target tasks.
By default, set all required outputs.
Default: set all required outputs.
Setting prerequisites spawns the target task.
Setting outputs spawns children of the target task.
Use a transient task proxy to spawn children. (Even if the parent was
previously spawned in this flow its children might not have been).
Task matching:
- globs (cycle and name) only match in the pool
- family names are not expanded
- future tasks must be specified individually
Args:
items: identifiers for matching task definitions
items: task ID match patterns
prerequisites: prerequisites to set
outputs: outputs to set and spawn children of
flow: Flow numbers for spawned or merged tasks
Expand All @@ -1592,53 +1603,60 @@ def set( # noqa: A003
# Illegal flow command opts
return

# Note this filters out invalid cycle point for target task.
_, task_items = self.match_taskdefs(items)
itasks, future_tasks, unmatched = self.filter_task_proxies(
items,
future=True,
warn=False,
)

# pool tasks
for itask in itasks:
self.merge_flows(itask, flow_nums)
if not outputs and not prerequisites:
# Default: set required outputs.
outputs = itask.tdef.get_required_outputs()
if outputs:
self._set_outputs_itask(itask, outputs)
if prerequisites:
self._set_prereqs_itask(
itask, prerequisites, flow_nums, flow_wait)

for (_, point), taskdef in sorted(task_items.items()):
# future task definitions
for name, point in future_tasks:
taskdef = self.config.get_taskdef(name)
if not outputs and not prerequisites:
# Default: set required outputs.
outputs = taskdef.get_required_outputs()
if outputs:
self._set_outputs(
point, taskdef, outputs, flow_nums, flow_wait)
trans = self._spawn_transient_task(
point, taskdef, outputs, flow_nums, flow_wait
)
if trans is not None:
self._set_outputs_itask(trans, outputs)
if prerequisites:
self._set_prereqs(
self._set_prereqs_tdef(
point, taskdef, prerequisites, flow_nums, flow_wait)

def _set_outputs(self, point, taskdef, outputs, flow_nums, flow_wait):
"""Set given outputs of a task, and spawn associated children.
def _spawn_transient_task(
self,
point: 'PointBase',
taskdef: 'TaskDef',
outputs: List[str],
flow_nums: 'FlowNums',
flow_wait: bool
) -> Optional['TaskProxy']:
"""Spawn a transient task proxy and update its outputs from the DB."""

Do not spawn the target task if it is not already in the pool, but
update the DB to reflect the set outputs, and spawn the children.
"""
itask = self._get_task_by_id(
Tokens(
cycle=str(point),
task=taskdef.name
).relative_id
itask = self.spawn_task(
taskdef.name,
point,
flow_nums,
flow_wait=flow_wait,
force=True,
transient=True
)
if itask is not None:
# The parent task already exists in the pool.
transient = False
self.merge_flows(itask, flow_nums)
else:
# Spawn a transient task instance to use for spawning children.
transient = True
itask = self.spawn_task(
taskdef.name,
point,
flow_nums,
flow_wait=flow_wait,
force=True,
transient=True
)
# force=True: spawn it even if previously spawned in this flow,
# because even if it was, its children might not have been. It
# is transient and won't be added to the pool, but its outputs
# will be updated in the DB, and any event handler activity
# will be recorded in the previous-submit log directory.

# Update outputs that were already completed.
for outputs_str, fnums in (
self.workflow_db_mgr.pri_dao.select_task_outputs(
itask.tdef.name, str(itask.point))
Expand All @@ -1648,83 +1666,110 @@ def _set_outputs(self, point, taskdef, outputs, flow_nums, flow_wait):
itask.state.outputs.set_completed_by_msg(msg)
break

for output in outputs:
msg = itask.state.outputs.get_msg(output)
if msg is None:
return itask

def _set_outputs_itask(
self,
itask: 'TaskProxy',
req_outputs: List[str],
) -> None:
"""Set requested outputs on a task and spawn their children."""

# TODO: tidier "set:" log messages

# convert from labels to messages
requested = []
allout: Set[str] = set() # requested plus implied outputs
for output in req_outputs:
o_msg = itask.state.outputs.get_msg(output)
if o_msg is None:
LOG.warning(
f"Output not found: {itask.identity}:{output}")
continue
requested.append(o_msg)
allout = allout.union(
itask.state.outputs.add_implied_outputs(o_msg)
)

for out in itask.state.outputs.add_implied_outputs(msg):
# Handle outputs as if completed naturally.
if itask.state.outputs.is_completed(out):
# already completed
continue
info = " "
if out != msg:
info = " implied "
LOG.info(
f"Completing{info}output: {itask.identity}:{out}"
f" ({itask.flows_str()})")
self.task_events_mgr.process_message(itask, logging.INFO, out)
for out in allout:
info = f'output "{out}" of {itask.identity}'
if out not in requested:
info = f"implied {info}"
if itask.state.outputs.is_completed(out):
LOG.info(f"set: {info} already completed")
continue
LOG.info(f"set: completing {info}")

if transient:
# tasks states table gets updated from the task pool
self.workflow_db_mgr.put_update_task_state(itask)
self.task_events_mgr.process_message(itask, logging.INFO, out)

def _set_prereqs(self, point, taskdef, prereqs, flow_nums, flow_wait):
"""Set given prerequisites of a target task.
if itask.transient:
# (note: the task states table gets updated from the task pool)
LOG.warning(f"TWAT {itask}")
self.workflow_db_mgr.put_update_task_state(itask)

Spawn the task first if not already in the pool.
def _get_valid_prereqs(self, prereqs, taskdef, point):
"""Get valid prerequisites for a task.
Spawn a transient task proxy without incrementing submit
number or checking the flow.
"""
available = set()
itask = TaskProxy(self.tokens, taskdef, point, transient=True)
for p in itask.state.prerequisites:
for pp in p.satisfied.keys():
available.add(pp)

requested = set()
for p in prereqs:
t = Tokens(p, relative=True)
# Default to :succeeded
t['task_sel'] = t['task_sel'] or TASK_OUTPUT_SUCCEEDED
requested.add((t['cycle'], t['task'], t['task_sel']))

good = available & requested
bad = requested - available
if bad:
for b in bad:
LOG.warning(
f"{point}/{taskdef.name} does not depend on"
f" {b[0]}/{b[1]}:{b[2]}"
)

return good

def _set_prereqs_itask(self, itask, prereqs, flow_nums, flow_wait):
"""Set prerequisites of a task in the pool."""

if prereqs == ["all"]:
itask = self.get_or_spawn_task(point, taskdef.name, flow_nums)
if itask is None:
# E.g. already spawned in flow.
return
itask.state.set_all_satisfied()
else:
# Check if the given prerequisites are valid for the task.
# Spawn a transient task proxy to get the available prerequisites
# without incrementing submit number or checking the flow.
available = set()
for p in TaskProxy( # transient task
self.tokens, taskdef, point, transient=True
).state.prerequisites:
for pp in p.satisfied.keys():
available.add(pp)

requested = set()
for p in prereqs:
t = Tokens(p, relative=True)
# Default to :succeeded
t['task_sel'] = t['task_sel'] or TASK_OUTPUT_SUCCEEDED
requested.add((t['cycle'], t['task'], t['task_sel']))

good = available & requested
bad = requested - available
if bad:
for b in bad:
LOG.warning(
f"{point}/{taskdef.name} does not depend on"
f" {b[0]}/{b[1]}:{b[2]}"
)
if not good:
# No valid prerequisites requested.
return
itask.satisfy_me(
# TODO: IS THIS NEEDED? (JUST LOG BAD ONES FROM SATISFY_ME?)
self._get_valid_prereqs(prereqs, itask.tdef, itask.point)
)

# Now spawn that sucker for real.
itask = self.get_or_spawn_task(
point,
taskdef.name,
flow_nums,
flow_wait=flow_wait
self.data_store_mgr.delta_task_prerequisite(itask)

# if (
# self.runahead_limit_point is not None
# and itask.point <= self.runahead_limit_point
# ):
# self.rh_release_and_queue(itask)

def _set_prereqs_tdef(self, point, taskdef, prereqs, flow_nums, flow_wait):
"""Set given prerequisites of a future task."""

itask = self.spawn_task(taskdef.name, point, flow_nums, flow_wait)
if itask is None:
# E.g. already spawned in flow.
return
if prereqs == ["all"]:
itask.state.set_all_satisfied()
else:
itask.satisfy_me(
self._get_valid_prereqs(prereqs, taskdef, point)
)
if itask is None:
# E.g. already spawned in flow.
return
itask.satisfy_me(good)

self.data_store_mgr.delta_task_prerequisite(itask)
self.add_to_pool(itask)
Expand Down
Loading

0 comments on commit aaafdc6

Please sign in to comment.