From 656e63c59f92b621a44f2328f9d29e45a2789651 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sat, 10 Feb 2024 12:56:47 +0100 Subject: [PATCH 1/8] job startup hook, to update env vars Fix #178 --- sisyphus/global_constants.py | 1 + sisyphus/job.py | 15 ++++++++++++++- sisyphus/worker.py | 7 +++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/sisyphus/global_constants.py b/sisyphus/global_constants.py index d7ae69f..10c915a 100644 --- a/sisyphus/global_constants.py +++ b/sisyphus/global_constants.py @@ -10,6 +10,7 @@ JOB_FINISHED_MARKER = "finished" JOB_FINISHED_ARCHIVE = "finished.tar.gz" JOB_INFO = "info" +JOB_STARTUP_HOOK_PY = "startup_hook.py" # engine path ENGINE_LOG = "log" diff --git a/sisyphus/job.py b/sisyphus/job.py index 5901482..91ebdbc 100644 --- a/sisyphus/job.py +++ b/sisyphus/job.py @@ -65,7 +65,6 @@ def job_finished(path): class JobSingleton(type): - """Meta class to ensure that every Job with the same hash value is only created once""" @@ -225,6 +224,7 @@ def _sis_init(self, args, kwargs, parsed_args): self._sis_environment = tools.EnvironmentModifier() self._sis_environment.keep(gs.DEFAULT_ENVIRONMENT_KEEP) self._sis_environment.set(gs.DEFAULT_ENVIRONMENT_SET) + self._sis_environ_updates = {} if gs.AUTO_SET_JOB_INIT_ATTRIBUTES: self.set_attrs(parsed_args) @@ -286,6 +286,15 @@ def _sis_setup_directory(self, force=False): if not os.path.isdir(link_name): os.symlink(src=os.path.abspath(str(creator._sis_path())), dst=link_name, target_is_directory=True) + if self._sis_environ_updates: + startup_hook_py_file = self._sis_path(gs.JOB_STARTUP_HOOK_PY) + with open(startup_hook_py_file, "w") as f: + f.write("import os\n\n") + f.write("os.environ.update({\n") + for k, v in self._sis_environ_updates.items(): + f.write(f" {k!r}: {v!r},\n") + f.write("})\n\n") + # export the actual job with gzip.open(self._sis_path(gs.JOB_SAVE), "w") as f: pickle.dump(self, f) @@ -1137,6 +1146,10 @@ def update_rqmt(self, task_name, rqmt): self._sis_task_rqmt_overwrite[task_name] = rqmt.copy(), False return self + def putenv(self, key: str, value: str): + """this environment var will be set at job startup""" + self._sis_environ_updates[key] = value + def tasks(self) -> Iterator[Task]: """ :return: yields Task's diff --git a/sisyphus/worker.py b/sisyphus/worker.py index 6fa51e3..6380a61 100644 --- a/sisyphus/worker.py +++ b/sisyphus/worker.py @@ -248,6 +248,13 @@ def worker_helper(args): if hasattr(task._job, "_sis_environment") and task._job._sis_environment: task._job._sis_environment.modify_environment() + startup_hook_py_file = args.jobdir + os.path.sep + gs.JOB_STARTUP_HOOK_PY + if os.path.exists(startup_hook_py_file): + source = open(startup_hook_py_file).read() + co = compile(source, startup_hook_py_file, "exec") + user_ns = {"__file__": startup_hook_py_file, "__name__": startup_hook_py_file, "task": task, "job": job} + eval(co, user_ns, user_ns) + try: # run task task.run(task_id, resume_job, logging_thread=logging_thread) From e111d55c67ec0701ecbc7b15eecf240bccb085af Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sun, 11 Feb 2024 21:52:18 +0100 Subject: [PATCH 2/8] set_env more consistent to other methods --- sisyphus/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sisyphus/job.py b/sisyphus/job.py index 91ebdbc..264dcd0 100644 --- a/sisyphus/job.py +++ b/sisyphus/job.py @@ -1146,7 +1146,7 @@ def update_rqmt(self, task_name, rqmt): self._sis_task_rqmt_overwrite[task_name] = rqmt.copy(), False return self - def putenv(self, key: str, value: str): + def set_env(self, key: str, value: str): """this environment var will be set at job startup""" self._sis_environ_updates[key] = value From f1e83a46ab197ef69228fcb2007227b008126301 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sun, 11 Feb 2024 21:53:37 +0100 Subject: [PATCH 3/8] update env directly, instead of using generic startup hook code --- sisyphus/global_constants.py | 1 - sisyphus/job.py | 9 --------- sisyphus/worker.py | 11 +++++------ 3 files changed, 5 insertions(+), 16 deletions(-) diff --git a/sisyphus/global_constants.py b/sisyphus/global_constants.py index 10c915a..d7ae69f 100644 --- a/sisyphus/global_constants.py +++ b/sisyphus/global_constants.py @@ -10,7 +10,6 @@ JOB_FINISHED_MARKER = "finished" JOB_FINISHED_ARCHIVE = "finished.tar.gz" JOB_INFO = "info" -JOB_STARTUP_HOOK_PY = "startup_hook.py" # engine path ENGINE_LOG = "log" diff --git a/sisyphus/job.py b/sisyphus/job.py index 264dcd0..a35d86d 100644 --- a/sisyphus/job.py +++ b/sisyphus/job.py @@ -286,15 +286,6 @@ def _sis_setup_directory(self, force=False): if not os.path.isdir(link_name): os.symlink(src=os.path.abspath(str(creator._sis_path())), dst=link_name, target_is_directory=True) - if self._sis_environ_updates: - startup_hook_py_file = self._sis_path(gs.JOB_STARTUP_HOOK_PY) - with open(startup_hook_py_file, "w") as f: - f.write("import os\n\n") - f.write("os.environ.update({\n") - for k, v in self._sis_environ_updates.items(): - f.write(f" {k!r}: {v!r},\n") - f.write("})\n\n") - # export the actual job with gzip.open(self._sis_path(gs.JOB_SAVE), "w") as f: pickle.dump(self, f) diff --git a/sisyphus/worker.py b/sisyphus/worker.py index 6380a61..7d7eefb 100644 --- a/sisyphus/worker.py +++ b/sisyphus/worker.py @@ -248,12 +248,11 @@ def worker_helper(args): if hasattr(task._job, "_sis_environment") and task._job._sis_environment: task._job._sis_environment.modify_environment() - startup_hook_py_file = args.jobdir + os.path.sep + gs.JOB_STARTUP_HOOK_PY - if os.path.exists(startup_hook_py_file): - source = open(startup_hook_py_file).read() - co = compile(source, startup_hook_py_file, "exec") - user_ns = {"__file__": startup_hook_py_file, "__name__": startup_hook_py_file, "task": task, "job": job} - eval(co, user_ns, user_ns) + # Maybe update some env vars. + # Use getattr for compatibility with older serialized jobs. + if getattr(task._job, "_sis_environ_updates", None): + for k, v in task._job._sis_environ_updates.items(): + os.environ[k] = v try: # run task From 29c56f69fd95d0848b464d8a9bcbbefdc771e6a9 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sun, 11 Feb 2024 23:36:52 +0100 Subject: [PATCH 4/8] reuse EnvironmentModifier --- sisyphus/job.py | 11 ++++------- sisyphus/tools.py | 33 ++++++++++++++++++++++----------- sisyphus/worker.py | 8 +------- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/sisyphus/job.py b/sisyphus/job.py index a35d86d..ad22eca 100644 --- a/sisyphus/job.py +++ b/sisyphus/job.py @@ -219,12 +219,9 @@ def _sis_init(self, args, kwargs, parsed_args): self._sis_is_finished = False self._sis_setup_since_restart = False - self._sis_environment = None - if gs.CLEANUP_ENVIRONMENT: - self._sis_environment = tools.EnvironmentModifier() - self._sis_environment.keep(gs.DEFAULT_ENVIRONMENT_KEEP) - self._sis_environment.set(gs.DEFAULT_ENVIRONMENT_SET) - self._sis_environ_updates = {} + self._sis_environment = tools.EnvironmentModifier(cleanup_env=gs.CLEANUP_ENVIRONMENT) + self._sis_environment.keep(gs.DEFAULT_ENVIRONMENT_KEEP) + self._sis_environment.set(gs.DEFAULT_ENVIRONMENT_SET) if gs.AUTO_SET_JOB_INIT_ATTRIBUTES: self.set_attrs(parsed_args) @@ -1139,7 +1136,7 @@ def update_rqmt(self, task_name, rqmt): def set_env(self, key: str, value: str): """this environment var will be set at job startup""" - self._sis_environ_updates[key] = value + self._sis_environment.set_var(key, value) def tasks(self) -> Iterator[Task]: """ diff --git a/sisyphus/tools.py b/sisyphus/tools.py index 31a1e59..07d4bdf 100644 --- a/sisyphus/tools.py +++ b/sisyphus/tools.py @@ -132,7 +132,6 @@ def try_get(v): class execute_in_dir(object): - """Object to be used by the with statement. All code after the with will be executed in the given directory, working directory will be changed back after with statement. @@ -156,7 +155,6 @@ def __exit__(self, type, value, traceback): class cache_result(object): - """decorated to cache the result of a function for x_seconds""" def __init__(self, cache_time=30, force_update=None, clear_cache=None): @@ -481,33 +479,42 @@ class EnvironmentModifier: A class to cleanup the environment before a job starts """ - def __init__(self): + def __init__(self, *, cleanup_env: bool = True): + self.cleanup_env = cleanup_env self.keep_vars = set() self.set_vars = {} def keep(self, var): - if type(var) == str: + if isinstance(var, str): self.keep_vars.add(var) else: self.keep_vars.update(var) def set(self, var, value=None): - if type(var) == dict: + if isinstance(var, dict): self.set_vars.update(var) else: self.set_vars[var] = value + def set_var(self, key: str, value: str, *, allow_env_substitute: bool = False): + if not allow_env_substitute: + # Need to escape $ for string.Template.substitute below. + value = value.replace("$", "$$") + self.set_vars[key] = value + def modify_environment(self): import os import string orig_env = dict(os.environ) - keys = list(os.environ.keys()) - for k in keys: - if k not in self.keep_vars: - del os.environ[k] + if self.cleanup_env: + keys = list(os.environ.keys()) + for k in keys: + if k not in self.keep_vars: + del os.environ[k] + for k, v in self.set_vars.items(): - if type(v) == str: + if isinstance(v, str): os.environ[k] = string.Template(v).substitute(orig_env) else: os.environ[k] = str(v) @@ -516,7 +523,11 @@ def modify_environment(self): logging.debug("environment var %s=%s" % (k, v)) def __repr__(self): - return repr(self.keep_vars) + " " + repr(self.set_vars) + return ( + f"cleanup_env={self.cleanup_env} " + + (f"keep={self.keep_vars!r} " if self.cleanup_env else "") + + f"set={self.set_vars!r}" + ) class FinishedResultsCache: diff --git a/sisyphus/worker.py b/sisyphus/worker.py index 7d7eefb..9e5a6b4 100644 --- a/sisyphus/worker.py +++ b/sisyphus/worker.py @@ -245,15 +245,9 @@ def worker_helper(args): gs.active_engine.init_worker(task) # cleanup environment - if hasattr(task._job, "_sis_environment") and task._job._sis_environment: + if getattr(task._job, "_sis_environment", None): task._job._sis_environment.modify_environment() - # Maybe update some env vars. - # Use getattr for compatibility with older serialized jobs. - if getattr(task._job, "_sis_environ_updates", None): - for k, v in task._job._sis_environ_updates.items(): - os.environ[k] = v - try: # run task task.run(task_id, resume_job, logging_thread=logging_thread) From 7821a667da9115d3482c32fe3f54e33bed44b2a3 Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sun, 11 Feb 2024 23:39:33 +0100 Subject: [PATCH 5/8] use set_vars_verbatim --- sisyphus/tools.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sisyphus/tools.py b/sisyphus/tools.py index 07d4bdf..a4fd52f 100644 --- a/sisyphus/tools.py +++ b/sisyphus/tools.py @@ -483,6 +483,7 @@ def __init__(self, *, cleanup_env: bool = True): self.cleanup_env = cleanup_env self.keep_vars = set() self.set_vars = {} + self.set_vars_verbatim = {} # does not go through string.Template(v).substitute def keep(self, var): if isinstance(var, str): @@ -497,10 +498,7 @@ def set(self, var, value=None): self.set_vars[var] = value def set_var(self, key: str, value: str, *, allow_env_substitute: bool = False): - if not allow_env_substitute: - # Need to escape $ for string.Template.substitute below. - value = value.replace("$", "$$") - self.set_vars[key] = value + (self.set_vars if allow_env_substitute else self.set_vars_verbatim)[key] = value def modify_environment(self): import os @@ -519,6 +517,9 @@ def modify_environment(self): else: os.environ[k] = str(v) + for k, v in self.set_vars_verbatim.items(): + os.environ[k] = v + for k, v in os.environ.items(): logging.debug("environment var %s=%s" % (k, v)) From 23e6e73443d593ed68e21eb19eece27efa006d3d Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Sun, 11 Feb 2024 23:42:11 +0100 Subject: [PATCH 6/8] set_verbatim method instead --- sisyphus/job.py | 2 +- sisyphus/tools.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sisyphus/job.py b/sisyphus/job.py index ad22eca..397d79f 100644 --- a/sisyphus/job.py +++ b/sisyphus/job.py @@ -1136,7 +1136,7 @@ def update_rqmt(self, task_name, rqmt): def set_env(self, key: str, value: str): """this environment var will be set at job startup""" - self._sis_environment.set_var(key, value) + self._sis_environment.set_verbatim(key, value) def tasks(self) -> Iterator[Task]: """ diff --git a/sisyphus/tools.py b/sisyphus/tools.py index a4fd52f..32f9777 100644 --- a/sisyphus/tools.py +++ b/sisyphus/tools.py @@ -497,8 +497,8 @@ def set(self, var, value=None): else: self.set_vars[var] = value - def set_var(self, key: str, value: str, *, allow_env_substitute: bool = False): - (self.set_vars if allow_env_substitute else self.set_vars_verbatim)[key] = value + def set_verbatim(self, key: str, value: str): + self.set_vars_verbatim[key] = value def modify_environment(self): import os From 79daa7214ba00ab711a2b6e3fb81b8b06d3aadda Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Mon, 12 Feb 2024 22:14:37 +0100 Subject: [PATCH 7/8] fix old behavior for DEFAULT_ENVIRONMENT_SET --- sisyphus/job.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sisyphus/job.py b/sisyphus/job.py index 397d79f..b2b9a15 100644 --- a/sisyphus/job.py +++ b/sisyphus/job.py @@ -220,8 +220,9 @@ def _sis_init(self, args, kwargs, parsed_args): self._sis_setup_since_restart = False self._sis_environment = tools.EnvironmentModifier(cleanup_env=gs.CLEANUP_ENVIRONMENT) - self._sis_environment.keep(gs.DEFAULT_ENVIRONMENT_KEEP) - self._sis_environment.set(gs.DEFAULT_ENVIRONMENT_SET) + if gs.CLEANUP_ENVIRONMENT: # for compat, only set those below if CLEANUP_ENVIRONMENT is enabled + self._sis_environment.keep(gs.DEFAULT_ENVIRONMENT_KEEP) + self._sis_environment.set(gs.DEFAULT_ENVIRONMENT_SET) if gs.AUTO_SET_JOB_INIT_ATTRIBUTES: self.set_attrs(parsed_args) From 00fb8519d7383ca2c7e928261a7893157db9c78e Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Mon, 12 Feb 2024 22:42:35 +0100 Subject: [PATCH 8/8] set_env verbatim flag --- sisyphus/job.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/sisyphus/job.py b/sisyphus/job.py index b2b9a15..a71fc80 100644 --- a/sisyphus/job.py +++ b/sisyphus/job.py @@ -1135,9 +1135,18 @@ def update_rqmt(self, task_name, rqmt): self._sis_task_rqmt_overwrite[task_name] = rqmt.copy(), False return self - def set_env(self, key: str, value: str): - """this environment var will be set at job startup""" - self._sis_environment.set_verbatim(key, value) + def set_env(self, key: str, value: str, *, verbatim: bool = True): + """ + Set environment variable. This environment var will be set at job startup in the worker. + + :param key: variable name + :param value: + :param verbatim: True: set it as-is; False: use string.Template(value).substitute(orig_env) + """ + if verbatim: + self._sis_environment.set_verbatim(key, value) + else: + self._sis_environment.set(key, value) def tasks(self) -> Iterator[Task]: """