Skip to content

Commit

Permalink
Merge pull request #53 from ecmwf/develop
Browse files Browse the repository at this point in the history
3.4 release
  • Loading branch information
corentincarton authored Oct 14, 2024
2 parents c254802 + c34200d commit 7198098
Show file tree
Hide file tree
Showing 23 changed files with 876 additions and 132 deletions.
2 changes: 1 addition & 1 deletion docs/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ dependencies:
- python
- pip
- jupyterlab
- ecflow
- ecflow >= 5.13
- jinja2
- requests
- pytest
Expand Down
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ dependencies:
- python
- pip
- jupyterlab
- ecflow
- ecflow >= 5.13
- jinja2
- requests
- pytest
Expand Down
11 changes: 10 additions & 1 deletion pyflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import absolute_import

from .attributes import (
Aviso,
Complete,
Cron,
Crons,
Expand All @@ -16,6 +17,7 @@
Late,
Limit,
Meter,
Mirror,
RepeatDate,
RepeatDateTime,
RepeatEnumerated,
Expand Down Expand Up @@ -47,4 +49,11 @@
from .nodes import AnchorFamily, Family, Suite, Task, ecflow_name
from .resource import DataResource, FileResource, Resources, WebResource
from .script import FileScript, PythonScript, Script, TemplateFileScript, TemplateScript
from .version import __version__

try:
# NOTE: the `_version.py` file must not be present in the git repository
# as it is generated by setuptools at install time
from ._version import __version__
except ImportError: # pragma: no cover
# Local copy or not installed with setuptools
__version__ = ""
139 changes: 137 additions & 2 deletions pyflow/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,28 @@ def _build(self, ecflow_parent):
ecflow_parent.add_variable(str(self.name), str(self.value))


class GeneratedVariable(Exportable):
"""
An attribute for referencing an **ecFlow** generated variable.
The variable value will be generated automatically by ecFlow.
Parameters:
name(str): The name of the variable.
Example::
GeneratedVariable('FOO')
"""

def __init__(self, name):
if not is_variable(name):
raise ValueError("'{}' is not a valid variable name".format(name))
super().__init__(name)

def _build(self, *args, **kwargs):
return


class Edit:
"""
An attribute for setting multiple **ecFlow** variables.
Expand Down Expand Up @@ -539,7 +561,15 @@ def day_of_week(self):


for dow, day in enumerate(
("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")
(
"monday",
"tuesday",
"wednesday",
"thursday",
"friday",
"saturday",
"sunday",
)
):
setattr(RepeatDate, day, property(lambda self: Eq(self.day_of_week, dow)))

Expand Down Expand Up @@ -722,7 +752,10 @@ def make_variable(node, name, value):
if len(value) == 3:
if isinstance(value[2], int):
return RepeatDate(
name, as_date(value[0]), as_date(value[1]), value[2]
name,
as_date(value[0]),
as_date(value[1]),
value[2],
)
else:
return RepeatDate(name, as_date(value[0]), as_date(value[1]), 1)
Expand Down Expand Up @@ -1255,3 +1288,105 @@ def _add(self, time, adder, rel=1):
adder(ecflow.TimeSlot(hour, mins), rel)
else:
adder(ecflow.TimeSlot(hour, mins))


###################################################################


class Aviso(Attribute):
"""
An attribute that allows a node to be triggered by an external Aviso notification.
Parameters:
name(str): The name of the attribute.
listener(str): The listener configuration.
url(str): The URL to the Aviso server.
schema(str): The schema used to process Aviso notifications.
polling(str, int): The time interval used to poll the Aviso server.
auth(str): The path to the Aviso authentication credentials file.
Example::
pyflow.Aviso("AVISO_NOTIFICATION",
r'{ "event": "mars", "request": { "class": "od"} }',
"https://aviso.ecm:8888/v1",
"/path/to/schema.json"
60,
"/path/to/auth.json")
"""

def __init__(self, name, listener, url, schema, polling, auth):
super().__init__(name)
self.listener = str(listener)
self.url = str(url)
self.schema = str(schema)
self.polling = str(polling)
self.auth = str(auth)

def _build(self, ecflow_parent):
# The listener configuration must be provided as a single-quoted JSON string
quoted_listener_cfg = "'{}'".format(self.listener)

aviso = ecflow.AvisoAttr(
self.name,
quoted_listener_cfg,
self.url,
self.schema,
self.polling,
self.auth,
)

ecflow_parent.add_aviso(aviso)


###################################################################


class Mirror(Attribute):
"""
An attribute that allows a node status to be synchronized with a node from another ecFlow server.
Parameters:
name(str): The name of the attribute.
remote_path(str): The path to the mirrored node on the remote ecFlow server.
remote_host(str): The host used to connect to the remote ecFlow server.
remote_port(str, int): The port used to connect to the remote ecFlow server.
polling(str, int): The time interval used to poll the remote ecFlow server.
ssl(bool): The flag indicating if SSL communication is enabled.
auth(str): The path to the ecFlow authentication credentials file.
Example::
pyflow.Mirror("NODE_MIRROR"
"/suite/family/task",
"remote-ecflow-server",
"3141",
60,
False
"/path/to/auth.json")
"""

def __init__(self, name, remote_path, remote_host, remote_port, polling, ssl, auth):
super().__init__(name)
self.remote_path = str(remote_path)
self.remote_host = str(remote_host)
self.remote_port = str(remote_port)
self.polling = str(polling)
self.ssl = bool(ssl)
self.auth = str(auth)

def _build(self, ecflow_parent):
mirror = ecflow.MirrorAttr(
self.name,
self.remote_path,
self.remote_host,
self.remote_port,
self.polling,
self.ssl,
self.auth,
)

ecflow_parent.add_mirror(mirror)
3 changes: 2 additions & 1 deletion pyflow/configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ def choices(cls):
Return an iterable of (named) tuples of the name, help string and options
"""
Configurable = namedtuple(
"Configurable", ["name", "help", "choices", "multichoice", "default"]
"Configurable",
["name", "help", "choices", "multichoice", "default"],
)
for attr_name, configuration_class in cls.configurations():
choices = configuration_class.available_choices()
Expand Down
4 changes: 3 additions & 1 deletion pyflow/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ def __init__(self, cron):

if minute is None and hour is None:
ts = ecflow.TimeSeries(
ecflow.TimeSlot(0, 0), ecflow.TimeSlot(23, 59), ecflow.TimeSlot(0, 1)
ecflow.TimeSlot(0, 0),
ecflow.TimeSlot(23, 59),
ecflow.TimeSlot(0, 1),
)
else:
if minute is None:
Expand Down
4 changes: 3 additions & 1 deletion pyflow/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ def deploy_uniqueness_check(self, source, target):
with open(target, "r") as f:
old_content = f.read()
diff = difflib.unified_diff(
old_content.splitlines(), source.splitlines(), lineterm=""
old_content.splitlines(),
source.splitlines(),
lineterm="",
)
diff_str = "\n".join(diff)
print(
Expand Down
104 changes: 73 additions & 31 deletions pyflow/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,9 @@ def copy_file_to(self, source_file, target_file):
mkdir -p "{}"
cp "{}" "{}"
""".format(
os.path.dirname(os.path.abspath(target_file)), source_file, target_file
os.path.dirname(os.path.abspath(target_file)),
source_file,
target_file,
)
)

Expand Down Expand Up @@ -666,7 +668,11 @@ def run_simple_command(self, cmd):

if self.indirect_host is not None:
return "ssh -o StrictHostKeyChecking=no {}@{} ssh -o StrictHostKeyChecking=no {}@{} {}".format(
self.indirect_user, self.indirect_host, self.user, self.hostname, cmd
self.indirect_user,
self.indirect_host,
self.user,
self.hostname,
cmd,
)
else:
return "ssh -o StrictHostKeyChecking=no {}@{} {}".format(
Expand Down Expand Up @@ -958,14 +964,21 @@ class TroikaHost(Host):
def __init__(self, name, user, **kwargs):
self.troika_exec = kwargs.pop("troika_exec", "troika")
self.troika_config = kwargs.pop("troika_config", "")
self.troika_version = tuple(
map(int, kwargs.pop("troika_version", "0.2.1").split("."))
)
super().__init__(name, user=user, **kwargs)

def troika_command(self, command):
cmd = " ".join(
[
f"{self.troika_exec}",
f"%TROIKA:{self.troika_exec}%",
"-vv",
f"-c {self.troika_config}" if self.troika_config else "",
(
f"-c %TROIKA_CONFIG:{self.troika_config}%"
if self.troika_config
else ""
),
f"{command}",
f"-u {self.user}",
]
Expand Down Expand Up @@ -1015,37 +1028,66 @@ def script_submit_arguments(self, submit_arguments):
"""
Accepted submit arguments:
"""
resources = {
"queue": "--qos=",
"job_name": "--job-name=",
"tasks": "--ntasks=",
"nodes": "--nodes=",
"threads_per_task": "--cpus-per-task=",
"tasks_per_node": "--ntasks-per-node=",
"hyperthreads": "--threads-per-core=",
"memory_per_task": "--mem-per-cpu=",
"accounting": "--account=",
"working_dir": "--chdir=",
"time": "--time=",
"output": "--output=",
"error": "--error=",
"priority": "--priority=",
"tmpdir": "--gres=ssdtmp:",
"sthost": "--export=STHOST=",

deprecated = {
"tasks": "total_tasks",
"nodes": "total_nodes",
"threads_per_task": "cpus_per_task",
"hyperthreads": "threads_per_core",
"memory_per_task": "memory_per_cpu",
"accounting": "billing_account",
"working_dir": "working_dir",
"tmpdir": "tmpdir_size",
"export": "export_vars",
}

slurm_resources = {
"hint": " --hint=",
"distribution": " --distribution=",
"reservation": "--reservation=",
"partition": "--partition=",
"exclusive": "--exclusive",
}

for arg in submit_arguments.keys():
if arg not in resources.keys():
raise KeyError(f"Submit argument {arg} not supported!")
if self.troika_version < (0, 2, 2):
slurm_resources.update(
{
"distribution": "--distribution=",
"reservation": "--reservation=",
}
)

def _translate_hint(val):
if val == "multithread":
return "enable_hyperthreading", "yes"
elif val == "nomultithread":
return "enable_hyperthreading", "no"
else:
return "hint", val

def _translate_sthost(val):
return "export_vars", f"STHOST={val}"

special = {
"hint": _translate_hint,
"sthost": _translate_sthost,
}

args = []
for key, resource in resources.items():
if key in submit_arguments and resource is not None:
args.append("#SBATCH {}{}".format(resource, submit_arguments[key]))
for arg, val in submit_arguments.items():
if arg in special:
arg, val = special[arg](val)

if arg in slurm_resources:
resource = slurm_resources[arg]
if resource is not None:
args.append("#SBATCH {}{}".format(resource, val))
elif arg == "RAW_PRAGMA":
for pragma in val:
args.append(pragma)
else:
if arg in deprecated:
print(
f"WARNING! '{arg}' is deprecated, use '{deprecated[arg]}' instead"
)
arg = deprecated[arg]
if arg is not None:
args.append("#TROIKA {}={}".format(arg, val))

return args
4 changes: 3 additions & 1 deletion pyflow/html.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ def definition_to_html(d):
)

n = re.sub(
r" (\%) ", r' <span style="text-weight: bold; color: purple">\1</span> ', n
r" (\%) ",
r' <span style="text-weight: bold; color: purple">\1</span> ',
n,
)

n = re.sub(
Expand Down
Loading

0 comments on commit 7198098

Please sign in to comment.