Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
floriankrb committed Jul 4, 2024
1 parent 19e0a3a commit 7599842
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions src/anemoi/registry/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,17 @@


class Worker:
def __init__(self, action, destination, timeout=None, wait=60, stop_if_finished=True, target_dir=".", request={}):
def __init__(
self,
action,
destination,
timeout=None,
wait=60,
stop_if_finished=True,
target_dir=".",
auto_register=True,
request={},
):
"""Run a worker that will process tasks in the queue.
timeout: Kill itself after `timeout` seconds.
wait: When no task is found, wait `wait` seconds before checking again.
Expand All @@ -37,6 +47,7 @@ def __init__(self, action, destination, timeout=None, wait=60, stop_if_finished=

self.wait = wait
self.stop_if_finished = stop_if_finished
self.auto_register = auto_register
if timeout:
signal.alarm(timeout)

Expand Down Expand Up @@ -79,10 +90,12 @@ def process_one_task(self):
def process_entry(self, entry):
destination, source, dataset = self.parse_entry(entry)

dataset_entry = DatasetCatalogueEntry(key=dataset)

LOG.info(f"Transferring {dataset} from {source} to {destination}")

def get_source_path():
e = DatasetCatalogueEntry(key=dataset).record
e = dataset_entry.record
if "locations" not in e:
raise ValueError(f"Dataset {dataset} has no locations")
locations = e["locations"]
Expand Down Expand Up @@ -124,6 +137,9 @@ def get_source_path():
download(source_path, target_tmp_path, resume=True)
os.rename(target_tmp_path, target_path)

if self.auto_register:
dataset_entry.add_location(self, platform=destination, path=target_path)

@classmethod
def parse_entry(cls, entry):
data = entry.record.copy()
Expand Down

0 comments on commit 7599842

Please sign in to comment.