diff --git a/sisyphus/localengine.py b/sisyphus/localengine.py index 706579f..fddd08a 100644 --- a/sisyphus/localengine.py +++ b/sisyphus/localengine.py @@ -157,20 +157,26 @@ def enough_free_resources(self, rqmt): return False return True - def reserve_resources(self, rqmt): + def reserve_resources(self, rqmt, selected_devices=None): self.free_resources = {key: free - rqmt.get(key, 0) for key, free in self.free_resources.items()} for key, max_available in self.max_resources.items(): free = self.free_resources[key] assert 0 <= free <= max_available + # reserve specific GPUs - selected_devices = [] - for name, free in self.available_gpus.items(): - if len(selected_devices) == rqmt.get("gpu", 0): - return ",".join(selected_devices) - if free: - self.available_gpus[name] = False - selected_devices.append(name) - assert len(selected_devices) == rqmt.get("gpu", 0) + if selected_devices is not None: + if selected_devices != "": + for name in selected_devices.split(","): + self.available_gpus[name] = False + else: + selected_devices = [] + for name, free in self.available_gpus.items(): + if len(selected_devices) == rqmt.get("gpu", 0): + break + if free: + self.available_gpus[name] = False + selected_devices.append(name) + assert len(selected_devices) == rqmt.get("gpu", 0) return ",".join(selected_devices) def release_resources(self, rqmt, selected_devices): @@ -301,23 +307,19 @@ def try_to_recover_task(self, task, task_id): rqmt = d['requested_resources'] logpath = os.path.relpath(task.path(gs.JOB_LOG_ENGINE)) call_with_id = task.get_worker_call(task_id) - call_with_id += ['--redirect_output'] name = task.task_name() task_name = task.name() task_instance = TaskQueueInstance(call_with_id, logpath, rqmt, name, task_name, task_id) - if call_with_id != process.cmdline()[1:]: + if call_with_id[1:] != process.cmdline()[1:]: logging.debug('Job changed, ignore this job: %i %s %s' % (pid, process.cmdline(), task_instance.call)) return False - if os.path.abspath(os.getcwd()) != process.cwd(): - logging.debut('Job changed, ignore this job: %i %s %s' % (pid, os.getcwd(), process.cwd())) - return False - with self.running_tasks as running_tasks: name = (task_instance.name, task_id) - running_tasks[name] = (process, task_instance, "") - self.reserve_resources(rqmt) + used_gpus = process.environ().get("CUDA_VISIBLE_DEVICES", "") + running_tasks[name] = (process, task_instance, used_gpus) + self.reserve_resources(rqmt, selected_devices=used_gpus) logging.debug('Loaded job: %i %s %s' % (pid, process.cmdline(), task_instance.call)) return True