Lazy TaskInfo #608

Merged: 3 commits, Dec 12, 2023
36 changes: 33 additions & 3 deletions mapchete/cli/progress_bar.py
@@ -2,6 +2,8 @@

from tqdm import tqdm

from mapchete.pretty import pretty_bytes, pretty_seconds
from mapchete.processing.types import TaskInfo
from mapchete.protocols import ObserverProtocol
from mapchete.types import Progress

@@ -29,13 +31,41 @@ def update(
*args,
progress: Optional[Progress] = None,
message: Optional[str] = None,
- **kwargs
task_info: Optional[TaskInfo] = None,
**kwargs,
):
if progress:
if self._pbar.total is None or self._pbar.total != progress.total:
self._pbar.reset(progress.total)

self._pbar.update(progress.current - self._pbar.n)

- if self.print_messages and message:
- tqdm.write(message)
if self.print_messages:
if task_info:
msg = f"task {task_info.id}: {task_info.process_msg}"

if task_info.profiling:
msg += profiling_info(task_info)

tqdm.write(msg)

if message:
tqdm.write(message)


def profiling_info(task_info: TaskInfo) -> str:
profiling_info = []
if task_info.profiling.get("time"):
elapsed = task_info.profiling["time"].elapsed
profiling_info.append(f"time: {pretty_seconds(elapsed)}")
if task_info.profiling.get("memory"): # pragma: no cover
max_allocated = task_info.profiling["memory"].max_allocated
profiling_info.append(f"max memory usage: {pretty_bytes(max_allocated)}")
if task_info.profiling.get("memory"): # pragma: no cover
head_requests = task_info.profiling["requests"].head_count
get_requests = task_info.profiling["requests"].get_count
requests = head_requests + get_requests
transferred = task_info.profiling["requests"].get_bytes
profiling_info.append(f"{requests} GET and HEAD requests")
profiling_info.append(f"{pretty_bytes(transferred)} transferred")
return f" ({', '.join(profiling_info)})"
30 changes: 3 additions & 27 deletions mapchete/commands/_execute.py
@@ -15,10 +15,8 @@
from mapchete.errors import JobCancelledError
from mapchete.executor import Executor
from mapchete.executor.types import Profiler
- from mapchete.pretty import pretty_bytes, pretty_seconds
from mapchete.processing.profilers import preconfigured_profilers
from mapchete.processing.profilers.time import measure_time
- from mapchete.processing.tasks import TaskInfo
from mapchete.types import MPathLike, Progress

logger = logging.getLogger(__name__)
@@ -155,6 +153,8 @@ def execute(
dask_client=dask_settings.client,
multiprocessing_start_method=multiprocessing_start_method,
max_workers=workers,
preprocessing_tasks=tasks.preprocessing_tasks_count,
tile_tasks=tasks.tile_tasks_count,
) as executor:
if profiling:
for profiler in preconfigured_profilers:
@@ -179,15 +179,9 @@ def execute(
):
count += 1

msg = f"task {task_info.id}: {task_info.process_msg}"

if task_info.profiling:
msg += profiling_info(task_info)

all_observers.notify(
progress=Progress(total=len(tasks), current=count),
task_result=task_info,
- message=msg,
task_info=task_info,
)

all_observers.notify(status=Status.done)
@@ -210,21 +204,3 @@ def execute(
except Exception as exception:
all_observers.notify(status=Status.failed, exception=exception)
raise


- def profiling_info(task_info: TaskInfo) -> str:
- profiling_info = []
- if task_info.profiling.get("time"):
- elapsed = task_info.profiling["time"].elapsed
- profiling_info.append(f"time: {pretty_seconds(elapsed)}")
- if task_info.profiling.get("memory"): # pragma: no cover
- max_allocated = task_info.profiling["memory"].max_allocated
- profiling_info.append(f"max memory usage: {pretty_bytes(max_allocated)}")
- if task_info.profiling.get("memory"): # pragma: no cover
- head_requests = task_info.profiling["requests"].head_count
- get_requests = task_info.profiling["requests"].get_count
- requests = head_requests + get_requests
- transferred = task_info.profiling["requests"].get_bytes
- profiling_info.append(f"{requests} GET and HEAD requests")
- profiling_info.append(f"{pretty_bytes(transferred)} transferred")
- return f" ({', '.join(profiling_info)})"
15 changes: 5 additions & 10 deletions mapchete/executor/dask.py
@@ -213,22 +213,17 @@ def compute_task_graph(
# send to scheduler
logger.debug("task graph has %s", pretty_bytes(getsizeof(dask_collection)))

- with Timer() as t:
- futures = self._executor.compute(
- dask_collection, optimize_graph=True, traverse=True
- )
- logger.debug("%s tasks sent to scheduler in %s", len(futures), t)
- self._submitted += len(futures)
-
- logger.debug("wait for tasks to finish...")
logger.debug(
"sending %s tasks to cluster and wait for them to finish...",
len(dask_collection),
)
for batch in as_completed(
- futures,
self._executor.compute(dask_collection, optimize_graph=True, traverse=True),
with_results=with_results,
raise_errors=raise_errors,
loop=self._executor.loop,
).batches():
for item in batch:
- self._submitted -= 1
if with_results: # pragma: no cover
future, result = item
else:
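The futures returned by compute() now go straight into as_completed() and are consumed in batches. A self-contained illustration of that dask.distributed pattern, outside mapchete's executor wrapper (the local Client, double() and the input range are purely illustrative):

from dask.distributed import Client, as_completed


def double(x):
    return x * 2


with Client(processes=False) as client:
    futures = client.map(double, range(10))
    # batches() yields lists of finished futures as they complete;
    # with_results=True makes each item a (future, result) tuple.
    for batch in as_completed(futures, with_results=True).batches():
        for future, result in batch:
            print(future.key, result)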
23 changes: 12 additions & 11 deletions mapchete/executor/future.py
@@ -1,7 +1,6 @@
from __future__ import annotations

import logging
- import os
from typing import Any, Callable, Dict, Optional, Protocol, Tuple, Union

from dask.distributed import TimeoutError
@@ -10,8 +9,7 @@

from mapchete.errors import MapcheteTaskFailed
from mapchete.executor.types import Result

- FUTURE_TIMEOUT = float(os.environ.get("MP_FUTURE_TIMEOUT", 10))
from mapchete.settings import mapchete_options

logger = logging.getLogger(__name__)

@@ -71,7 +69,7 @@ def from_future(
future: FutureProtocol,
lazy: bool = True,
result: Optional[Any] = None,
- timeout: int = FUTURE_TIMEOUT,
timeout: int = mapchete_options.future_timeout,
) -> MFuture:
# get status and name if possible
# distributed.Future
@@ -138,7 +136,7 @@ def from_func_partial(func: Callable, item: Any) -> MFuture:
return MFuture(exception=exc)
return MFuture(result=result.output, profiling=result.profiling)

- def result(self, timeout: int = FUTURE_TIMEOUT, **kwargs) -> Any:
def result(self, timeout: int = mapchete_options.future_timeout, **kwargs) -> Any:
"""Return task result."""
self._populate_from_future(timeout=timeout)

@@ -158,18 +156,21 @@ def cancelled(self) -> bool: # pragma: no cover
"""Sequential futures cannot be cancelled."""
return self._cancelled

- def _populate_from_future(self, timeout: int = FUTURE_TIMEOUT, **kwargs):
def _populate_from_future(
self, timeout: int = mapchete_options.future_timeout, **kwargs
):
"""Fill internal cache with future.result() if future was provided."""
# only check if there is a cached future but no result nor exception
if (
self._future is not None
and self._result is None
and self._exception is None
):
- try:
- self._set_result(self._future.result(timeout=timeout, **kwargs))
- except Exception as exc: # pragma: no cover
exc = self._future.exception(timeout=timeout)
if exc: # pragma: no cover
self._exception = exc
else:
self._set_result(self._future.result(timeout=timeout, **kwargs))

# delete reference to future so it can be released from the dask cluster
self._future = None
@@ -196,7 +197,7 @@ def failed_or_cancelled(self) -> bool:
return self.status in ["error", "cancelled"]
# concurrent.futures futures
else:
- return self.exception(timeout=FUTURE_TIMEOUT) is not None
return self.exception(timeout=mapchete_options.future_timeout) is not None

def raise_if_failed(self) -> None:
"""
@@ -209,7 +210,7 @@ def raise_if_failed(self) -> None:
keep_exceptions = (CancelledError, TimeoutError, CommClosedError)

if self.failed_or_cancelled():
- exception = self.exception(timeout=FUTURE_TIMEOUT)
exception = self.exception(timeout=mapchete_options.future_timeout)

# sometimes, exceptions are simply empty
if exception is None: # pragma: no cover
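The rewritten _populate_from_future() asks the future for its exception before calling result(), rather than catching whatever result() raises. The same pattern with a plain concurrent.futures future, purely for illustration:

from concurrent.futures import ThreadPoolExecutor


def failing_task():
    raise ValueError("task failed")


with ThreadPoolExecutor() as pool:
    future = pool.submit(failing_task)
    # exception() returns the raised exception (or None) instead of re-raising it
    exc = future.exception(timeout=10)
    if exc:
        print(f"stored exception: {exc!r}")
    else:
        print(f"result: {future.result(timeout=10)}")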
2 changes: 2 additions & 0 deletions mapchete/pretty.py
@@ -1,5 +1,7 @@
def pretty_bytes(count: float, round_value: int = 2) -> str:
"""Return human readable bytes."""
out = ""

for measurement in [
"bytes",
"KiB",
90 changes: 80 additions & 10 deletions mapchete/processing/types.py
@@ -4,7 +4,7 @@
from typing import Any, Optional

from mapchete.executor.future import MFuture
- from mapchete.executor.types import Result
from mapchete.settings import mapchete_options
from mapchete.tile import BufferedTile


@@ -14,6 +14,8 @@ def default_tile_task_id(tile: BufferedTile) -> str:

@dataclass
class TaskInfo:
"""Storage of metadata from finished task."""

id: str = None
processed: bool = False
process_msg: Optional[str] = None
@@ -24,16 +26,84 @@ class TaskInfo:
profiling: dict = field(default_factory=dict)

@staticmethod
- def from_future(future: MFuture) -> TaskInfo:
- result = future.result()
def from_future(future: MFuture) -> LazyTaskInfo:
return LazyTaskInfo(future)


class LazyTaskInfo(TaskInfo):
"""Lazy storage of task metadata using a future."""

_future: MFuture
_result: Any = None
_result_is_set: bool = False

def __init__(self, future: MFuture):
self._future = future

def _set_result(self):
if self._result_is_set:
return

result = self._future.result(timeout=mapchete_options.future_timeout)

if isinstance(result, TaskInfo):
task_info = result
- if future.profiling:
- task_info.profiling = future.profiling
self._id = task_info.id
self._processed = task_info.processed
self._process_msg = task_info.process_msg
self._written = task_info.written
self._write_msg = task_info.write_msg
self._output = task_info.output
self._tile = task_info.tile
self._profiling = task_info.profiling
if self._future.profiling:
self._profiling = self._future.profiling
else: # pragma: no cover
- task_info = TaskInfo(
- id=future.key if hasattr(future, "key") else future.name,
- processed=True,
- profiling=future.profiling,
self._id = (
self._future.key if hasattr(self._future, "key") else self._future.name
)
- return task_info
self._processed = True
self._profiling = self._future.profiling

self._result_is_set = True
self._future = None

@property
def id(self) -> str: # pragma: no cover
self._set_result()
return self._id

@property
def processed(self) -> bool: # pragma: no cover
self._set_result()
return self._processed

@property
def process_msg(self) -> Optional[str]: # pragma: no cover
self._set_result()
return self._process_msg

@property
def written(self) -> bool: # pragma: no cover
self._set_result()
return self._written

@property
def write_msg(self) -> Optional[str]: # pragma: no cover
self._set_result()
return self._write_msg

@property
def output(self) -> Optional[Any]: # pragma: no cover
self._set_result()
return self._output

@property
def tile(self) -> Optional[BufferedTile]: # pragma: no cover
self._set_result()
return self._tile

@property
def profiling(self) -> dict: # pragma: no cover
self._set_result()
return self._profiling
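All of the properties above funnel through _set_result(), so nothing blocks on the future until a field is first read, and the future reference is dropped afterwards so the scheduler can release it. A stripped-down, generic version of that pattern (hypothetical LazyValue class, not part of mapchete):

from concurrent.futures import Future


class LazyValue:
    """Resolve a future once, on first access, then cache the value."""

    def __init__(self, future: Future):
        self._future = future
        self._value = None
        self._resolved = False

    def _resolve(self):
        if not self._resolved:
            self._value = self._future.result(timeout=10)
            self._resolved = True
            self._future = None  # drop the reference so it can be released

    @property
    def value(self):
        self._resolve()
        return self._value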
11 changes: 11 additions & 0 deletions mapchete/settings.py
@@ -45,3 +45,14 @@ class IORetrySettings(BaseSettings):
backoff: float = 1.0
# read from environment
model_config = SettingsConfigDict(env_prefix="MAPCHETE_IO_RETRY_")


class MapcheteOptions(BaseSettings):
# timeout granted when fetching future results or exceptions
future_timeout: int = 10

# read from environment
model_config = SettingsConfigDict(env_prefix="MAPCHETE_")


mapchete_options = MapcheteOptions()
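With env_prefix="MAPCHETE_", pydantic's BaseSettings should pick the timeout up from a MAPCHETE_FUTURE_TIMEOUT environment variable when the settings object is created. A quick check of that behaviour (note that the module-level mapchete_options instance is built at import time, so it only sees variables set beforehand):

import os

os.environ["MAPCHETE_FUTURE_TIMEOUT"] = "30"

from mapchete.settings import MapcheteOptions

# a freshly constructed instance re-reads the environment
assert MapcheteOptions().future_timeout == 30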
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -6,6 +6,7 @@ morecantile>=5.0.0
pre-commit
pytest
pytest-cov
pytest-env
pytest-flask
pytest-lazy-fixture
rio-cogeo>=5.0.0
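pytest-env is presumably added so the test suite can pin environment-backed settings such as the new MAPCHETE_ options. A typical pytest.ini stanza for the plugin looks like this (illustrative variable and value, not necessarily what this PR's test setup uses):

[pytest]
env =
    MAPCHETE_FUTURE_TIMEOUT=10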
26 changes: 26 additions & 0 deletions test/test_pretty.py
@@ -0,0 +1,26 @@
import pytest

from mapchete.pretty import pretty_bytes, pretty_seconds


@pytest.mark.parametrize(
"bytes,string",
[
(2_000, "KiB"),
(2_000_000, "MiB"),
(2_000_000_000, "GiB"),
],
)
def test_pretty_bytes(bytes, string):
assert string in pretty_bytes(bytes)


@pytest.mark.parametrize(
"seconds,string",
[
(61, "m"),
(3601, "h"),
],
)
def test_pretty_seconds(seconds, string):
assert string in pretty_seconds(seconds)