From d78596a6c33e9128fe8a0b79b10bd8dece62b22f Mon Sep 17 00:00:00 2001 From: Albert Zeyer Date: Fri, 15 Dec 2023 15:56:02 +0100 Subject: [PATCH] Task.run wait for inputs (#162) New options: WAIT_PERIOD_FOR_INPUTS_AVAILABLE TASK_INPUTS_MUST_BE_AVAILABLE Fix #159 --- sisyphus/global_settings.py | 5 +++++ sisyphus/task.py | 14 +++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/sisyphus/global_settings.py b/sisyphus/global_settings.py index 3c3cd43..36431d9 100644 --- a/sisyphus/global_settings.py +++ b/sisyphus/global_settings.py @@ -195,6 +195,11 @@ def file_caching(path): WAIT_PERIOD_JOB_CLEANUP = 10 #: How many seconds should all inputs be available before starting a job to avoid file system synchronization problems WAIT_PERIOD_MTIME_OF_INPUTS = 60 +#: How long to wait for all inputs to be available in Task.run (https://github.com/rwth-i6/sisyphus/issues/159) +WAIT_PERIOD_FOR_INPUTS_AVAILABLE = 60 + +#: Fail when not all inputs are available after WAIT_PERIOD_FOR_INPUTS_AVAILABLE +TASK_INPUTS_MUST_BE_AVAILABLE = True #: set true to automatically clean jobs in error state and retry CLEAR_ERROR = False diff --git a/sisyphus/task.py b/sisyphus/task.py index 85b2bce..e874689 100644 --- a/sisyphus/task.py +++ b/sisyphus/task.py @@ -1,3 +1,4 @@ +import math import os import logging import sys @@ -137,13 +138,24 @@ def run(self, task_id, resume_job=False, logging_thread=None): else: logging.info("%s (Variable: %s, %s)" % (i.get_path(), str(i), type(i.get()))) + if gs.WAIT_PERIOD_FOR_INPUTS_AVAILABLE: + for _ in range(math.ceil(gs.WAIT_PERIOD_FOR_INPUTS_AVAILABLE)): + if os.path.exists(i.get_path()): + break + logging.warning("Input path does not exist, waiting: %s" % i.get_path()) + time.sleep(1) + # each input must be at least X seconds old # if an input file is too young it's may not synced in a network filesystem yet try: input_age = time.time() - os.stat(i.get_path()).st_mtime time.sleep(max(0, gs.WAIT_PERIOD_MTIME_OF_INPUTS - input_age)) except FileNotFoundError: - logging.warning("Input path does not exist: %s" % i.get_path()) + (logging.error if gs.TASK_INPUTS_MUST_BE_AVAILABLE else logging.warning)( + "Input path does not exist: %s" % i.get_path() + ) + if gs.TASK_INPUTS_MUST_BE_AVAILABLE: + raise tools.get_system_informations(sys.stdout) sys.stdout.flush()