Skip to content

Commit

Permalink
Task.run wait for inputs (#162)
Browse files Browse the repository at this point in the history
New options:

WAIT_PERIOD_FOR_INPUTS_AVAILABLE
TASK_INPUTS_MUST_BE_AVAILABLE

Fix #159
  • Loading branch information
albertz authored Dec 15, 2023
1 parent 7795128 commit d78596a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
5 changes: 5 additions & 0 deletions sisyphus/global_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ def file_caching(path):
WAIT_PERIOD_JOB_CLEANUP = 10
#: How many seconds should all inputs be available before starting a job to avoid file system synchronization problems
WAIT_PERIOD_MTIME_OF_INPUTS = 60
#: How long to wait for all inputs to be available in Task.run (https://github.com/rwth-i6/sisyphus/issues/159)
WAIT_PERIOD_FOR_INPUTS_AVAILABLE = 60

#: Fail when not all inputs are available after WAIT_PERIOD_FOR_INPUTS_AVAILABLE
TASK_INPUTS_MUST_BE_AVAILABLE = True

#: set true to automatically clean jobs in error state and retry
CLEAR_ERROR = False
Expand Down
14 changes: 13 additions & 1 deletion sisyphus/task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import math
import os
import logging
import sys
Expand Down Expand Up @@ -137,13 +138,24 @@ def run(self, task_id, resume_job=False, logging_thread=None):
else:
logging.info("%s (Variable: %s, %s)" % (i.get_path(), str(i), type(i.get())))

if gs.WAIT_PERIOD_FOR_INPUTS_AVAILABLE:
for _ in range(math.ceil(gs.WAIT_PERIOD_FOR_INPUTS_AVAILABLE)):
if os.path.exists(i.get_path()):
break
logging.warning("Input path does not exist, waiting: %s" % i.get_path())
time.sleep(1)

# each input must be at least X seconds old
# if an input file is too young it's may not synced in a network filesystem yet
try:
input_age = time.time() - os.stat(i.get_path()).st_mtime
time.sleep(max(0, gs.WAIT_PERIOD_MTIME_OF_INPUTS - input_age))
except FileNotFoundError:
logging.warning("Input path does not exist: %s" % i.get_path())
(logging.error if gs.TASK_INPUTS_MUST_BE_AVAILABLE else logging.warning)(
"Input path does not exist: %s" % i.get_path()
)
if gs.TASK_INPUTS_MUST_BE_AVAILABLE:
raise

tools.get_system_informations(sys.stdout)
sys.stdout.flush()
Expand Down

0 comments on commit d78596a

Please sign in to comment.