From c227baf09cec1101658088798b0a47796e99e3ab Mon Sep 17 00:00:00 2001 From: Olivier Iffrig Date: Tue, 21 Nov 2023 11:09:05 +0000 Subject: [PATCH 01/26] Use Troika directives in TroikaHost (WIP) --- pyflow/host.py | 81 +++++++++++++++++++++++++++++++++++--------------- 1 file changed, 57 insertions(+), 24 deletions(-) diff --git a/pyflow/host.py b/pyflow/host.py index 6050edc..3b2ea22 100644 --- a/pyflow/host.py +++ b/pyflow/host.py @@ -1014,35 +1014,68 @@ def script_submit_arguments(self, submit_arguments): """ Accepted submit arguments: """ + + TROIKA_0_2_2 = False + resources = { - "queue": "--qos=", - "job_name": "--job-name=", - "tasks": "--ntasks=", - "nodes": "--nodes=", - "threads_per_task": "--cpus-per-task=", - "tasks_per_node": "--ntasks-per-node=", - "hyperthreads": "--threads-per-core=", - "memory_per_task": "--mem-per-cpu=", - "accounting": "--account=", - "working_dir": "--chdir=", - "time": "--time=", - "output": "--output=", - "error": "--error=", - "priority": "--priority=", - "tmpdir": "--gres=ssdtmp:", - "sthost": "--export=STHOST=", + "queue": "queue", + "job_name": "job_name", + "tasks": "total_tasks", + "nodes": "total_nodes", + "threads_per_task": "cpus_per_task", + "tasks_per_node": "tasks_per_node", + "hyperthreads": "threads_per_core", + "memory_per_task": "memory_per_cpu", + "accounting": "billing_account", + "working_dir": "working_dir", + "time": "time", + "output": "output", + "error": "error", + "priority": "priority", + "tmpdir": "tmpdir_size", + "enable_hyperthreading": "enable_hyperthreading", + "distribution": "distribution", # XXX: troika >=0.2.2 + "reservation": "reservation", # XXX: troika >=0.2.2 + } + + slurm_resources = { + "tmpdir": "--gres=ssdtmp:", # XXX: ECMWF-specific + "sthost": "--export=STHOST=", # XXX: ECMWF-specific "hint": " --hint=", - "distribution": " --distribution=", - "reservation": "--reservation=", } - for arg in submit_arguments.keys(): - if arg not in resources.keys(): - raise KeyError(f"Submit argument {arg} not supported!") + if not TROIKA_0_2_2: + slurm_resources.update({ + "distribution": "--distribution=", + "reservation": "--reservation=", + }) + + def _translate_hint(val): + if val == "multithread": + return "enable_hyperthreading", "yes" + elif val == "nomultithread": + return "enable_hyperthreading", "no" + else: + return "hint", val + + special = { + "hint": _translate_hint, + } args = [] - for key, resource in resources.items(): - if key in submit_arguments and resource is not None: - args.append("#SBATCH {}{}".format(resource, submit_arguments[key])) + for arg, val in submit_arguments.items(): + if arg in special: + arg, val = special[arg](val) + + if arg in slurm_resources: + resource = slurm_resources[arg] + if resource is not None: + args.append("#SBATCH {}{}".format(resource, val)) + elif arg in resources: + resource = resources[arg] + if resource is not None: + args.append("#TROIKA {}={}".format(resource, val)) + else: + raise KeyError(f"Submit argument {arg} not supported!") return args From fc9dae3bcb180f0c07a31d1869b2587161a221e5 Mon Sep 17 00:00:00 2001 From: Olivier Iffrig Date: Tue, 21 Nov 2023 18:08:32 +0000 Subject: [PATCH 02/26] Reformat code --- pyflow/host.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pyflow/host.py b/pyflow/host.py index 3b2ea22..e4e45dd 100644 --- a/pyflow/host.py +++ b/pyflow/host.py @@ -1045,10 +1045,12 @@ def script_submit_arguments(self, submit_arguments): } if not TROIKA_0_2_2: - slurm_resources.update({ - "distribution": "--distribution=", - "reservation": "--reservation=", - }) + slurm_resources.update( + { + "distribution": "--distribution=", + "reservation": "--reservation=", + } + ) def _translate_hint(val): if val == "multithread": From 1421203c597ae42b7ddd8b1863be93ddc6c79ab9 Mon Sep 17 00:00:00 2001 From: Olivier Iffrig Date: Wed, 22 Nov 2023 13:56:27 +0000 Subject: [PATCH 03/26] Add option to control compatibility with Troika <0.2.2 --- pyflow/host.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pyflow/host.py b/pyflow/host.py index e4e45dd..4e38074 100644 --- a/pyflow/host.py +++ b/pyflow/host.py @@ -957,6 +957,7 @@ class TroikaHost(Host): def __init__(self, name, user, **kwargs): self.troika_exec = kwargs.pop("troika_exec", "troika") self.troika_config = kwargs.pop("troika_config", "") + self.troika_0_2_2 = kwargs.pop("troika_0_2_2", False) super().__init__(name, user=user, **kwargs) def troika_command(self, command): @@ -1015,8 +1016,6 @@ def script_submit_arguments(self, submit_arguments): Accepted submit arguments: """ - TROIKA_0_2_2 = False - resources = { "queue": "queue", "job_name": "job_name", @@ -1044,7 +1043,7 @@ def script_submit_arguments(self, submit_arguments): "hint": " --hint=", } - if not TROIKA_0_2_2: + if not self.troika_0_2_2: slurm_resources.update( { "distribution": "--distribution=", From 926cd0944a1550dabb059259cbf4d9510a75e2fe Mon Sep 17 00:00:00 2001 From: Olivier Iffrig Date: Thu, 23 Nov 2023 10:31:14 +0000 Subject: [PATCH 04/26] Handle STHOST via troika's export_vars --- pyflow/host.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pyflow/host.py b/pyflow/host.py index 4e38074..3971501 100644 --- a/pyflow/host.py +++ b/pyflow/host.py @@ -1033,13 +1033,13 @@ def script_submit_arguments(self, submit_arguments): "priority": "priority", "tmpdir": "tmpdir_size", "enable_hyperthreading": "enable_hyperthreading", + "export": "export_vars", "distribution": "distribution", # XXX: troika >=0.2.2 "reservation": "reservation", # XXX: troika >=0.2.2 } slurm_resources = { "tmpdir": "--gres=ssdtmp:", # XXX: ECMWF-specific - "sthost": "--export=STHOST=", # XXX: ECMWF-specific "hint": " --hint=", } @@ -1059,8 +1059,12 @@ def _translate_hint(val): else: return "hint", val + def _translate_sthost(val): + return "export", f"STHOST={val}" + special = { "hint": _translate_hint, + "sthost": _translate_sthost, } args = [] From 64b738f81779848d6152d8b8058871100fa9c989 Mon Sep 17 00:00:00 2001 From: Olivier Iffrig Date: Tue, 12 Dec 2023 11:21:09 +0000 Subject: [PATCH 05/26] Add partition key for Troika host --- pyflow/host.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyflow/host.py b/pyflow/host.py index 3971501..d64aecf 100644 --- a/pyflow/host.py +++ b/pyflow/host.py @@ -1034,6 +1034,7 @@ def script_submit_arguments(self, submit_arguments): "tmpdir": "tmpdir_size", "enable_hyperthreading": "enable_hyperthreading", "export": "export_vars", + "partition": "partition", "distribution": "distribution", # XXX: troika >=0.2.2 "reservation": "reservation", # XXX: troika >=0.2.2 } From 1b9d1227fb3d3a8e9db81e2e385824559e052797 Mon Sep 17 00:00:00 2001 From: Corentin Carton de Wiart Date: Thu, 2 May 2024 10:33:17 +0100 Subject: [PATCH 06/26] update troika host to propagate pragma directly to troika --- pyflow/host.py | 33 +++++++++------------- tests/test_host.py | 68 ++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 79 insertions(+), 22 deletions(-) diff --git a/pyflow/host.py b/pyflow/host.py index d64aecf..d17242d 100644 --- a/pyflow/host.py +++ b/pyflow/host.py @@ -957,15 +957,15 @@ class TroikaHost(Host): def __init__(self, name, user, **kwargs): self.troika_exec = kwargs.pop("troika_exec", "troika") self.troika_config = kwargs.pop("troika_config", "") - self.troika_0_2_2 = kwargs.pop("troika_0_2_2", False) + self.troika_version = tuple(kwargs.pop("troika_version", "0.2.1").split(".")) super().__init__(name, user=user, **kwargs) def troika_command(self, command): cmd = " ".join( [ - f"{self.troika_exec}", + f"%TROIKA:{self.troika_exec}%", "-vv", - f"-c {self.troika_config}" if self.troika_config else "", + f"-c %TROIKA_CONFIG:{self.troika_config}%" if self.troika_config else "", f"{command}", f"-u {self.user}", ] @@ -1016,9 +1016,7 @@ def script_submit_arguments(self, submit_arguments): Accepted submit arguments: """ - resources = { - "queue": "queue", - "job_name": "job_name", + deprecated = { "tasks": "total_tasks", "nodes": "total_nodes", "threads_per_task": "cpus_per_task", @@ -1027,16 +1025,8 @@ def script_submit_arguments(self, submit_arguments): "memory_per_task": "memory_per_cpu", "accounting": "billing_account", "working_dir": "working_dir", - "time": "time", - "output": "output", - "error": "error", - "priority": "priority", "tmpdir": "tmpdir_size", - "enable_hyperthreading": "enable_hyperthreading", "export": "export_vars", - "partition": "partition", - "distribution": "distribution", # XXX: troika >=0.2.2 - "reservation": "reservation", # XXX: troika >=0.2.2 } slurm_resources = { @@ -1044,7 +1034,7 @@ def script_submit_arguments(self, submit_arguments): "hint": " --hint=", } - if not self.troika_0_2_2: + if self.troika_version < (0, 2, 2): slurm_resources.update( { "distribution": "--distribution=", @@ -1077,11 +1067,14 @@ def _translate_sthost(val): resource = slurm_resources[arg] if resource is not None: args.append("#SBATCH {}{}".format(resource, val)) - elif arg in resources: - resource = resources[arg] - if resource is not None: - args.append("#TROIKA {}={}".format(resource, val)) + elif arg == "RAW_PRAGMA": + for pragma in val: + args.append(pragma) else: - raise KeyError(f"Submit argument {arg} not supported!") + if arg in deprecated: + print(f"WARNING! '{arg}' is deprecated, use '{deprecated[arg]}' instead") + arg = deprecated[arg] + if arg is not None: + args.append("#TROIKA {}={}".format(arg, val)) return args diff --git a/tests/test_host.py b/tests/test_host.py index e495b64..04ad143 100644 --- a/tests/test_host.py +++ b/tests/test_host.py @@ -1,4 +1,5 @@ import pytest +import unittest import pyflow @@ -227,8 +228,71 @@ def test_host_kill_cmd(class_name): assert host.kill_cmd == expected_kill_cmd -# def test_pre_post_amble(): -# assert False +# class TestTroikaHost(unittest.TestCase): +# def setUp(self): +# self.troika_host = pyflow.TroikaHost( +# name='test_host', +# user='test_user', +# hostname='test_hostname', +# scratch_directory='/scratch', +# log_directory='/log', +# resources_directory='/resources', +# limit=10, +# extra_paths=['/usr/bin', '/bin'], +# extra_variables={'VAR1': 'value1'}, +# environment_variables={'ENV1': 'env_value1'}, +# module_source='module.sh', +# modules=['python', 'gcc'], +# purge_models=True, +# label_host=False, +# ecflow_path='/ecflow', +# server_ecfvars=False +# ) + +# def test_initialization(self): +# self.assertEqual(self.troika_host.name, 'test_host') +# self.assertEqual(self.troika_host.user, 'test_user') +# self.assertEqual(self.troika_host.hostname, 'test_hostname') +# self.assertEqual(self.troika_host.scratch_directory, '/scratch/test_user') +# self.assertEqual(self.troika_host.log_directory, '/log/test_user') +# self.assertEqual(self.troika_host.limit, 10) +# self.assertListEqual(self.troika_host.extra_paths, ['/usr/bin', '/bin']) +# self.assertDictEqual(self.troika_host.extra_variables, {'VAR1': 'value1'}) +# self.assertDictEqual(self.troika_host.environment_variables, {'ENV1': 'env_value1'}) +# self.assertEqual(self.troika_host.module_source, 'module.sh') +# self.assertListEqual(self.troika_host.modules, ['python', 'gcc']) +# self.assertTrue(self.troika_host.purge_models) +# self.assertFalse(self.troika_host.label_host) +# self.assertEqual(self.troika_host.ecflow_path, '/ecflow') +# self.assertFalse(self.troika_host.server_ecfvars) + +# def test_troika_command(self): +# command = self.troika_host.troika_command('test_command') +# expected_command = '%TROIKA:troika% -vv -c %TROIKA_CONFIG:% -u test_user test_command -u test_user' +# self.assertEqual(command, expected_command) + +# def test_job_command_property(self): +# self.assertIn('submit', self.troika_host.job_cmd) +# self.assertIn('test_hostname', self.troika_host.job_cmd) + +# def test_kill_command_property(self): +# self.assertIn('kill', self.troika_host.kill_cmd) +# self.assertIn('test_hostname', self.troika_host.kill_cmd) + +# def test_status_command_property(self): +# self.assertIn('monitor', self.troika_host.status_cmd) +# self.assertIn('test_hostname', self.troika_host.status_cmd) + +# def test_check_command_property(self): +# self.assertIn('check', self.troika_host.check_cmd) +# self.assertIn('test_hostname', self.troika_host.check_cmd) + +# def test_script_submit_arguments(self): +# submit_args = {'tasks': 2, 'sthost': 'test', 'tmpdir': '500', 'hint': 'nomultithread'} +# results = self.troika_host.script_submit_arguments(submit_args) +# self.assertIn('export STHOST=test', results) +# self.assertIn('--gres=ssdtmp:500', results) +# self.assertIn('--hint=nomultithread', results) if __name__ == "__main__": From 96df9aeff93b7e7e07e2ed690ff9c4e24eab6d7c Mon Sep 17 00:00:00 2001 From: Olivier Iffrig Date: Wed, 12 Jun 2024 10:59:44 +0000 Subject: [PATCH 07/26] TroikaHost: drop special case for tmpdir --- pyflow/host.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyflow/host.py b/pyflow/host.py index d17242d..3e5ee2b 100644 --- a/pyflow/host.py +++ b/pyflow/host.py @@ -1030,7 +1030,6 @@ def script_submit_arguments(self, submit_arguments): } slurm_resources = { - "tmpdir": "--gres=ssdtmp:", # XXX: ECMWF-specific "hint": " --hint=", } From 6359d74344eea9a6e9f88a4e2028e7f17af7e859 Mon Sep 17 00:00:00 2001 From: Marcos Bento Date: Tue, 25 Jun 2024 10:18:47 +0100 Subject: [PATCH 08/26] Add Aviso attribute n.b. This new attribute was introduced in ecFlow 5.13.0 --- pyflow/__init__.py | 1 + pyflow/attributes.py | 50 ++++++++++++++++++++++++ tests/test_attributes.py | 84 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+) diff --git a/pyflow/__init__.py b/pyflow/__init__.py index cddb6a3..93e2810 100644 --- a/pyflow/__init__.py +++ b/pyflow/__init__.py @@ -3,6 +3,7 @@ from __future__ import absolute_import from .attributes import ( + Aviso, Complete, Cron, Crons, diff --git a/pyflow/attributes.py b/pyflow/attributes.py index 4cf8d3d..9d82012 100644 --- a/pyflow/attributes.py +++ b/pyflow/attributes.py @@ -1255,3 +1255,53 @@ def _add(self, time, adder, rel=1): adder(ecflow.TimeSlot(hour, mins), rel) else: adder(ecflow.TimeSlot(hour, mins)) + + +################################################################### + + +class Aviso(Attribute): + """ + An attribute that allows a node to be triggered by an external Aviso notification. + + Parameters: + name(str): The name of the attribute. + listener(str): The listener configuration. + url(str): The URL to the Aviso server. + schema(str): The schema used to process Aviso notifications. + polling(str, int): The time interval used to poll the Aviso server. + auth(str): The path to the Aviso authentication credentials file. + + Example:: + + pyflow.Aviso("AVISO_NOTIFICATION", + r'{ "event": "mars", "request": { "class": "od"} }', + "https://aviso.ecm:8888/v1", + "/path/to/schema.json" + 60, + "/path/to/auth.json") + + """ + + def __init__(self, name, listener, url, schema, polling, auth): + super().__init__(name) + self.listener = str(listener) + self.url = str(url) + self.schema = str(schema) + self.polling = str(polling) + self.auth = str(auth) + + def _build(self, ecflow_parent): + # The listener configuration must be provided as a single-quoted JSON string + quoted_listener_cfg = "'{}'".format(self.listener) + + aviso = ecflow.AvisoAttr( + self.name, + quoted_listener_cfg, + self.url, + self.schema, + self.polling, + self.auth, + ) + + ecflow_parent.add_aviso(aviso) diff --git a/tests/test_attributes.py b/tests/test_attributes.py index f34c1bb..fca56c8 100644 --- a/tests/test_attributes.py +++ b/tests/test_attributes.py @@ -1,3 +1,5 @@ +import json +import os from datetime import date, datetime, timedelta import pytest @@ -531,6 +533,88 @@ def test_repeat_datetime(self): s.check_definition() +class TestAviso: + """A set of tests for Aviso attributes.""" + + def test_create_aviso_from_strings(self): + name = "AVISO_ATTRIBUTE" + listener = "{}" + url = "https://aviso.ecm:8888/v1" + schema = "/path/to/schema.json" + polling = "%ECFLOW_AVISO_POLLING%" + auth = "/path/to/auth.json" + + attr = pyflow.Aviso(name, listener, url, schema, polling, auth) + + assert attr.name == name + assert attr.listener == listener + assert attr.url == url + assert attr.schema == schema + assert attr.polling == polling + assert attr.auth == auth + + def test_create_aviso_from_objects(self): + name = "AVISO_ATTRIBUTE" + listener = json.loads(r'{ "event": "mars", "request": { "class": "od"} }') + url = "https://aviso.ecm:8888/v1" + schema = os.path.join(os.path.dirname(__file__), "schema.json") + polling = 60 + auth = os.path.join(os.path.dirname(__file__), "auth.json") + + attr = pyflow.Aviso(name, listener, url, schema, polling, auth) + + assert attr.name == name + assert attr.listener == str(listener) + assert attr.url == url + assert attr.schema == str(schema) + assert attr.polling == str(polling) + assert attr.auth == str(auth) + + def test_create_aviso_on_task(self): + with pyflow.Suite("s") as s: + assert "s" == s.name + with pyflow.Family("f") as f: + assert "f" == f.name + with pyflow.Task("t") as t: + assert "t" == t.name + name = "AVISO_ATTRIBUTE" + listener = r'{ "event": "mars", "request": { "class": "od"} }' + url = "https://aviso.ecm:8888/v1" + schema = "/path/to/schema.json" + polling = "60" + auth = "/path/to/auth.json" + + pyflow.Aviso(name, listener, url, schema, polling, auth) + + s.check_definition() + + def test_definitions_content_with_aviso_attribute(self): + with pyflow.Suite("s") as s: + assert "s" == s.name + with pyflow.Family("f") as f: + assert "f" == f.name + with pyflow.Task("t") as t: + assert "t" == t.name + name = "AVISO_ATTRIBUTE" + listener = r'{ "event": "mars", "request": { "class": "od"} }' + url = "https://aviso.ecm:8888/v1" + schema = "/path/to/schema.json" + polling = "60" + auth = "/path/to/auth.json" + + pyflow.Aviso(name, listener, url, schema, polling, auth) + + defs = s.ecflow_definition() + + assert "aviso --name AVISO_ATTRIBUTE" in str(defs) + assert '--listener \'{ "event": "mars", "request": { "class": "od"} }\'' in str( + defs + ) + assert "--url https://aviso.ecm:8888/v1" in str(defs) + assert "--schema /path/to/schema.json" in str(defs) + assert "--auth /path/to/auth.json" in str(defs) + + if __name__ == "__main__": from os import path From 9e9e2face44c293e9e87a8c06784e4808ba37d10 Mon Sep 17 00:00:00 2001 From: Marcos Bento Date: Tue, 25 Jun 2024 11:25:38 +0100 Subject: [PATCH 09/26] Add Mirror attribute n.b. This new attribute was introduced in ecFlow 5.13.0 --- pyflow/__init__.py | 1 + pyflow/attributes.py | 52 +++++++++++++++++++++ tests/test_attributes.py | 98 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 151 insertions(+) diff --git a/pyflow/__init__.py b/pyflow/__init__.py index 93e2810..5c108e9 100644 --- a/pyflow/__init__.py +++ b/pyflow/__init__.py @@ -17,6 +17,7 @@ Late, Limit, Meter, + Mirror, RepeatDate, RepeatDateTime, RepeatEnumerated, diff --git a/pyflow/attributes.py b/pyflow/attributes.py index 9d82012..2ef1502 100644 --- a/pyflow/attributes.py +++ b/pyflow/attributes.py @@ -1305,3 +1305,55 @@ def _build(self, ecflow_parent): ) ecflow_parent.add_aviso(aviso) + + +################################################################### + + +class Mirror(Attribute): + """ + An attribute that allows a node status to be synchronized with a node from another ecFlow server. + + Parameters: + name(str): The name of the attribute. + remote_path(str): The path to the mirrored node on the remote ecFlow server. + remote_host(str): The host used to connect to the remote ecFlow server. + remote_port(str, int): The port used to connect to the remote ecFlow server. + polling(str, int): The time interval used to poll the remote ecFlow server. + ssl(bool): The flag indicating if SSL communication is enabled. + auth(str): The path to the ecFlow authentication credentials file. + + + Example:: + + pyflow.Mirror("NODE_MIRROR" + "/suite/family/task", + "remote-ecflow-server", + "3141", + 60, + False + "/path/to/auth.json") + + """ + + def __init__(self, name, remote_path, remote_host, remote_port, polling, ssl, auth): + super().__init__(name) + self.remote_path = str(remote_path) + self.remote_host = str(remote_host) + self.remote_port = str(remote_port) + self.polling = str(polling) + self.ssl = bool(ssl) + self.auth = str(auth) + + def _build(self, ecflow_parent): + mirror = ecflow.MirrorAttr( + self.name, + self.remote_path, + self.remote_host, + self.remote_port, + self.polling, + self.ssl, + self.auth, + ) + + ecflow_parent.add_mirror(mirror) diff --git a/tests/test_attributes.py b/tests/test_attributes.py index fca56c8..a962c70 100644 --- a/tests/test_attributes.py +++ b/tests/test_attributes.py @@ -615,6 +615,104 @@ def test_definitions_content_with_aviso_attribute(self): assert "--auth /path/to/auth.json" in str(defs) +class TestMirror: + """A set of tests for Mirror attributes.""" + + def test_create_mirror_from_strings(self): + name = "MIRROR_ATTRIBUTE" + remote_path = "/s/f/t" + remote_host = "remote-ecflow-server" + remote_port = "3141" + polling = "%ECFLOW_MIRROR_POLLING%" + ssl = True + auth = "/path/to/auth.json" + + attr = pyflow.Mirror( + name, remote_path, remote_host, remote_port, polling, ssl, auth + ) + + assert attr.name == name + assert attr.remote_path == remote_path + assert attr.remote_host == remote_host + assert attr.remote_port == remote_port + assert attr.polling == polling + assert attr.ssl == ssl + assert attr.auth == auth + + def test_create_mirror_from_objects(self): + name = "MIRROR_ATTRIBUTE" + remote_path = "/s/f/t" + remote_host = "remote-ecflow-server" + remote_port = 3141 + polling = 60 + ssl = True + auth = "/path/to/auth.json" + + attr = pyflow.Mirror( + name, remote_path, remote_host, remote_port, polling, ssl, auth + ) + + assert attr.name == name + assert attr.remote_path == remote_path + assert attr.remote_host == remote_host + assert attr.remote_port == str(remote_port) + assert attr.polling == str(polling) + assert attr.ssl == ssl + assert attr.auth == auth + + def test_create_mirror_on_task(self): + with pyflow.Suite("s") as s: + assert "s" == s.name + with pyflow.Family("f") as f: + assert "f" == f.name + with pyflow.Task("t") as t: + assert "t" == t.name + + name = "MIRROR_ATTRIBUTE" + remote_path = "/s/f/t" + remote_host = "remote-ecflow-server" + remote_port = 3141 + polling = 60 + ssl = True + auth = "/path/to/auth.json" + + pyflow.Mirror( + name, remote_path, remote_host, remote_port, polling, ssl, auth + ) + + s.check_definition() + + def test_definitions_content_with_mirror_attribute(self): + with pyflow.Suite("s") as s: + assert "s" == s.name + with pyflow.Family("f") as f: + assert "f" == f.name + with pyflow.Task("t") as t: + assert "t" == t.name + + name = "MIRROR_ATTRIBUTE" + remote_path = "/s/f/t" + remote_host = "remote-ecflow-server" + remote_port = 3141 + polling = 60 + ssl = True + auth = "/path/to/auth.json" + + pyflow.Mirror( + name, remote_path, remote_host, remote_port, polling, ssl, auth + ) + + defs = s.ecflow_definition() + + assert "mirror --name MIRROR_ATTRIBUTE" in str(defs) + assert "--remote_path /s/f/t" in str(defs) + assert "--remote_host remote-ecflow-server" in str(defs) + assert "--remote_port 3141" in str(defs) + assert "--polling 60" in str(defs) + assert "--ssl" in str(defs) + assert "--remote_auth /path/to/auth.json" in str(defs) + + if __name__ == "__main__": from os import path From 9d3d2cde7bd791cab86aa6503687a75765e2f86d Mon Sep 17 00:00:00 2001 From: Jenny Wong Date: Mon, 1 Jul 2024 16:01:55 +0000 Subject: [PATCH 10/26] Add exit hook arg to family and propaage to children --- pyflow/nodes.py | 64 +++++++++++++++++++++++++++++++++--------- tests/test_families.py | 28 ++++++++++++++++++ 2 files changed, 79 insertions(+), 13 deletions(-) diff --git a/pyflow/nodes.py b/pyflow/nodes.py index f0b1042..2f3ebcd 100644 --- a/pyflow/nodes.py +++ b/pyflow/nodes.py @@ -233,6 +233,16 @@ def __getattr__(self, item): except KeyError: raise AttributeError("Node {} does not exist".format(item)) + def _add_single_node(self, node): + if node.name in self._nodes: + raise DuplicateNodeError( + self.fullname, node.name, self._nodes[node.name].fullname + ) + + node.parent.remove_node(node) + self._nodes[node.name] = node + node._parent = self + def add_node(self, node): """ Adds a child to current node. @@ -249,15 +259,7 @@ def add_node(self, node): node = self.add_node(n) return node - if node.name in self._nodes: - raise DuplicateNodeError( - self.fullname, node.name, self._nodes[node.name].fullname - ) - - node.parent.remove_node(node) - self._nodes[node.name] = node - node._parent = self - + self._add_single_node(node) return node def clear_type(self, cls): @@ -786,7 +788,14 @@ def generate_stub(self, scripts): class Family(Node): def __init__( - self, name, json=None, modules=None, purge_modules=False, extern=False, **kwargs + self, + name, + json=None, + modules=None, + purge_modules=False, + extern=False, + exit_hook=None, + **kwargs, ): """ Provides both visual and logical grouping of related families and tasks. @@ -800,6 +809,7 @@ def __init__( runtime. extern(bool): Whether the family is a shadow node created to satisfy an Extern_, and should not be generated. + exit_hook(str,list): a script containing some commands to be called at exit time. workdir(string): Working directory for tasks. autocancel(Autocancel_): An attribute for automatic removal of the node which has completed. completes(Complete_): An attribute for setting a condition for setting the node as complete depending on @@ -833,6 +843,7 @@ def __init__( pass """ + self._exit_hook = [] super().__init__( name, json=json, @@ -841,6 +852,8 @@ def __init__( extern=extern, **kwargs, ) + if exit_hook is not None: + self._add_exit_hook(exit_hook) def ecflow_object(self): """ @@ -880,6 +893,22 @@ def _build(self, ecflow_parent): ) ecflow_parent.add_family(self.generate_node()) + def _add_single_node(self, node): + if isinstance(node, (Family, Task)): + node._add_exit_hook(self._exit_hook) + super()._add_single_node(node) + + def _add_exit_hook(self, hook): + if isinstance(hook, str): + hook = [hook] + for hk in hook: + if hk not in self._exit_hook: + self._exit_hook.append(hk) + # Check if properly initialised + if "_nodes" in self.__dict__: + for chld in self.executable_children: + chld._add_exit_hook(hook) + class AnchorFamily(AnchorMixin, Family): def __init__( @@ -1154,10 +1183,12 @@ def __init__( self.script = kwargs.pop("script", Script()) self._clean_workdir = clean_workdir self._submit_arguments = submit_arguments or {} - self._exit_hook = ( - [exit_hook] if isinstance(exit_hook, str) else exit_hook - ) or [] + self._exit_hook = [] super().__init__(name, **kwargs) + # Setting this here ensures that exit hooks inherited from parents are + # ordered first + if exit_hook is not None: + self._add_exit_hook(exit_hook) # Get the host object, and attempt to add this task to its limits automatically. if autolimit: @@ -1258,6 +1289,13 @@ def task_purge_modules(self): return self.host.purge_modules or super().task_purge_modules() + def _add_exit_hook(self, hook: str): + if isinstance(hook, str): + hook = [hook] + for hk in hook: + if hk not in self._exit_hook: + self._exit_hook.append(hk) + def generate_script(self): """ Generates the complete script for the task. diff --git a/tests/test_families.py b/tests/test_families.py index e7fb148..b2af814 100644 --- a/tests/test_families.py +++ b/tests/test_families.py @@ -91,6 +91,34 @@ def test_files_locations(): assert t7.deploy_path == "/a/base/path/f5/f6/t7.ecf" +def test_exit_hook(): + """ + Propagate exit hook to children of a given family + """ + + with Family("f3", exit_hook="hook_f3") as f3: + t3 = Task("t3", exit_hook="hook_t3") + t4 = Task("t4") + + t5 = Task("t5") + + with Suite("S") as s: + with Family("f", exit_hook="hook_f", tasks=t5) as f: + Limit("limit1", 15) + Variable("VARIABLE1", 1234) + t1 = Task("t1", exit_hook="hook_t1") + with Family("f2", families=f3, exit_hook="hook_f2") as f2: + t2 = Task("t2", exit_hook="hook_t2") + + assert t1._exit_hook == ["hook_f", "hook_t1"] + assert f2._exit_hook == ["hook_f", "hook_f2"] + assert t2._exit_hook == ["hook_f", "hook_f2", "hook_t2"] + assert f3._exit_hook == ["hook_f3", "hook_f", "hook_f2"] + assert t3._exit_hook == ["hook_f3", "hook_t3", "hook_f", "hook_f2"] + assert t4._exit_hook == ["hook_f3", "hook_f", "hook_f2"] + assert t5._exit_hook == ["hook_f"] + + if __name__ == "__main__": from os import path From 4042a30bc636a8e9ef70c8d134aa28a8c2212843 Mon Sep 17 00:00:00 2001 From: Jenny Wong Date: Mon, 1 Jul 2024 16:09:41 +0000 Subject: [PATCH 11/26] flake8 --- tests/test_families.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_families.py b/tests/test_families.py index b2af814..b8163ea 100644 --- a/tests/test_families.py +++ b/tests/test_families.py @@ -102,7 +102,7 @@ def test_exit_hook(): t5 = Task("t5") - with Suite("S") as s: + with Suite("S"): with Family("f", exit_hook="hook_f", tasks=t5) as f: Limit("limit1", 15) Variable("VARIABLE1", 1234) @@ -110,6 +110,7 @@ def test_exit_hook(): with Family("f2", families=f3, exit_hook="hook_f2") as f2: t2 = Task("t2", exit_hook="hook_t2") + assert f._exit_hook == ["hook_f"] assert t1._exit_hook == ["hook_f", "hook_t1"] assert f2._exit_hook == ["hook_f", "hook_f2"] assert t2._exit_hook == ["hook_f", "hook_f2", "hook_t2"] From 960da2eb3f6554bfc4e5f7d8dbbdcd9312e775c1 Mon Sep 17 00:00:00 2001 From: Jenny Wong Date: Tue, 2 Jul 2024 08:00:35 +0000 Subject: [PATCH 12/26] Add exit hook arg to suite --- pyflow/nodes.py | 33 +++++++++++++++++++++++++++++++-- tests/test_suite.py | 27 +++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/pyflow/nodes.py b/pyflow/nodes.py index 2f3ebcd..e72e227 100644 --- a/pyflow/nodes.py +++ b/pyflow/nodes.py @@ -912,7 +912,14 @@ def _add_exit_hook(self, hook): class AnchorFamily(AnchorMixin, Family): def __init__( - self, name, json=None, modules=None, purge_modules=False, extern=False, **kwargs + self, + name, + json=None, + modules=None, + purge_modules=False, + extern=False, + exit_hook=None, + **kwargs, ): """ Provides grouping of tasks that require encapsulation. @@ -926,6 +933,7 @@ def __init__( runtime. extern(bool): Whether the anchor family is a shadow node created to satisfy an Extern_, and should not be generated. + exit_hook(str,list): a script containing some commands to be called at exit time. autocancel(Autocancel_): An attribute for automatic removal of the node which has completed. completes(Complete_): An attribute for setting a condition for setting the node as complete depending on other tasks or families. @@ -964,18 +972,20 @@ def __init__( modules=modules, purge_modules=purge_modules, extern=extern, + exit_hook=exit_hook, **kwargs, ) class Suite(AnchorMixin, Node): - def __init__(self, name, host=None, *args, **kwargs): + def __init__(self, name, host=None, exit_hook=None, *args, **kwargs): """ Represents a collection of interrelated **ecFlow** tasks. Parameters: name(str): Name of the suite to create. host(Host_): The host to execute the suite on. If `None`, default **ecFlow** behaviour will be used. + exit_hook(str,list): a script containing some commands to be called at exit time. json(dict): Parsed JSON for creation of the children node(s). workdir(str): The working directory for the tasks, can be fixed or an **ecFlow** variable. modules(tuple): The list of modules to load. @@ -1027,7 +1037,10 @@ def __init__(self, name, host=None, *args, **kwargs): host = EcflowDefaultHost() + self._exit_hook = [] super().__init__(name, host=host, *args, **kwargs) + if exit_hook is not None: + self._add_exit_hook(exit_hook) def ecflow_object(self): """ @@ -1119,6 +1132,22 @@ def deploy_suite(self, target=FileSystem, **options): target.deploy_headers() return target + def _add_single_node(self, node): + if isinstance(node, (Family, Task)): + node._add_exit_hook(self._exit_hook) + super()._add_single_node(node) + + def _add_exit_hook(self, hook): + if isinstance(hook, str): + hook = [hook] + for hk in hook: + if hk not in self._exit_hook: + self._exit_hook.append(hk) + # Check if properly initialised + if "_nodes" in self.__dict__: + for chld in self.executable_children: + chld._add_exit_hook(hook) + class Task(Node): SHELLVAR = re.compile("\\$\\{?([A-Z_][A-Z0-9_]*)") diff --git a/tests/test_suite.py b/tests/test_suite.py index a10a45a..03724f1 100644 --- a/tests/test_suite.py +++ b/tests/test_suite.py @@ -174,6 +174,33 @@ def test_find_node(): s.find_node("/sss/f4") +def test_exit_hook(): + """ + Propagate exit hook to children + """ + + with Family("f2", exit_hook="hook_f2") as f2: + t2 = Task("t2", exit_hook="hook_t2") + with Family("f3", exit_hook="hook_f3") as f3: + t3 = Task("t3", exit_hook="hook_t3") + t4 = Task("t4") + + t5 = Task("t5") + + with Suite("S", exit_hook="hook_s", families=f2, tasks=t5): + with Family("f", exit_hook="hook_f") as f: + t1 = Task("t1", exit_hook="hook_t1") + + assert f._exit_hook == ["hook_s", "hook_f"] + assert t1._exit_hook == ["hook_s", "hook_f", "hook_t1"] + assert f2._exit_hook == ["hook_f2", "hook_s"] + assert t2._exit_hook == ["hook_f2", "hook_t2", "hook_s"] + assert f3._exit_hook == ["hook_f2", "hook_f3", "hook_s"] + assert t3._exit_hook == ["hook_f2", "hook_f3", "hook_t3", "hook_s"] + assert t4._exit_hook == ["hook_f2", "hook_f3", "hook_s"] + assert t5._exit_hook == ["hook_s"] + + @pytest.mark.parametrize("child", [Task, Family]) def test_generate_error(child): with Suite("s") as s: From ce0c70513cd966c193d7f1a0dff0824cff59c907 Mon Sep 17 00:00:00 2001 From: Corentin Carton de Wiart Date: Fri, 12 Jul 2024 10:02:41 +0100 Subject: [PATCH 13/26] added test for troika host --- pyflow/configurator.py | 1 - pyflow/host.py | 14 +++- tests/test_host.py | 154 +++++++++++++++++++++++------------------ 3 files changed, 99 insertions(+), 70 deletions(-) diff --git a/pyflow/configurator.py b/pyflow/configurator.py index f357c96..0be43d2 100644 --- a/pyflow/configurator.py +++ b/pyflow/configurator.py @@ -15,7 +15,6 @@ ********************************************************************************************** """ - import inspect import os from collections import namedtuple diff --git a/pyflow/host.py b/pyflow/host.py index 3e5ee2b..f34136a 100644 --- a/pyflow/host.py +++ b/pyflow/host.py @@ -957,7 +957,9 @@ class TroikaHost(Host): def __init__(self, name, user, **kwargs): self.troika_exec = kwargs.pop("troika_exec", "troika") self.troika_config = kwargs.pop("troika_config", "") - self.troika_version = tuple(kwargs.pop("troika_version", "0.2.1").split(".")) + self.troika_version = tuple( + map(int, kwargs.pop("troika_version", "0.2.1").split(".")) + ) super().__init__(name, user=user, **kwargs) def troika_command(self, command): @@ -965,7 +967,11 @@ def troika_command(self, command): [ f"%TROIKA:{self.troika_exec}%", "-vv", - f"-c %TROIKA_CONFIG:{self.troika_config}%" if self.troika_config else "", + ( + f"-c %TROIKA_CONFIG:{self.troika_config}%" + if self.troika_config + else "" + ), f"{command}", f"-u {self.user}", ] @@ -1071,7 +1077,9 @@ def _translate_sthost(val): args.append(pragma) else: if arg in deprecated: - print(f"WARNING! '{arg}' is deprecated, use '{deprecated[arg]}' instead") + print( + f"WARNING! '{arg}' is deprecated, use '{deprecated[arg]}' instead" + ) arg = deprecated[arg] if arg is not None: args.append("#TROIKA {}={}".format(arg, val)) diff --git a/tests/test_host.py b/tests/test_host.py index 04ad143..7a52819 100644 --- a/tests/test_host.py +++ b/tests/test_host.py @@ -1,5 +1,4 @@ import pytest -import unittest import pyflow @@ -228,71 +227,94 @@ def test_host_kill_cmd(class_name): assert host.kill_cmd == expected_kill_cmd -# class TestTroikaHost(unittest.TestCase): -# def setUp(self): -# self.troika_host = pyflow.TroikaHost( -# name='test_host', -# user='test_user', -# hostname='test_hostname', -# scratch_directory='/scratch', -# log_directory='/log', -# resources_directory='/resources', -# limit=10, -# extra_paths=['/usr/bin', '/bin'], -# extra_variables={'VAR1': 'value1'}, -# environment_variables={'ENV1': 'env_value1'}, -# module_source='module.sh', -# modules=['python', 'gcc'], -# purge_models=True, -# label_host=False, -# ecflow_path='/ecflow', -# server_ecfvars=False -# ) - -# def test_initialization(self): -# self.assertEqual(self.troika_host.name, 'test_host') -# self.assertEqual(self.troika_host.user, 'test_user') -# self.assertEqual(self.troika_host.hostname, 'test_hostname') -# self.assertEqual(self.troika_host.scratch_directory, '/scratch/test_user') -# self.assertEqual(self.troika_host.log_directory, '/log/test_user') -# self.assertEqual(self.troika_host.limit, 10) -# self.assertListEqual(self.troika_host.extra_paths, ['/usr/bin', '/bin']) -# self.assertDictEqual(self.troika_host.extra_variables, {'VAR1': 'value1'}) -# self.assertDictEqual(self.troika_host.environment_variables, {'ENV1': 'env_value1'}) -# self.assertEqual(self.troika_host.module_source, 'module.sh') -# self.assertListEqual(self.troika_host.modules, ['python', 'gcc']) -# self.assertTrue(self.troika_host.purge_models) -# self.assertFalse(self.troika_host.label_host) -# self.assertEqual(self.troika_host.ecflow_path, '/ecflow') -# self.assertFalse(self.troika_host.server_ecfvars) - -# def test_troika_command(self): -# command = self.troika_host.troika_command('test_command') -# expected_command = '%TROIKA:troika% -vv -c %TROIKA_CONFIG:% -u test_user test_command -u test_user' -# self.assertEqual(command, expected_command) - -# def test_job_command_property(self): -# self.assertIn('submit', self.troika_host.job_cmd) -# self.assertIn('test_hostname', self.troika_host.job_cmd) - -# def test_kill_command_property(self): -# self.assertIn('kill', self.troika_host.kill_cmd) -# self.assertIn('test_hostname', self.troika_host.kill_cmd) - -# def test_status_command_property(self): -# self.assertIn('monitor', self.troika_host.status_cmd) -# self.assertIn('test_hostname', self.troika_host.status_cmd) - -# def test_check_command_property(self): -# self.assertIn('check', self.troika_host.check_cmd) -# self.assertIn('test_hostname', self.troika_host.check_cmd) - -# def test_script_submit_arguments(self): -# submit_args = {'tasks': 2, 'sthost': 'test', 'tmpdir': '500', 'hint': 'nomultithread'} -# results = self.troika_host.script_submit_arguments(submit_args) -# self.assertIn('export STHOST=test', results) -# self.assertIn('--gres=ssdtmp:500', results) -# self.assertIn('--hint=nomultithread', results) +def check_pragma(script, pragmas): + for prag in pragmas: + print(prag) + check = False + for line in script[0]: + if prag in line: + check = True + assert check + + +def test_troika_host(): + host1 = pyflow.TroikaHost( + name="test_host", + user="test_user", + ) + host2 = pyflow.TroikaHost( + name="test_host", user="test_user", troika_version="2.2.2" + ) + + submit_args = { + "tasks": 2, # deprecated option, will be translated to total_tasks + "gpus": 1, + "sthost": "/foo/bar", + "distribution": "test", # generates TROIKA pragma for recent version of troika, SBATCH for older versions + } + + with pyflow.Suite("s", host=host1) as s: + with pyflow.Family("f"): + t1 = pyflow.Task("t1", script='echo "boom"', submit_arguments=submit_args) + t2 = pyflow.Task( + "t2", host=host2, script='echo "boom"', submit_arguments=submit_args + ) + + s.check_definition() + + assert ( + s.ECF_JOB_CMD.value + == "%TROIKA:troika% -vv submit -u test_user -o %ECF_JOBOUT% test_host %ECF_JOB%" + ) + assert ( + s.ECF_KILL_CMD.value + == "%TROIKA:troika% -vv kill -u test_user test_host %ECF_JOB%" + ) + + t1_script = t1.generate_script() + t2_script = t2.generate_script() + print(t1_script) + print(t2_script) + + in_script = [ + "#TROIKA total_tasks=2", + "#TROIKA gpus=1", + "#TROIKA export_vars=STHOST=/foo/bar", + "#SBATCH --distribution=test", + ] + check_pragma(t1_script, in_script) + + # check for new versions of troika + in_script = [ + "#TROIKA total_tasks=2", + "#TROIKA gpus=1", + "#TROIKA export_vars=STHOST=/foo/bar", + "#TROIKA distribution=test", + ] + check_pragma(t2_script, in_script) + + +def test_troika_host_options(): + + host = pyflow.TroikaHost( + name="test_host", + user="test_user", + troika_exec="/path/to/troika", + troika_config="/path/to/troika.cfg", + troika_version="2.1.3", + ) + + s = pyflow.Suite("s", host=host) + + assert ( + s.ECF_JOB_CMD.value + == "%TROIKA:/path/to/troika% -vv -c %TROIKA_CONFIG:/path/to/troika.cfg% submit -u test_user -o %ECF_JOBOUT% test_host %ECF_JOB%" # noqa: E501 + ) + assert ( + s.ECF_KILL_CMD.value + == "%TROIKA:/path/to/troika% -vv -c %TROIKA_CONFIG:/path/to/troika.cfg% kill -u test_user test_host %ECF_JOB%" # noqa: E501 + ) + assert s.host.troika_version == (2, 1, 3) if __name__ == "__main__": From ee8e9581835e53e6af4828b6b1ed7c23182c3879 Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Thu, 18 Jul 2024 14:11:54 +0000 Subject: [PATCH 14/26] adding support for ecflow generated variables --- pyflow/attributes.py | 22 +++++++++++++++++ pyflow/nodes.py | 53 ++++++++++++++++++++++++++++++++++++++-- tests/test_attributes.py | 10 ++++++++ 3 files changed, 83 insertions(+), 2 deletions(-) diff --git a/pyflow/attributes.py b/pyflow/attributes.py index 4cf8d3d..ac254cc 100644 --- a/pyflow/attributes.py +++ b/pyflow/attributes.py @@ -295,6 +295,28 @@ def _build(self, ecflow_parent): ecflow_parent.add_variable(str(self.name), str(self.value)) +class GeneratedVariable(Exportable): + """ + An attribute for referencing an **ecFlow** generated variable. + The variable value will be generated automatically by ecFlow. + + Parameters: + name(str): The name of the variable. + + Example:: + + GeneratedVariable('FOO') + """ + + def __init__(self, name): + if not is_variable(name): + raise ValueError("'{}' is not a valid variable name".format(name)) + super().__init__(name) + + def _build(self, *args, **kwargs): + return + + class Edit: """ An attribute for setting multiple **ecFlow** variables. diff --git a/pyflow/nodes.py b/pyflow/nodes.py index e72e227..6631dc0 100644 --- a/pyflow/nodes.py +++ b/pyflow/nodes.py @@ -19,6 +19,7 @@ Event, Exportable, Follow, + GeneratedVariable, InLimit, Label, Limit, @@ -148,6 +149,7 @@ def __init__( triggers(Trigger_): An attribute for setting a condition for running the node depending on other tasks or families. variables(Variable_): An attribute for setting an **ecFlow** variable. + generated_variables(GeneratedVariable_): An attribute for setting an **ecFlow** generated variable. zombies(Zombies_): An attribute that defines how a zombie should be handled in an automated fashion. events(Event_): An attribute for declaring an action that a task can trigger while it is running. **kwargs(str): Accept extra keyword arguments as variables to be set on the node. @@ -787,6 +789,8 @@ def generate_stub(self, scripts): class Family(Node): + family_gen_vars = ["FAMILY", "FAMILY1"] + def __init__( self, name, @@ -833,6 +837,7 @@ def __init__( triggers(Trigger_): An attribute for setting a condition for running the node depending on other tasks or families. variables(Variable_): An attribute for setting an **ecFlow** variable. + generated_variables(GeneratedVariable_): An attribute for setting an **ecFlow** generated variable. zombies(Zombies_): An attribute that defines how a zombie should be handled in an automated fashion. events(Event_): An attribute for declaring an action that a task can trigger while it is running. **kwargs(str): Accept extra keyword arguments as variables to be set on the family. @@ -844,12 +849,16 @@ def __init__( """ self._exit_hook = [] + + generated_variables = kwargs.pop("generated_variables", []) + generated_variables += self.family_gen_vars super().__init__( name, json=json, modules=modules, purge_modules=purge_modules, extern=extern, + generated_variables=generated_variables, **kwargs, ) if exit_hook is not None: @@ -978,6 +987,22 @@ def __init__( class Suite(AnchorMixin, Node): + suite_gen_vars = [ + "DATE", + "DAY", + "DD", + "DOW", + "DOY", + "ECF_CLOCK", + "ECF_DATE", + "ECF_JULIAN", + "ECF_TIME", + "ECF_MM", + "ECF_MONTH", + "TIME", + "YYYY", + ] + def __init__(self, name, host=None, exit_hook=None, *args, **kwargs): """ Represents a collection of interrelated **ecFlow** tasks. @@ -1015,6 +1040,7 @@ def __init__(self, name, host=None, exit_hook=None, *args, **kwargs): triggers(Trigger_): An attribute for setting a condition for running the node depending on other tasks or families. variables(Variable_): An attribute for setting an **ecFlow** variable. + generated_variables(GeneratedVariable_): An attribute for setting an **ecFlow** generated variable. zombies(Zombies_): An attribute that defines how a zombie should be handled in an automated fashion. events(Event_): An attribute for declaring an action that a task can trigger while it is running. **kwargs(str): Accept extra keyword arguments as variables to be set on the suite. @@ -1038,7 +1064,14 @@ def __init__(self, name, host=None, exit_hook=None, *args, **kwargs): host = EcflowDefaultHost() self._exit_hook = [] - super().__init__(name, host=host, *args, **kwargs) + + generated_variables = kwargs.pop("generated_variables", []) + generated_variables += self.suite_gen_vars + + super().__init__( + name, host=host, generated_variables=generated_variables, *args, **kwargs + ) + if exit_hook is not None: self._add_exit_hook(exit_hook) @@ -1151,6 +1184,16 @@ def _add_exit_hook(self, hook): class Task(Node): SHELLVAR = re.compile("\\$\\{?([A-Z_][A-Z0-9_]*)") + task_gen_vars = [ + "ECF_JOB", + "ECF_JOBOUT", + "ECF_NAME", + "ECF_PASS", + "ECF_RID", + "ECF_SCRIPT", + "ECF_TRYNO", + "TASK", + ] def __init__( self, @@ -1199,6 +1242,7 @@ def __init__( triggers(Trigger_): An attribute for setting a condition for running the node depending on other tasks or families. variables(Variable_): An attribute for setting an **ecFlow** variable. + generated_variables(GeneratedVariable_): An attribute for setting an **ecFlow** generated variable. zombies(Zombies_): An attribute that defines how a zombie should be handled in an automated fashion. events(Event_): An attribute for declaring an action that a task can trigger while it is running. **kwargs(str): Accept extra keyword arguments as variables to be set on the task. @@ -1213,7 +1257,11 @@ def __init__( self._clean_workdir = clean_workdir self._submit_arguments = submit_arguments or {} self._exit_hook = [] - super().__init__(name, **kwargs) + + generated_variables = kwargs.pop("generated_variables", []) + generated_variables += self.task_gen_vars + + super().__init__(name, generated_variables=generated_variables, **kwargs) # Setting this here ensures that exit hooks inherited from parents are # ordered first if exit_hook is not None: @@ -1464,6 +1512,7 @@ def generate_script(self): ("today", Today), ("triggers", Trigger), ("variables", Variable), + ("generated_variables", GeneratedVariable), ("zombies", Zombies), ("events", Event), ] diff --git a/tests/test_attributes.py b/tests/test_attributes.py index f34c1bb..9329e1d 100644 --- a/tests/test_attributes.py +++ b/tests/test_attributes.py @@ -531,6 +531,16 @@ def test_repeat_datetime(self): s.check_definition() +def test_generated_variables(): + with pyflow.Suite("s") as s: + with pyflow.Family("f") as f: + t = pyflow.Task("t") + + assert [var in s.all_exportables for var in s.suite_gen_vars] + assert [var in f.all_exportables for var in f.family_gen_vars] + assert [var in t.all_exportables for var in t.task_gen_vars] + + if __name__ == "__main__": from os import path From 0794d2f83d9b8e29c3424dfefe381ef617cd5e22 Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Fri, 19 Jul 2024 09:24:36 +0000 Subject: [PATCH 15/26] minor bugfix in troika host --- pyflow/host.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyflow/host.py b/pyflow/host.py index 8374ba8..9e9338f 100644 --- a/pyflow/host.py +++ b/pyflow/host.py @@ -1057,7 +1057,7 @@ def _translate_hint(val): return "hint", val def _translate_sthost(val): - return "export", f"STHOST={val}" + return "export_vars", f"STHOST={val}" special = { "hint": _translate_hint, From 017d4d991a46a7c81551c5a8ca5f57cb0ce862a0 Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Fri, 19 Jul 2024 09:29:11 +0000 Subject: [PATCH 16/26] add restriction to ecflow version --- docs/environment.yml | 2 +- environment.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/environment.yml b/docs/environment.yml index 5af3880..8b75fd8 100644 --- a/docs/environment.yml +++ b/docs/environment.yml @@ -6,7 +6,7 @@ dependencies: - python - pip - jupyterlab - - ecflow + - ecflow >= 5.13 - jinja2 - requests - pytest diff --git a/environment.yml b/environment.yml index 947231b..d4a587c 100644 --- a/environment.yml +++ b/environment.yml @@ -6,7 +6,7 @@ dependencies: - python - pip - jupyterlab - - ecflow + - ecflow >= 5.13 - jinja2 - requests - pytest From 4f7c9f6f5a3ba90e35934cb71b77cf43cae47bf3 Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Fri, 19 Jul 2024 10:29:21 +0000 Subject: [PATCH 17/26] fix tests --- tests/test_expressions.py | 5 ----- tests/test_families.py | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/tests/test_expressions.py b/tests/test_expressions.py index 008dddc..7b1d218 100644 --- a/tests/test_expressions.py +++ b/tests/test_expressions.py @@ -33,11 +33,6 @@ def test_all_complete(): pyflow.all_complete(s.executable_children) ) - assert ( - "(((/s/f1/f1f1 eq complete) and (/s/f1/f1f2 eq complete)) and (/s/f1/t5 eq complete))" - == str(pyflow.all_complete(f.children)) - ) - def test_trigger_chains(): """ diff --git a/tests/test_families.py b/tests/test_families.py index b8163ea..a50de04 100644 --- a/tests/test_families.py +++ b/tests/test_families.py @@ -35,7 +35,7 @@ def test_children(): children = f.children names = set([c.name for c in children]) - assert names == {"limit1", "VARIABLE1", "f2", "t4", "t5"} + assert names == {"limit1", "VARIABLE1", "f2", "t4", "t5", "FAMILY", "FAMILY1"} def test_executable_children(): From 168d8046a26d12fd4e5e67e93cfb6687e6b7c57b Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Fri, 19 Jul 2024 10:41:01 +0000 Subject: [PATCH 18/26] fix formatting --- tests/test_host.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_host.py b/tests/test_host.py index 7a52819..26c83c6 100644 --- a/tests/test_host.py +++ b/tests/test_host.py @@ -295,7 +295,6 @@ def test_troika_host(): def test_troika_host_options(): - host = pyflow.TroikaHost( name="test_host", user="test_user", From 2febabef654fde2d1ac5b18f4fa981564ddfb5f3 Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Fri, 19 Jul 2024 12:49:49 +0000 Subject: [PATCH 19/26] minor addition to tests --- tests/test_attributes.py | 3 ++- tests/test_expressions.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_attributes.py b/tests/test_attributes.py index 9329e1d..bb845dd 100644 --- a/tests/test_attributes.py +++ b/tests/test_attributes.py @@ -533,11 +533,12 @@ def test_repeat_datetime(self): def test_generated_variables(): with pyflow.Suite("s") as s: - with pyflow.Family("f") as f: + with pyflow.Family("f", generated_variables=["MYVAR"]) as f: t = pyflow.Task("t") assert [var in s.all_exportables for var in s.suite_gen_vars] assert [var in f.all_exportables for var in f.family_gen_vars] + assert "MYVAR" in f.all_exportables assert [var in t.all_exportables for var in t.task_gen_vars] diff --git a/tests/test_expressions.py b/tests/test_expressions.py index 7b1d218..03bbf59 100644 --- a/tests/test_expressions.py +++ b/tests/test_expressions.py @@ -16,7 +16,7 @@ def test_triggers_and(): def test_all_complete(): with pyflow.Suite("s") as s: - with pyflow.Family("f1") as f: + with pyflow.Family("f1"): with pyflow.Family("f1f1"): pyflow.Task("t1") pyflow.Task("t2") From c4de0d21733072fa832226ab3b43dc5e81afafb7 Mon Sep 17 00:00:00 2001 From: corentincarton Date: Fri, 19 Jul 2024 16:31:08 +0100 Subject: [PATCH 20/26] Update version.py --- pyflow/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyflow/version.py b/pyflow/version.py index 88c513e..903a158 100644 --- a/pyflow/version.py +++ b/pyflow/version.py @@ -1 +1 @@ -__version__ = "3.3.0" +__version__ = "3.4.0" From bf42920fb76bf54238e799b3fdcf2fb6ab39afa3 Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Tue, 23 Jul 2024 13:14:52 +0000 Subject: [PATCH 21/26] update setup --- pyflow/version.py | 1 - pyproject.toml | 81 +++++++++++++++++++++++++++++++++++++++++------ setup.cfg | 30 ------------------ 3 files changed, 71 insertions(+), 41 deletions(-) delete mode 100644 pyflow/version.py delete mode 100644 setup.cfg diff --git a/pyflow/version.py b/pyflow/version.py deleted file mode 100644 index 903a158..0000000 --- a/pyflow/version.py +++ /dev/null @@ -1 +0,0 @@ -__version__ = "3.4.0" diff --git a/pyproject.toml b/pyproject.toml index 9afbe58..4e97dc4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,13 +1,74 @@ + [build-system] -# NOTE: `pip install build` to build with `python -m build` -requires = [ - "setuptools >= 40.9.0", - "wheel" -] +requires = ["setuptools>=65", "setuptools_scm[toml]>=6.2", "wheel"] build-backend = "setuptools.build_meta" -[tool.pytest.ini_options] -addopts = "" -doctest_optionflags = "" -testpaths = "tests" -markers = [] +[project] +name = "pyflow-workflow-generator" +requires-python = ">=3.8" +authors = [ + {name = "European Centre for Medium-Range Weather Forecasts (ECMWF)", email = "software.support@ecmwf.int"}, +] +maintainers = [ + {name = "Corentin Carton de Wiart", email = "corentin.carton@ecmwf.int"}, +] +description = "Create pythonic ecFlow suites" +license = {file = "LICENSE"} +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Science/Research", + "License :: OSI Approved :: Apache Software License", + "Natural Language :: English", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.10", + "Topic :: Scientific/Engineering", +] +dynamic = ["version", "readme"] + +dependencies = [ + "jinja2", + "requests", +] + +[project.urls] + repository = "https://github.com/ecmwf/pyflow" + documentation = "https://pyflow-workflow-generator.readthedocs.io" + issues = "https://github.com/ecmwf/pyflow/issues" + +[project.optional-dependencies] + test = [ + "pytest", + ] + diagrams = [ + "graphviz", + ] + +# Code inspection +[tool.black] +line-length = 79 + +[tool.isort] +profile="black" + +# Packaging/setuptools options +[tool.setuptools] +include-package-data = true + +[tool.setuptools.dynamic] +readme = {file = ["README.md"], content-type = "text/markdown"} + +[tool.setuptools.packages.find] +where = ["."] +exclude = ["tests"] + +[tool.setuptools_scm] +write_to = "pyflow/_version.py" +write_to_template = ''' +# Do not change! Do not track in version control! +__version__ = "{version}" +''' +parentdir_prefix_version='pyflow-' # get version from GitHub-like tarballs +fallback_version='3.3.0' \ No newline at end of file diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 2e69b53..0000000 --- a/setup.cfg +++ /dev/null @@ -1,30 +0,0 @@ -[metadata] -name = pyflow-workflow-generator -version = attr: pyflow.version.__version__ -author = European Centre for Medium-Range Weather Forecasts (ECMWF) -author_email = software.support@ecmwf.int -license = Apache 2.0 -license_files = LICENSE -description = Create pythonic ecFlow suites -long_description = file: README.md -long_description_content_type=text/markdown -url = https://pyflow-workflow-generator.readthedocs.io/en/ - -[options] -packages = find: -include_package_data = True -install_requires = - jinja2 - requests - -[options.packages.find] -include = pyflow* - -[options.extras_require] -diagrams = - graphviz -tests = - pytest -all = - %(diagrams)s - %(tests)s From e30b22b2f5e089176a1003d8ba61783343ff2dd4 Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Tue, 23 Jul 2024 13:20:52 +0000 Subject: [PATCH 22/26] fix formatting --- pyflow/attributes.py | 15 +++++++++++++-- pyflow/configurator.py | 3 ++- pyflow/cron.py | 4 +++- pyflow/deployment.py | 4 +++- pyflow/host.py | 10 ++++++++-- pyflow/html.py | 4 +++- pyflow/nodes.py | 9 +++++++-- pyflow/resource.py | 3 ++- pyproject.toml | 4 ---- tests/test_attributes.py | 12 ++++++++++-- tests/test_families.py | 10 +++++++++- tests/test_host.py | 40 ++++++++++++++++++++++++++++++++-------- tests/test_script.py | 14 +++++++++++--- tests/test_task.py | 9 ++++++++- utils/deploy-tool.py | 12 +++++++++--- 15 files changed, 120 insertions(+), 33 deletions(-) diff --git a/pyflow/attributes.py b/pyflow/attributes.py index ac254cc..e64abe4 100644 --- a/pyflow/attributes.py +++ b/pyflow/attributes.py @@ -561,7 +561,15 @@ def day_of_week(self): for dow, day in enumerate( - ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday") + ( + "monday", + "tuesday", + "wednesday", + "thursday", + "friday", + "saturday", + "sunday", + ) ): setattr(RepeatDate, day, property(lambda self: Eq(self.day_of_week, dow))) @@ -744,7 +752,10 @@ def make_variable(node, name, value): if len(value) == 3: if isinstance(value[2], int): return RepeatDate( - name, as_date(value[0]), as_date(value[1]), value[2] + name, + as_date(value[0]), + as_date(value[1]), + value[2], ) else: return RepeatDate(name, as_date(value[0]), as_date(value[1]), 1) diff --git a/pyflow/configurator.py b/pyflow/configurator.py index 0be43d2..f0c4cf7 100644 --- a/pyflow/configurator.py +++ b/pyflow/configurator.py @@ -202,7 +202,8 @@ def choices(cls): Return an iterable of (named) tuples of the name, help string and options """ Configurable = namedtuple( - "Configurable", ["name", "help", "choices", "multichoice", "default"] + "Configurable", + ["name", "help", "choices", "multichoice", "default"], ) for attr_name, configuration_class in cls.configurations(): choices = configuration_class.available_choices() diff --git a/pyflow/cron.py b/pyflow/cron.py index 00265b5..72d4db4 100644 --- a/pyflow/cron.py +++ b/pyflow/cron.py @@ -118,7 +118,9 @@ def __init__(self, cron): if minute is None and hour is None: ts = ecflow.TimeSeries( - ecflow.TimeSlot(0, 0), ecflow.TimeSlot(23, 59), ecflow.TimeSlot(0, 1) + ecflow.TimeSlot(0, 0), + ecflow.TimeSlot(23, 59), + ecflow.TimeSlot(0, 1), ) else: if minute is None: diff --git a/pyflow/deployment.py b/pyflow/deployment.py index eb1809b..c8aa70f 100644 --- a/pyflow/deployment.py +++ b/pyflow/deployment.py @@ -56,7 +56,9 @@ def deploy_uniqueness_check(self, source, target): with open(target, "r") as f: old_content = f.read() diff = difflib.unified_diff( - old_content.splitlines(), source.splitlines(), lineterm="" + old_content.splitlines(), + source.splitlines(), + lineterm="", ) diff_str = "\n".join(diff) print( diff --git a/pyflow/host.py b/pyflow/host.py index 9e9338f..18141a7 100644 --- a/pyflow/host.py +++ b/pyflow/host.py @@ -520,7 +520,9 @@ def copy_file_to(self, source_file, target_file): mkdir -p "{}" cp "{}" "{}" """.format( - os.path.dirname(os.path.abspath(target_file)), source_file, target_file + os.path.dirname(os.path.abspath(target_file)), + source_file, + target_file, ) ) @@ -666,7 +668,11 @@ def run_simple_command(self, cmd): if self.indirect_host is not None: return "ssh -o StrictHostKeyChecking=no {}@{} ssh -o StrictHostKeyChecking=no {}@{} {}".format( - self.indirect_user, self.indirect_host, self.user, self.hostname, cmd + self.indirect_user, + self.indirect_host, + self.user, + self.hostname, + cmd, ) else: return "ssh -o StrictHostKeyChecking=no {}@{} {}".format( diff --git a/pyflow/html.py b/pyflow/html.py index 0fde1d1..8b1abd5 100644 --- a/pyflow/html.py +++ b/pyflow/html.py @@ -23,7 +23,9 @@ def definition_to_html(d): ) n = re.sub( - r" (\%) ", r' \1 ', n + r" (\%) ", + r' \1 ', + n, ) n = re.sub( diff --git a/pyflow/nodes.py b/pyflow/nodes.py index 6631dc0..352e0a5 100644 --- a/pyflow/nodes.py +++ b/pyflow/nodes.py @@ -1069,7 +1069,11 @@ def __init__(self, name, host=None, exit_hook=None, *args, **kwargs): generated_variables += self.suite_gen_vars super().__init__( - name, host=host, generated_variables=generated_variables, *args, **kwargs + name, + host=host, + generated_variables=generated_variables, + *args, + **kwargs, ) if exit_hook is not None: @@ -1341,7 +1345,8 @@ def deploy_path(self): """ try: return "{}{}".format( - os.path.join(self.anchor.files_path, self.name), self.deploy_extension + os.path.join(self.anchor.files_path, self.name), + self.deploy_extension, ) except ValueError: return None diff --git a/pyflow/resource.py b/pyflow/resource.py index 6d4d01b..7844aac 100644 --- a/pyflow/resource.py +++ b/pyflow/resource.py @@ -48,7 +48,8 @@ def __init__(self, name, hosts): self._hosts = hosts if isinstance(hosts, collections.abc.Iterable) else [hosts] self._resource_directory = os.path.join( - self._hosts[0].resources_directory, os.path.dirname(self.fullname[1:]) + self._hosts[0].resources_directory, + os.path.dirname(self.fullname[1:]), ) for h in self._hosts: diff --git a/pyproject.toml b/pyproject.toml index 4e97dc4..440f8ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,10 +46,6 @@ dependencies = [ "graphviz", ] -# Code inspection -[tool.black] -line-length = 79 - [tool.isort] profile="black" diff --git a/tests/test_attributes.py b/tests/test_attributes.py index bb845dd..bdccaeb 100644 --- a/tests/test_attributes.py +++ b/tests/test_attributes.py @@ -427,7 +427,10 @@ def test_combined_date_repeats(self): ) with pyflow.Task("t2") as t2: pyflow.RepeatDate( - "DATE_REPEAT", datetime(2018, 1, 1), datetime(2019, 12, 31), 2 + "DATE_REPEAT", + datetime(2018, 1, 1), + datetime(2019, 12, 31), + 2, ) t2.triggers = t1.DATE_REPEAT == t2.DATE_REPEAT t2.triggers |= t1.DATE_REPEAT + 4 <= t2.DATE_REPEAT @@ -503,7 +506,12 @@ def test_repeat_datetime(self): input_tests = ( ("REPEAT_DATETIME", start, end), ("REPEAT_DATETIME", start, end, increment), - ("REPEAT_DATETIME", "20200101T120000", "20201231T120000", "12:00:00"), + ( + "REPEAT_DATETIME", + "20200101T120000", + "20201231T120000", + "12:00:00", + ), ("REPEAT_DATETIME", "20200101T13", "20201231T1400", "13:00"), ("REPEAT_DATETIME", "20200102", "20201231T080102", "12:00:00"), ("REPEAT_DATETIME", "20200102", end, "18:10:20"), diff --git a/tests/test_families.py b/tests/test_families.py index a50de04..7944c60 100644 --- a/tests/test_families.py +++ b/tests/test_families.py @@ -35,7 +35,15 @@ def test_children(): children = f.children names = set([c.name for c in children]) - assert names == {"limit1", "VARIABLE1", "f2", "t4", "t5", "FAMILY", "FAMILY1"} + assert names == { + "limit1", + "VARIABLE1", + "f2", + "t4", + "t5", + "FAMILY", + "FAMILY1", + } def test_executable_children(): diff --git a/tests/test_host.py b/tests/test_host.py index 26c83c6..41c7574 100644 --- a/tests/test_host.py +++ b/tests/test_host.py @@ -5,7 +5,10 @@ def test_host_task(): host1 = pyflow.SSHHost( - "a-host", user="a-user", scratch_directory="/tmp", ecflow_path="/usr/local/bin" + "a-host", + user="a-user", + scratch_directory="/tmp", + ecflow_path="/usr/local/bin", ) host2 = pyflow.LocalHost(scratch_directory="/tmp2", ecflow_path="/usr/local/bin") @@ -19,11 +22,17 @@ def test_host_task(): t1 = pyflow.Task("t1") t2 = pyflow.Task( - "t2", host=host1, script='echo "boom"', workdir=host1.scratch_directory + "t2", + host=host1, + script='echo "boom"', + workdir=host1.scratch_directory, ) t3 = pyflow.Task( - "t3", host=host2, script='echo "boom"', workdir=host2.scratch_directory + "t3", + host=host2, + script='echo "boom"', + workdir=host2.scratch_directory, ) with pyflow.Family("f2", host=host1) as f2: @@ -32,11 +41,17 @@ def test_host_task(): ) t5 = pyflow.Task( - "t5", host=host1, script='echo "boom"', workdir=host1.scratch_directory + "t5", + host=host1, + script='echo "boom"', + workdir=host1.scratch_directory, ) t6 = pyflow.Task( - "t6", host=host2, script='echo "boom"', workdir=host2.scratch_directory + "t6", + host=host2, + script='echo "boom"', + workdir=host2.scratch_directory, ) with pyflow.Family("f3", host=host2) as f3: @@ -45,7 +60,10 @@ def test_host_task(): ) t8 = pyflow.Task( - "t8", host=host1, script='echo "boom"', workdir=host1.scratch_directory + "t8", + host=host1, + script='echo "boom"', + workdir=host1.scratch_directory, ) t9 = pyflow.Task( @@ -208,7 +226,10 @@ def test_host_kill_cmd(class_name): HostClass = getattr(pyflow, class_name) host = HostClass( - hostname, user=username, scratch_directory="/tmp", log_directory="/var/log" + hostname, + user=username, + scratch_directory="/tmp", + log_directory="/var/log", ) if class_name == "LocalHost" or class_name == "SSHHost": @@ -257,7 +278,10 @@ def test_troika_host(): with pyflow.Family("f"): t1 = pyflow.Task("t1", script='echo "boom"', submit_arguments=submit_args) t2 = pyflow.Task( - "t2", host=host2, script='echo "boom"', submit_arguments=submit_args + "t2", + host=host2, + script='echo "boom"', + submit_arguments=submit_args, ) s.check_definition() diff --git a/tests/test_script.py b/tests/test_script.py index 7907d1c..68909cc 100644 --- a/tests/test_script.py +++ b/tests/test_script.py @@ -31,7 +31,10 @@ def test_script_lists(): t1 = pyflow.Task("t1") t2 = pyflow.Task( "t2", - script=["echo 'bit1'", pyflow.Script(["echo 'bit2'", "echo 'bit3'"])], + script=[ + "echo 'bit1'", + pyflow.Script(["echo 'bit2'", "echo 'bit3'"]), + ], ) t1.script = ["echo 'bit1'", pyflow.Script(["echo 'bit2'", "echo 'bit3'"])] @@ -182,7 +185,9 @@ def test_script_exportables(): with pyflow.Family("f", VARIABLE2=4321) as f: subscript = pyflow.Script("uninteresting2") with pyflow.Task( - "t", script=['echo "uninteresting script', subscript], VARIABLE3=9999 + "t", + script=['echo "uninteresting script', subscript], + VARIABLE3=9999, ) as t: v4 = pyflow.Variable("VARIABLE4", 8888) @@ -282,7 +287,10 @@ def test_variable_detection_script(): check = True assert check - not_in_script = ['export S_FOO="%S_FOO%"', 'export T_FOO_T_BAR="%T_FOO_T_BAR%"'] + not_in_script = [ + 'export S_FOO="%S_FOO%"', + 'export T_FOO_T_BAR="%T_FOO_T_BAR%"', + ] for var in not_in_script: check = False for line in full_script[0]: diff --git a/tests/test_task.py b/tests/test_task.py index 7239bd8..a9e81dc 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -261,7 +261,14 @@ def test_disable_ecflow_keywords(): script, includes = t.generate_script() idx = script.index("%nopp") - assert script[idx : idx + 6] == ["%nopp", "", "Multiline", "script", "", "%end"] + assert script[idx : idx + 6] == [ + "%nopp", + "", + "Multiline", + "script", + "", + "%end", + ] def test_modules(): diff --git a/utils/deploy-tool.py b/utils/deploy-tool.py index 4cf1f26..73f9816 100644 --- a/utils/deploy-tool.py +++ b/utils/deploy-tool.py @@ -18,7 +18,9 @@ default=os.environ.get("ECF_HOST", "localhost"), ) parser.add_argument( - "--port", help="ecflow server port", default=os.environ.get("ECF_PORT", 5459) + "--port", + help="ecflow server port", + default=os.environ.get("ECF_PORT", 5459), ) parser.add_argument( "--family", help="the subset of the tree to deploy", default="/" @@ -29,10 +31,14 @@ default=getpass.getuser(), ) parser.add_argument( - "--deploy-host", help="the subset of the tree to deploy", default="localhost" + "--deploy-host", + help="the subset of the tree to deploy", + default="localhost", ) parser.add_argument( - "--deploy-files", help="sync the files with the remote", action="store_true" + "--deploy-files", + help="sync the files with the remote", + action="store_true", ) parser.add_argument( "--play-suite", help="play the suite to ecflow", action="store_true" From 7d8a9ceec7c3dab51205be72823248d93e5a513c Mon Sep 17 00:00:00 2001 From: Corentin Carton De Wiart Date: Tue, 23 Jul 2024 13:23:33 +0000 Subject: [PATCH 23/26] update version --- pyflow/__init__.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pyflow/__init__.py b/pyflow/__init__.py index cddb6a3..0d44705 100644 --- a/pyflow/__init__.py +++ b/pyflow/__init__.py @@ -47,4 +47,11 @@ from .nodes import AnchorFamily, Family, Suite, Task, ecflow_name from .resource import DataResource, FileResource, Resources, WebResource from .script import FileScript, PythonScript, Script, TemplateFileScript, TemplateScript -from .version import __version__ + +try: + # NOTE: the `_version.py` file must not be present in the git repository + # as it is generated by setuptools at install time + from ._version import __version__ +except ImportError: # pragma: no cover + # Local copy or not installed with setuptools + __version__ = "" From 70cec8a81cc9e36c47c887b9ab82d881f3444616 Mon Sep 17 00:00:00 2001 From: Michel Wortmann Date: Fri, 26 Jul 2024 13:54:12 +0000 Subject: [PATCH 24/26] Enable node-level partial deployment of suite. --- pyflow/nodes.py | 7 ++++--- tests/test_deployment.py | 18 ++++++++++++++---- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/pyflow/nodes.py b/pyflow/nodes.py index 352e0a5..48e968e 100644 --- a/pyflow/nodes.py +++ b/pyflow/nodes.py @@ -1138,7 +1138,7 @@ def find_node(self, subpath): return super().find_node(subpath) - def deploy_suite(self, target=FileSystem, **options): + def deploy_suite(self, target=FileSystem, node=None, **options): """ Deploys suite and its components. @@ -1154,14 +1154,15 @@ def deploy_suite(self, target=FileSystem, **options): assert not self._extern, "Attempting to deploy extern node not permitted" target = target(self, **options) - for t in self.all_tasks: + node = node if node is not None else self + for t in node.all_tasks: script, includes = t.generate_script() try: target.deploy_task(t.deploy_path, script, includes) except RuntimeError: print(f"\nERROR when deploying task: {t.fullname}\n") raise - for f in self.all_families: + for f in node.all_families: manual = self.generate_stub(f.manual) if manual: target.deploy_manual(f.manual_path, manual) diff --git a/tests/test_deployment.py b/tests/test_deployment.py index 4100162..2a14931 100644 --- a/tests/test_deployment.py +++ b/tests/test_deployment.py @@ -11,11 +11,11 @@ def test_deploy_filesystem(tmpdir, ECF_FILES): files = path.join(str(tmpdir), ECF_FILES) with pyflow.Suite("s", ECF_HOME=str(tmpdir), ECF_FILES=files) as s: with pyflow.Family("f1"): - with pyflow.Task("t") as t: - t.script = "echo foo" + with pyflow.Task("t") as t1: + t1.script = "echo foo" with pyflow.Family("f2"): - with pyflow.Task("t2") as t: - t.script = "echo bar" + with pyflow.Task("t2") as t2: + t2.script = "echo bar" s.deploy_suite() s.deploy_suite(target=pyflow.Notebook) @@ -28,6 +28,16 @@ def test_deploy_filesystem(tmpdir, ECF_FILES): with open(f2) as f: assert "echo bar" in f.read() + # partial deployment + t1.script = "echo new" + t2.script = "echo new" + s.deploy_suite(node=s.f2) + with open(f1) as f: + # shouldn't not be updated + assert "echo foo" in f.read() + with open(f2) as f: + assert "echo new" in f.read() + def test_move_node(tmpdir): d = str(tmpdir) From 00fddab4fd5fbec31a2928752908207bfbc9d490 Mon Sep 17 00:00:00 2001 From: Michel Wortmann Date: Wed, 14 Aug 2024 09:11:15 +0000 Subject: [PATCH 25/26] Change node argument to deploy_suite to str path rather than Node object. --- pyflow/nodes.py | 4 +++- tests/test_deployment.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pyflow/nodes.py b/pyflow/nodes.py index 48e968e..25ddf7e 100644 --- a/pyflow/nodes.py +++ b/pyflow/nodes.py @@ -1144,6 +1144,7 @@ def deploy_suite(self, target=FileSystem, node=None, **options): Parameters: target(Deployment): Deployment target for the suite. + node(str): Path to node to limit deployment to a family/task. **options(dict): Accept extra keyword arguments as deployment options. Returns: @@ -1154,7 +1155,8 @@ def deploy_suite(self, target=FileSystem, node=None, **options): assert not self._extern, "Attempting to deploy extern node not permitted" target = target(self, **options) - node = node if node is not None else self + node = self.find_node(node) if node is not None else self + for t in node.all_tasks: script, includes = t.generate_script() try: diff --git a/tests/test_deployment.py b/tests/test_deployment.py index 2a14931..a03e172 100644 --- a/tests/test_deployment.py +++ b/tests/test_deployment.py @@ -31,7 +31,7 @@ def test_deploy_filesystem(tmpdir, ECF_FILES): # partial deployment t1.script = "echo new" t2.script = "echo new" - s.deploy_suite(node=s.f2) + s.deploy_suite(node="f2") with open(f1) as f: # shouldn't not be updated assert "echo foo" in f.read() From c34200dea5496e54f268ad623bddaf01c4c45c20 Mon Sep 17 00:00:00 2001 From: corentincarton Date: Mon, 14 Oct 2024 13:31:46 +0100 Subject: [PATCH 26/26] Update host.py to remove tasks_per_node from deprecated options --- pyflow/host.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyflow/host.py b/pyflow/host.py index 18141a7..256a68a 100644 --- a/pyflow/host.py +++ b/pyflow/host.py @@ -1033,7 +1033,6 @@ def script_submit_arguments(self, submit_arguments): "tasks": "total_tasks", "nodes": "total_nodes", "threads_per_task": "cpus_per_task", - "tasks_per_node": "tasks_per_node", "hyperthreads": "threads_per_core", "memory_per_task": "memory_per_cpu", "accounting": "billing_account",