Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
  • Loading branch information
floriankrb committed Jul 5, 2024
1 parent 7a64ced commit 08cbe4d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 17 deletions.
8 changes: 7 additions & 1 deletion src/anemoi/registry/commands/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ def add_arguments(self, command_parser):
command_parser.add_argument("--request", help="Filter tasks to process (key=value list)", nargs="*", default=[])
command_parser.add_argument("--threads", help="Number of threads to use", type=int, default=1)
command_parser.add_argument("--heartbeat", help="Heartbeat interval", type=int, default=60)
command_parser.add_argument("--max-no-heartbeat", help="Max interval without heartbeat", type=int, default=3600)
command_parser.add_argument(
"--max-no-heartbeat",
help="Max interval without heartbeat before considering task needs to be freed.",
type=int,
default=0,
)
command_parser.add_argument("--loop", help="Run in a loop", action="store_true")

def run(self, args):
kwargs = vars(args)
Expand Down
50 changes: 34 additions & 16 deletions src/anemoi/registry/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def __init__(
auto_register=True,
threads=1,
heartbeat=60,
max_no_heartbeat=3600,
max_no_heartbeat=0,
loop=False,
request={},
):
"""Run a worker that will process tasks in the queue.
Expand All @@ -54,6 +55,7 @@ def __init__(
self.threads = threads
self.heartbeat = heartbeat
self.max_no_heartbeat = max_no_heartbeat
self.loop = loop

self.wait = wait
self.stop_if_finished = stop_if_finished
Expand All @@ -65,16 +67,18 @@ def __init__(
raise ValueError(f"Target directory {target_dir} must already exist")

def run(self):
if self.loop:
while True:
res = self.process_one_task()

while True:
res = self.process_one_task()

if self.stop_if_finished and res is None:
LOG.info("All tasks have been processed, stopping.")
return
if self.stop_if_finished and res is None:
LOG.info("All tasks have been processed, stopping.")
return

LOG.info(f"Waiting {self.wait} seconds before checking again.")
time.sleep(self.wait)
LOG.info(f"Waiting {self.wait} seconds before checking again.")
time.sleep(self.wait)
else:
self.process_one_task()

def choose_task(self):
request = self.request.copy()
Expand All @@ -86,6 +90,9 @@ def choose_task(self):
return entry

# else if a task is running, check if it has been running for too long, and free it
if self.max_no_heartbeat == 0:
return None

cat = TaskCatalogueEntryList(status="running", **request)
if not cat:
LOG.info("No queued tasks found")
Expand All @@ -110,17 +117,14 @@ def process_one_task(self):
self.parse_entry(entry) # for checking only

entry.take_ownership()
self.process_entry(entry)
self.process_entry_with_heartbeat(entry)
LOG.info(f"Task {uuid} completed.")
entry.unregister()
LOG.info(f"Task {uuid} deleted.")
return True

def process_entry(self, entry):

destination, source, dataset = self.parse_entry(entry)

dataset_entry = DatasetCatalogueEntry(key=dataset)
def process_entry_with_heartbeat(self, entry):
STOP = []

# create another thread to send heartbeat
def send_heartbeat():
Expand All @@ -129,11 +133,25 @@ def send_heartbeat():
entry.set_status("running")
except Exception:
return
time.sleep(self.heartbeat)
for _ in range(self.heartbeat):
time.sleep(1)
if len(STOP) > 0:
STOP.pop()
return

thread = threading.Thread(target=send_heartbeat)
thread.start()

try:
self.process_entry(entry)
finally:
STOP.append(1) # stop the heartbeat thread
thread.join()

def process_entry(self, entry):
destination, source, dataset = self.parse_entry(entry)
dataset_entry = DatasetCatalogueEntry(key=dataset)

LOG.info(f"Transferring {dataset} from '{source}' to '{destination}'")

def get_source_path():
Expand Down

0 comments on commit 08cbe4d

Please sign in to comment.