Skip to content

Commit

Permalink
move things around + sem
Browse files Browse the repository at this point in the history
  • Loading branch information
Atticus1806 committed Nov 14, 2024
1 parent 3e3468e commit 155076a
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions sisyphus/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def __init__(
self.ui = ui
self.interactive = interative
self.interactive_always_skip = set()
self.update_out_sem = threading.Semaphore()

self.stop_if_done = True
self._stop_loop = False
Expand Down Expand Up @@ -436,12 +437,13 @@ def f(job):
self.thread_pool.map(f, self.jobs.get(gs.STATE_RUNNABLE, []))

def check_output(self, write_output=False, update_all_outputs=False, force_update=False):
targets = self.sis_graph.targets if update_all_outputs else self.sis_graph.active_targets
for target in targets:
target.update_requirements(write_output=write_output, force=force_update)
if target.is_done():
target.run_when_done(write_output=write_output)
self.sis_graph.remove_from_active_targets(target)
with self.update_out_sem:
targets = self.sis_graph.targets if update_all_outputs else self.sis_graph.active_targets
for target in targets:
target.update_requirements(write_output=write_output, force=force_update)
if target.is_done():
target.run_when_done(write_output=write_output)
self.sis_graph.remove_from_active_targets(target)

def continue_manager_loop(self):
# Stop loop flag is set
Expand Down Expand Up @@ -482,7 +484,6 @@ def startup(self):
config_manager.continue_readers()

self.job_engine.reset_cache()
self.check_output(write_output=False, update_all_outputs=True)
self.update_jobs()

# Ensure at least one async reader head the chance to continue until he added his jobs to the list
Expand Down Expand Up @@ -593,7 +594,6 @@ def run(self):
if self.mem_profile:
self.mem_profile.snapshot()
self.job_engine.reset_cache()
self.check_output(write_output=self.link_outputs)

config_manager.continue_readers()
self.update_jobs()
Expand Down Expand Up @@ -626,6 +626,8 @@ def run(self):
for job in self.jobs.get(gs.STATE_ERROR, []):
gs.on_job_failure(job)

self.check_output(write_output=self.link_outputs)

# Stop config reader
config_manager.cancel_all_reader()

Expand Down

0 comments on commit 155076a

Please sign in to comment.