Skip to content

Commit

Permalink
Merge pull request #196 from rwth-i6/too-many-open-files-fix-2
Browse files Browse the repository at this point in the history
Catch `subprocess.TimeoutExpired` once

Revert most of the code added in `system_call` so that the `subprocess.TimeoutExpired` exception is raised to the caller, which will catch it and consider the queuing process as failed.
  • Loading branch information
Icemole authored Jul 4, 2024
2 parents 58694e6 + bc4b1e7 commit ce5f7a2
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 26 deletions.
7 changes: 2 additions & 5 deletions sisyphus/aws_batch_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,8 @@ def system_call(self, command, send_to_stdin=None):
logging.debug("shell_cmd: %s" % " ".join(system_command))
if send_to_stdin:
send_to_stdin = send_to_stdin.encode()
try:
p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)
except subprocess.TimeoutExpired:
logging.warning("Timeout expired for command: %s" % " ".join(system_command))
return [], ["TimeoutExpired".encode()], -1

p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)

def fix_output(o):
"""
Expand Down
13 changes: 5 additions & 8 deletions sisyphus/load_sharing_facility_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,8 @@ def system_call(self, command, send_to_stdin=None):
logging.debug("shell_cmd: %s" % " ".join(system_command))
if send_to_stdin:
send_to_stdin = send_to_stdin.encode()
try:
p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)
except subprocess.TimeoutExpired:
logging.warning("Timeout expired for command: %s" % " ".join(system_command))
return [], ["TimeoutExpired".encode()], -1

p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)

def fix_output(o):
# split output and drop last empty line
Expand Down Expand Up @@ -187,12 +184,12 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, rangestring):
)

while True:
logging.info("bsub_call: %s" % bsub_call)
logging.info("command: %s" % command)
try:
logging.info("bsub_call: %s" % bsub_call)
logging.info("command: %s" % command)
out, err, retval = self.system_call(bsub_call, command)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(command))
logging.warning(self._system_call_timeout_warn_msg(bsub_call))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break
Expand Down
10 changes: 3 additions & 7 deletions sisyphus/simple_linux_utility_for_resource_management_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,8 @@ def system_call(self, command, send_to_stdin=None):
logging.debug("shell_cmd: %s" % " ".join(system_command))
if send_to_stdin:
send_to_stdin = send_to_stdin.encode()
try:
p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)
except subprocess.TimeoutExpired:
logging.warning("Timeout expired for command: %s" % " ".join(system_command))
return [], ["TimeoutExpired".encode()], -1

p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)

def fix_output(o):
"""
Expand Down Expand Up @@ -234,13 +231,12 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id,
sbatch_call += self.options(rqmt)

sbatch_call += ["-a", "%i-%i:%i" % (start_id, end_id, step_size)]
command = '"' + " ".join(call) + '"'
sbatch_call += ["--wrap=%s" % " ".join(call)]
while True:
try:
out, err, retval = self.system_call(sbatch_call)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(command))
logging.warning(self._system_call_timeout_warn_msg(sbatch_call))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break
Expand Down
9 changes: 3 additions & 6 deletions sisyphus/son_of_grid_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,8 @@ def system_call(self, command, send_to_stdin=None):
logging.debug("shell_cmd: %s" % " ".join(system_command))
if send_to_stdin:
send_to_stdin = send_to_stdin.encode()
try:
p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)
except subprocess.TimeoutExpired:
logging.warning("Timeout expired for command: %s" % " ".join(system_command))
return [], ["TimeoutExpired".encode()], -1

p = subprocess.run(system_command, input=send_to_stdin, capture_output=True, timeout=30)

def fix_output(o):
"""
Expand Down Expand Up @@ -255,7 +252,7 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id,
try:
out, err, retval = self.system_call(qsub_call, command)
except subprocess.TimeoutExpired:
logging.warning(self._system_call_timeout_warn_msg(command))
logging.warning(self._system_call_timeout_warn_msg(qsub_call))
time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT)
continue
break
Expand Down

0 comments on commit ce5f7a2

Please sign in to comment.