Skip to content

Commit

Permalink
fix ctx remainders; add tests for request profiler
Browse files Browse the repository at this point in the history
  • Loading branch information
ungarj committed Nov 9, 2023
1 parent 519d63b commit f128850
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 14 deletions.
7 changes: 4 additions & 3 deletions mapchete/executor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from abc import ABC, abstractmethod
from collections import OrderedDict
from concurrent.futures._base import CancelledError
from contextlib import AbstractContextManager
from functools import cached_property, partial
from typing import Any, Callable, Iterator, List, Optional

Expand Down Expand Up @@ -55,7 +54,7 @@ def _wait(self, *args, **kwargs) -> None:
def add_profiler(
self,
name: Optional[str] = None,
ctx: Optional[AbstractContextManager] = None,
decorator: Optional[Callable] = None,
args: Optional[tuple] = None,
kwargs: Optional[dict] = None,
profiler: Optional[Profiler] = None,
Expand All @@ -66,7 +65,9 @@ def add_profiler(
self.profilers.append(name)
else:
self.profilers.append(
Profiler(name=name, ctx=ctx, args=args or (), kwargs=kwargs or {})
Profiler(
name=name, decorator=decorator, args=args or (), kwargs=kwargs or {}
)
)

def _ready(self) -> List[MFuture]:
Expand Down
2 changes: 1 addition & 1 deletion test/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def test_execute_profiling(cleantopo_br_metatiling_1, concurrency, dask_compute_
for profiler in ["time", "memory"]:
assert profiler in task_result.profiling

assert task_result.profiling["time"]._elapsed > 0
assert task_result.profiling["time"].elapsed > 0

assert task_result.profiling["memory"].max_allocated > 0
assert task_result.profiling["memory"].total_allocated > 0
Expand Down
55 changes: 45 additions & 10 deletions test/test_executor.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import time
from concurrent.futures._base import CancelledError

import numpy.ma as ma
import pytest

import mapchete
from mapchete import Timer
from mapchete.errors import MapcheteTaskFailed
from mapchete.executor import MFuture
from mapchete.executor.base import Profiler, Result, run_func_with_profilers
from mapchete.io.raster import read_raster_no_crs
from mapchete.processing.profilers import measure_memory, measure_requests, measure_time


Expand Down Expand Up @@ -191,25 +192,59 @@ def raise_cancellederror(*args, **kwargs):
list(dask_executor.as_completed(raise_cancellederror, range(items)))


def test_profile_wrapper():
elapsed_time = 0.2
@pytest.mark.parametrize(
"path",
[
pytest.lazy_fixture("raster_4band"),
],
)
def test_profile_wrapper(path):
result = run_func_with_profilers(
_dummy_process,
1,
fkwargs=dict(sleep=elapsed_time),
read_raster_no_crs,
path,
profilers=[
Profiler(name="time", decorator=measure_time),
Profiler(name="memory", decorator=measure_memory),
],
)
assert isinstance(result, Result)
assert isinstance(result.output, ma.MaskedArray)
assert isinstance(result.profiling, dict)
assert len(result.profiling) == 3
assert result.profiling["time"].elapsed > 0
assert result.profiling["memory"].max_allocated > 0
assert result.profiling["memory"].total_allocated > 0


@pytest.mark.integration
@pytest.mark.parametrize(
"path",
[
pytest.lazy_fixture("raster_4band_s3"),
pytest.lazy_fixture("raster_4band_aws_s3"),
pytest.lazy_fixture("raster_4band_http"),
pytest.lazy_fixture("raster_4band_secure_http"),
],
)
def test_profile_wrapper_requests(path):
result = run_func_with_profilers(
read_raster_no_crs,
path,
profilers=[
Profiler(name="time", decorator=measure_time),
Profiler(name="requests", decorator=measure_requests),
Profiler(name="memory", decorator=measure_memory),
],
)
assert isinstance(result, Result)
assert result.output == 2
assert isinstance(result.output, ma.MaskedArray)
assert isinstance(result.profiling, dict)
assert len(result.profiling) == 3
assert result.profiling["time"].elapsed > elapsed_time
assert result.profiling["time"].elapsed > 0
assert result.profiling["memory"].max_allocated > 0
assert result.profiling["requests"].head_count == 0
assert result.profiling["memory"].total_allocated > 0
assert result.profiling["requests"].get_count > 0
assert result.profiling["requests"].get_bytes > 0


@pytest.mark.parametrize(
Expand All @@ -220,7 +255,7 @@ def test_profiling(executor_fixture, request):
executor = request.getfixturevalue(executor_fixture)

# add profiler
executor.add_profiler("time", Timer)
executor.add_profiler("time", measure_time)

items = list(range(10))
for future in executor.as_completed(_dummy_process, items):
Expand Down

0 comments on commit f128850

Please sign in to comment.