Merge pull request #945 from Open-EO/log_lock
Log lock
EmileSonneveld authored Nov 25, 2024
2 parents 88ab283 + 4f137a3 commit e7c85eb
Showing 2 changed files with 103 additions and 13 deletions.
52 changes: 39 additions & 13 deletions openeogeotrellis/integrations/gdal.py
@@ -105,6 +105,33 @@ def to_dict(self) -> dict:
        return result


+def exec_parallel_with_fallback(callback, argument_tuples):
+    def error_handler(e):
+        poorly_log(f"Error while calling '{callback.__name__}', may be incomplete. {str(e)}", level=logging.WARNING)
+
+    pool_size = min(10, max(1, int(len(argument_tuples) // 3)))
+
+    if pool_size == 1:
+        # No need for error-prone multiprocessing here (typical for NetCDF output).
+        results = [callback(*arg_tuple) for arg_tuple in argument_tuples]
+    else:
+        pool = multiprocessing.Pool(pool_size)
+        jobs = [pool.apply_async(callback, arg_tuple, error_callback=error_handler) for arg_tuple in argument_tuples]
+        pool.close()
+        try:
+            results = [job.get(timeout=60) for job in jobs]
+            pool.join()
+        except multiprocessing.TimeoutError:
+            pool.terminate()
+            pool.join()
+            poorly_log(
+                "Multiprocessing timed out, possibly due to a deadlock. Retrying without multiprocessing.",
+                level=logging.WARNING,
+            )
+            results = [callback(*arg_tuple) for arg_tuple in argument_tuples]
+    return results
+
+
def _extract_gdal_asset_raster_metadata(
    asset_metadata: Dict[str, Any],
    job_dir: Path,
@@ -124,17 +151,17 @@ def _extract_gdal_asset_raster_metadata(
    # TODO would be better if we could return just Dict[str, AssetRasterMetadata]
    # or even CollectionRasterMetadata with CollectionRasterMetadata = Dict[str, AssetRasterMetadata]

-
-    def error_handler(e):
-        poorly_log(f"Error while looking up result metadata, may be incomplete. {str(e)}", level=logging.WARNING)
-
-    pool_size = min(10, max(1, int(len(asset_metadata) // 3)))
-
-    pool = multiprocessing.Pool(pool_size)
-    job = [pool.apply_async(_get_metadata_callback, (asset_path, asset_md, job_dir,), error_callback=error_handler) for asset_path, asset_md in asset_metadata.items()]
-    pool.close()
-    pool.join()
-
+    # Ideally gdalinfo would be called the moment the asset is created.
+    # Then it could profit from Spark's parallel processing.
+    argument_tuples = [
+        (
+            asset_path,
+            asset_md,
+            job_dir,
+        )
+        for asset_path, asset_md in asset_metadata.items()
+    ]
+    results = exec_parallel_with_fallback(_get_metadata_callback, argument_tuples)

    # Add the projection extension metadata.
    # When the projection metadata is the same for all assets, then set it at
@@ -147,9 +174,8 @@ def error_handler(e):
    # metadata at the item level.
    is_some_raster_md_missing = False

-    for j in job:
+    for result in results:
        try:
-            result = j.get()
            if result is not None:
                raster_metadata[result[0]] = result[1]
            else:
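For context on the pattern introduced above: apply_async hands each job to a worker process, job.get(timeout=60) bounds each wait, and on multiprocessing.TimeoutError the pool is terminated and the work is redone sequentially in the parent process. Below is a minimal, self-contained sketch of that fallback pattern; it is not part of this commit, and run_with_fallback and _square are illustrative names.

import logging
import multiprocessing


def _square(x):
    # Worker functions must live at module level so they can be pickled.
    return x * x


def run_with_fallback(callback, argument_tuples, timeout=60):
    pool = multiprocessing.Pool(2)
    jobs = [pool.apply_async(callback, args) for args in argument_tuples]
    pool.close()
    try:
        results = [job.get(timeout=timeout) for job in jobs]
        pool.join()
    except multiprocessing.TimeoutError:
        # A forked worker may have inherited a lock in a held state and
        # deadlocked, so stop waiting and redo the work in this process.
        pool.terminate()
        pool.join()
        logging.warning("Pool timed out; retrying sequentially.")
        results = [callback(*args) for args in argument_tuples]
    return results


if __name__ == "__main__":
    print(run_with_fallback(_square, [(2,), (3,), (4,)]))  # [4, 9, 16]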
64 changes: 64 additions & 0 deletions tests/deploy/test_batch_job.py
@@ -23,6 +23,7 @@
from shapely.geometry import box, mapping, shape

from openeogeotrellis._version import __version__
+from openeogeotrellis.backend import JOB_METADATA_FILENAME
from openeogeotrellis.config import get_backend_config
from openeogeotrellis.config.constants import UDF_DEPENDENCIES_INSTALL_MODE
from openeogeotrellis.deploy.batch_job import (
@@ -329,6 +330,69 @@ def test_extract_result_metadata_aggregate_spatial_delayed_vector_when_bbox_crs_
    }


+def start_log_locker():
+    from threading import Thread
+    from queue import Queue
+    from logging.handlers import QueueListener, QueueHandler
+
+    # Logs get written to a queue, and then a thread reads
+    # from that queue and writes messages to a file:
+    _log_queue = Queue()
+    QueueListener(_log_queue, logging.FileHandler("out.log")).start()
+    logging.getLogger().addHandler(QueueHandler(_log_queue))
+
+    is_active = True
+
+    def stop_log_locker():
+        nonlocal is_active
+        is_active = False
+
+    def write_logs():
+        while is_active:
+            logging.warning("attempting to create deadlock")
+
+    Thread(target=write_logs).start()
+    return stop_log_locker
+
+
+@pytest.mark.timeout(130)
+def test_log_lock(tmp_path):
+    process_graph = {
+        "loadcollection1": {
+            "process_id": "load_collection",
+            "arguments": {
+                "id": "TestCollection-LonLat4x4",
+                "temporal_extent": ["2023-06-01", "2023-08-01"],
+                "spatial_extent": {"west": 0.0, "south": 0.0, "east": 1.0, "north": 2.0},
+                "bands": ["Longitude", "Latitude"],
+            },
+        },
+        "saveresult1": {
+            "process_id": "save_result",
+            "arguments": {
+                "data": {"from_node": "loadcollection1"},
+                "format": "GTiff",
+            },
+            "result": True,
+        },
+    }
+    process = {
+        "process_graph": process_graph,
+    }
+    stop_log_locker = start_log_locker()
+    try:
+        run_job(
+            process,
+            output_file=tmp_path / "out",
+            metadata_file=tmp_path / JOB_METADATA_FILENAME,
+            api_version="2.0.0",
+            job_dir=tmp_path,
+            dependencies=[],
+        )
+    finally:
+        stop_log_locker()
+
+
@mock.patch('openeo_driver.ProcessGraphDeserializer.evaluate')
def test_run_job(evaluate, tmp_path, fast_sleep):
    cube_mock = MagicMock()
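Why this test can deadlock at all: run_job forks worker processes for GDAL metadata extraction (via the multiprocessing.Pool in exec_parallel_with_fallback above) while the log-locker thread is constantly pushing records through the QueueHandler. A child forked at the moment the queue's internal lock is held inherits that lock in a locked state, and its next log call blocks forever; the 60-second job.get timeout is what lets the batch job recover. Below is a standalone sketch of that failure mode, illustrative only and not part of this commit; it uses the fork start method, so it applies to POSIX systems.

import logging
import multiprocessing
import threading
import time
from logging.handlers import QueueHandler
from queue import Queue


def child():
    # If the fork happened while the hammer thread held the queue's
    # internal lock, this call blocks forever in the child.
    logging.warning("hello from the child")


def hammer():
    # Keep the QueueHandler's queue lock busy from another thread.
    while True:
        logging.warning("keeping the queue lock busy")


if __name__ == "__main__":
    logging.getLogger().addHandler(QueueHandler(Queue()))
    threading.Thread(target=hammer, daemon=True).start()
    time.sleep(0.1)
    proc = multiprocessing.get_context("fork").Process(target=child)
    proc.start()
    proc.join(timeout=5)
    print("child deadlocked" if proc.is_alive() else "no deadlock on this run")
    proc.terminate()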
