Skip to content

Commit

Permalink
Merge pull request #5817 from cylc/8.2.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.2.x-sync into master
  • Loading branch information
oliver-sanders authored Nov 13, 2023
2 parents 99fa0f5 + 6b1da95 commit 41c38eb
Show file tree
Hide file tree
Showing 14 changed files with 277 additions and 177 deletions.
12 changes: 12 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ $ towncrier create <PR-number>.<break|feat|fix>.md --content "Short description"

<!-- towncrier release notes start -->

## __cylc-8.2.3 (Released 2023-11-02)__

### 🔧 Fixes

[#5660](https://github.com/cylc/cylc-flow/pull/5660) - Re-worked graph n-window algorithm for better efficiency.

[#5753](https://github.com/cylc/cylc-flow/pull/5753) - Fixed bug where execution time limit polling intervals could end up incorrectly applied

[#5776](https://github.com/cylc/cylc-flow/pull/5776) - Ensure that submit-failed tasks are marked as incomplete (so remain visible) when running in back-compat mode.

[#5791](https://github.com/cylc/cylc-flow/pull/5791) - fix a bug where if multiple clock triggers are set for a task only one was being satisfied.

## __cylc-8.2.2 (Released 2023-10-05)__

### 🚀 Enhancements
Expand Down
1 change: 0 additions & 1 deletion changes.d/5660.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/5753.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/5776.fix.md

This file was deleted.

1 change: 1 addition & 0 deletions changes.d/5801.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix traceback when using parentheses on right hand side of graph trigger.
4 changes: 2 additions & 2 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from typing import List, Optional, Tuple, Any, Union

from contextlib import suppress
from packaging.version import parse as parse_version, Version
from packaging.version import Version

from cylc.flow import LOG
from cylc.flow import __version__ as CYLC_VERSION
Expand Down Expand Up @@ -1869,7 +1869,7 @@ def get_version_hierarchy(version: str) -> List[str]:
['', '8', '8.0', '8.0.1', '8.0.1a2', '8.0.1a2.dev']
"""
smart_ver: Version = parse_version(version)
smart_ver = Version(version)
base = [str(i) for i in smart_ver.release]
hierarchy = ['']
hierarchy += ['.'.join(base[:i]) for i in range(1, len(base) + 1)]
Expand Down
37 changes: 22 additions & 15 deletions cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
Dict,
List,
Tuple,
Optional
Optional,
Union
)

import cylc.flow.flags
Expand Down Expand Up @@ -85,10 +86,10 @@ class GraphParser:
store dependencies for the whole workflow (call parse_graph multiple times
and key results by graph section).
The general form of a dependency is "EXPRESSION => NODE", where:
* On the right, NODE is a task or family name
The general form of a dependency is "LHS => RHS", where:
* On the left, an EXPRESSION of nodes involving parentheses, and
logical operators '&' (AND), and '|' (OR).
* On the right, an EXPRESSION of nodes NOT involving '|'
* Node names may be parameterized (any number of parameters):
NODE<i,j,k>
NODE<i=0,j,k> # specific parameter value
Expand Down Expand Up @@ -517,32 +518,33 @@ def _proc_dep_pair(
"Suicide markers must be"
f" on the right of a trigger: {left}")

# Check that parentheses match.
mismatch_msg = 'Mismatched parentheses in: "{}"'
if left and left.count("(") != left.count(")"):
raise GraphParseError(mismatch_msg.format(left))
if right.count("(") != right.count(")"):
raise GraphParseError(mismatch_msg.format(right))

# Ignore cycle point offsets on the right side.
# (Note we can't ban this; all nodes get process as left and right.)
if '[' in right:
return

# Check that parentheses match.
if left and left.count("(") != left.count(")"):
raise GraphParseError(
"Mismatched parentheses in: \"" + left + "\"")

# Split right side on AND.
rights = right.split(self.__class__.OP_AND)
if '' in rights or right and not all(rights):
raise GraphParseError(
f"Null task name in graph: {left} => {right}")

lefts: Union[List[str], List[Optional[str]]]
if not left or (self.__class__.OP_OR in left or '(' in left):
# Treat conditional or bracketed expressions as a single entity.
# Treat conditional or parenthesised expressions as a single entity
# Can get [None] or [""] here
lefts: List[Optional[str]] = [left]
lefts = [left]
else:
# Split non-conditional left-side expressions on AND.
# Can get [""] here too
# TODO figure out how to handle this wih mypy:
# assign List[str] to List[Optional[str]]
lefts = left.split(self.__class__.OP_AND) # type: ignore
lefts = left.split(self.__class__.OP_AND)
if '' in lefts or left and not all(lefts):
raise GraphParseError(
f"Null task name in graph: {left} => {right}")
Expand Down Expand Up @@ -847,9 +849,14 @@ def _compute_triggers(
trigs += [f"{name}{offset}:{trigger}"]

for right in rights:
right = right.strip('()') # parentheses don't matter
m = self.__class__.REC_RHS_NODE.match(right)
# This will match, bad nodes are detected earlier (type ignore):
suicide_char, name, output, opt_char = m.groups() # type: ignore
if not m:
# Bad nodes should have been detected earlier; fail loudly
raise ValueError( # pragma: no cover
f"Unexpected graph expression: '{right}'"
)
suicide_char, name, output, opt_char = m.groups()
suicide = (suicide_char == self.__class__.SUICIDE)
optional = (opt_char == self.__class__.OPTIONAL)
if output:
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import sys
from typing import TYPE_CHECKING

from packaging.version import parse as parse_version
from packaging.version import Version

from cylc.flow import LOG, __version__
from cylc.flow.exceptions import (
Expand Down Expand Up @@ -468,7 +468,7 @@ def _version_check(
if not db_file.is_file():
# not a restart
return True
this_version = parse_version(__version__)
this_version = Version(__version__)
last_run_version = WorkflowDatabaseManager.check_db_compatibility(db_file)

for itt, (this, that) in enumerate(zip_longest(
Expand Down
42 changes: 24 additions & 18 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@
)

if TYPE_CHECKING:
from cylc.flow.id import Tokens
from cylc.flow.cycling import PointBase
from cylc.flow.task_action_timer import TaskActionTimer
from cylc.flow.taskdef import TaskDef
from cylc.flow.id import Tokens


class TaskProxy:
"""Represent an instance of a cycling task in a running workflow.
Attributes:
.clock_trigger_time:
Clock trigger time in seconds since epoch.
(Used for wall_clock xtrigger).
.clock_trigger_times:
Memoization of clock trigger times (Used for wall_clock xtrigger):
{offset string: seconds from epoch}
.expire_time:
Time in seconds since epoch when this task is considered expired.
.identity:
Expand Down Expand Up @@ -152,7 +152,7 @@ class TaskProxy:

# Memory optimization - constrain possible attributes to this list.
__slots__ = [
'clock_trigger_time',
'clock_trigger_times',
'expire_time',
'identity',
'is_late',
Expand Down Expand Up @@ -244,7 +244,7 @@ def __init__(
self.try_timers: Dict[str, 'TaskActionTimer'] = {}
self.non_unique_events = Counter() # type: ignore # TODO: figure out

self.clock_trigger_time: Optional[float] = None
self.clock_trigger_times: Dict[str, int] = {}
self.expire_time: Optional[float] = None
self.late_time: Optional[float] = None
self.is_late = is_late
Expand Down Expand Up @@ -355,25 +355,31 @@ def get_point_as_seconds(self):
self.point_as_seconds += utc_offset_in_seconds
return self.point_as_seconds

def get_clock_trigger_time(self, offset_str):
"""Compute, cache, and return trigger time relative to cycle point.
def get_clock_trigger_time(
self,
point: 'PointBase', offset_str: Optional[str] = None
) -> int:
"""Compute, cache and return trigger time relative to cycle point.
Args:
offset_str: ISO8601Interval string, e.g. "PT2M".
Can be None for zero offset.
point: Task's cycle point.
offset_str: ISO8601 interval string, e.g. "PT2M".
Can be None for zero offset.
Returns:
Absolute trigger time in seconds since Unix epoch.
"""
if self.clock_trigger_time is None:
if offset_str is None:
trigger_time = self.point
offset_str = offset_str if offset_str else 'P0Y'
if offset_str not in self.clock_trigger_times:
if offset_str == 'P0Y':
trigger_time = point
else:
trigger_time = self.point + ISO8601Interval(offset_str)
self.clock_trigger_time = int(
point_parse(str(trigger_time)).seconds_since_unix_epoch
)
return self.clock_trigger_time
trigger_time = point + ISO8601Interval(offset_str)

offset = int(
point_parse(str(trigger_time)).seconds_since_unix_epoch)
self.clock_trigger_times[offset_str] = offset
return self.clock_trigger_times[offset_str]

def get_try_num(self):
"""Return the number of automatic tries (try number)."""
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ def get_xtrig_ctx(
# External (clock xtrigger): convert offset to trigger_time.
# Datetime cycling only.
kwargs["trigger_time"] = itask.get_clock_trigger_time(
itask.point,
ctx.func_kwargs["offset"]
)
else:
Expand Down
67 changes: 67 additions & 0 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Tests for the behaviour of xtrigger manager.
"""


async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
"""Test that if an itask has 2 wall_clock triggers with different
offsets that xtrigger manager gets both of them.
https://github.com/cylc/cylc-flow/issues/5783
n.b. Clock 3 exists to check the memoization path is followed,
and causing this test to give greater coverage.
"""
task_point = 1588636800 # 2020-05-05
ten_years_ahead = 1904169600 # 2030-05-05
monkeypatch.setattr(
'cylc.flow.xtriggers.wall_clock.time',
lambda: ten_years_ahead - 1
)
id_ = flow({
'scheduler': {
'allow implicit tasks': True
},
'scheduling': {
'initial cycle point': '2020-05-05',
'xtriggers': {
'clock_1': 'wall_clock()',
'clock_2': 'wall_clock(offset=P10Y)',
'clock_3': 'wall_clock(offset=P10Y)',
},
'graph': {
'R1': '@clock_1 & @clock_2 & @clock_3 => foo'
}
}
})
schd = scheduler(id_)
async with start(schd):
foo_proxy = schd.pool.get_tasks()[0]
clock_1_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_1')
clock_2_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2')
clock_3_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2')

assert clock_1_ctx.func_kwargs['trigger_time'] == task_point
assert clock_2_ctx.func_kwargs['trigger_time'] == ten_years_ahead
assert clock_3_ctx.func_kwargs['trigger_time'] == ten_years_ahead

schd.xtrigger_mgr.call_xtriggers_async(foo_proxy)
assert foo_proxy.state.xtriggers == {
'clock_1': True,
'clock_2': False,
'clock_3': False,
}
Loading

0 comments on commit 41c38eb

Please sign in to comment.