Skip to content

Commit

Permalink
Merge pull request #916 from Open-EO/wait_till_path_available_2024-01-23
Browse files: browse the repository at this point in the history
Wait till path available
  • Loading branch information
EmileSonneveld authored Oct 23, 2024
2 parents 38815a3 + 594a80e commit 4ccfa0d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 19 deletions.
12 changes: 7 additions & 5 deletions openeogeotrellis/deploy/batch_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,13 @@ def run_job(
ml_model_metadata = result.get_model_metadata(str(output_file))
logger.info("Extracted ml model metadata from %s" % output_file)
for name, asset in the_assets_metadata.items():
# TODO: test in separate branch
# if not asset.get("href").lower().startswith("s3:/"):
# # fusemount could have some delay to make files accessible, so poll a bit:
# asset_path = get_abs_path_of_asset(asset["href"], job_dir)
# wait_till_path_available(asset_path)
href = asset["href"]
url = urlparse(href)
if url.scheme in ["", "file"]:
file_path = url.path
# fusemount could have some delay to make files accessible, so poll a bit:
asset_path = get_abs_path_of_asset(file_path, job_dir)
wait_till_path_available(asset_path)
add_permissions(Path(asset["href"]), stat.S_IWGRP)
logger.info(f"wrote {len(the_assets_metadata)} assets to {output_file}")
assets_metadata.append(the_assets_metadata)
Expand Down
6 changes: 3 additions & 3 deletions openeogeotrellis/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,13 +773,13 @@ def to_jsonable(x):

def wait_till_path_available(path: Path, max_tries: int = 20, seconds: float = 5):
    """
    Poll until `path` exists on the file system, sleeping a fixed interval between checks.

    Intended for storage (e.g. a fuse mount) where freshly written files can take
    a while to become visible.

    :param path: file system path to wait for.
    :param max_tries: maximum number of sleep-and-recheck rounds before giving up.
    :param seconds: sleep duration between two checks.
        With the defaults the total wait is at most 20 * 5 = 100 seconds (almost 2 minutes).
    :return: None, both when the path became available and when the wait timed out
        (in the latter case only a warning is logged).
    """
    retry = 0
    while not os.path.exists(path):
        if retry >= max_tries:
            logger.warning(f"Path is not available after {max_tries} tries: {path}")
            return  # TODO: Throw error instead?
        retry += 1
        logger.info(f"Waiting for path to be available. Try {retry}/{max_tries} (sleep:{seconds}seconds): {path}")
        # Call through the `time` module so tests can patch `time.sleep`.
        time.sleep(seconds)
45 changes: 39 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import os
from typing import Union

import sys
from pathlib import Path
from datetime import datetime
from unittest import mock

import boto3
import contextlib
import flask
import pytest
import moto
import moto.server
import os
import pytest
import sys
import time_machine
import typing
import requests_mock
from _pytest.terminal import TerminalReporter
from pathlib import Path

from openeo_driver.backend import OpenEoBackendImplementation, UserDefinedProcesses
from openeo_driver.jobregistry import ElasticJobRegistry, JobRegistryInterface
Expand Down Expand Up @@ -179,6 +182,36 @@ def _setup_local_spark(out: TerminalReporter, verbosity=0):
def api_version(request):
return request.param

# TODO: Deduplicate code with openeo-python-client
class _Sleeper:
    """
    Test helper that swaps `time.sleep` for an instant fake.

    Instead of blocking, the fake shifts the `time_machine` clock forward by the
    requested duration and records that duration in `history`.
    """

    def __init__(self):
        # Durations passed to the patched `time.sleep`, in call order.
        self.history = []

    @contextlib.contextmanager
    def patch(self, time_machine: "time_machine.TimeMachineFixture") -> typing.Iterator["_Sleeper"]:
        """
        Context manager that patches `time.sleep` for the duration of the `with` block.

        :param time_machine: the `time_machine` fixture whose clock is shifted on each sleep.
        :return: yields this `_Sleeper` so callers can inspect `history` afterwards.
        """

        def sleep(seconds):
            # Note: this requires that `time_machine.move_to()` has been called before
            # also see https://github.com/adamchainz/time-machine/issues/247
            time_machine.coordinates.shift(seconds)
            self.history.append(seconds)

        with mock.patch("time.sleep", new=sleep):
            yield self

    def did_sleep(self) -> bool:
        """Return True if the patched `time.sleep` was called at least once."""
        return bool(self.history)


@pytest.fixture
def fast_sleep(time_machine) -> typing.Iterator[_Sleeper]:
    """
    Fixture that patches `time.sleep` so it shifts the `time_machine` clock
    instead of actually waiting; yields the `_Sleeper` recording the sleeps.
    """
    # `move_to()` has to be called before the patched sleep can shift the clock.
    time_machine.move_to(datetime.now().isoformat())
    recorder = _Sleeper()
    with recorder.patch(time_machine=time_machine) as active_sleeper:
        yield active_sleeper


@pytest.fixture
def udf_noop():
Expand Down
8 changes: 3 additions & 5 deletions tests/deploy/test_batch_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def test_run_job(evaluate, tmp_path):


@mock.patch("openeo_driver.ProcessGraphDeserializer.evaluate")
def test_run_job_get_projection_extension_metadata(evaluate, tmp_path):
def test_run_job_get_projection_extension_metadata(evaluate, tmp_path, fast_sleep):
cube_mock = MagicMock()

job_dir = tmp_path / "job-402"
Expand Down Expand Up @@ -517,9 +517,7 @@ def test_run_job_get_projection_extension_metadata(evaluate, tmp_path):


@mock.patch("openeo_driver.ProcessGraphDeserializer.evaluate")
def test_run_job_get_projection_extension_metadata_all_assets_same_epsg_and_bbox(
evaluate, tmp_path
):
def test_run_job_get_projection_extension_metadata_all_assets_same_epsg_and_bbox(evaluate, tmp_path, fast_sleep):
"""When there are two raster assets with the same projection metadata, it should put
those metadata at the level of the item instead of the individual bands.
"""
Expand Down Expand Up @@ -959,7 +957,7 @@ def test_run_job_get_projection_extension_metadata_assets_with_different_epsg(


@mock.patch("openeo_driver.ProcessGraphDeserializer.evaluate")
def test_run_job_get_projection_extension_metadata_job_dir_is_relative_path(evaluate):
def test_run_job_get_projection_extension_metadata_job_dir_is_relative_path(evaluate, fast_sleep):
cube_mock = MagicMock()
# job dir should be a relative path,
# We still want the test data to be cleaned up though, so we need to use
Expand Down

0 comments on commit 4ccfa0d

Please sign in to comment.