Skip to content

Commit

Permalink
add memray memory tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
ungarj committed Nov 9, 2023
1 parent af68c77 commit fba1a1a
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 2 deletions.
3 changes: 2 additions & 1 deletion mapchete/processing/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from mapchete.executor.future import MFuture
from mapchete.executor.types import Profiler
from mapchete.path import batch_sort_property
from mapchete.processing.profilers import preconfigured_profilers
from mapchete.processing.tasks import TileTask, to_dask_collection
from mapchete.processing.types import PreprocessingProcessInfo, TileProcessInfo
from mapchete.tile import BufferedTile
Expand Down Expand Up @@ -61,7 +62,7 @@ def compute(

profilers = []
if profiling:
profilers = [Profiler(name="time", ctx=Timer)]
profilers = preconfigured_profilers
for profiler in profilers:
executor.add_profiler(profiler)

Expand Down
8 changes: 8 additions & 0 deletions mapchete/processing/profilers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from mapchete.executor.types import Profiler
from mapchete.processing.profilers.memory import MemoryTracker
from mapchete.processing.profilers.time import Timer

preconfigured_profilers = [
Profiler(name="time", ctx=Timer),
Profiler(name="memory", ctx=MemoryTracker),
]
82 changes: 82 additions & 0 deletions mapchete/processing/profilers/memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import logging
import os
import uuid
from contextlib import ExitStack
from tempfile import TemporaryDirectory
from typing import Optional

from mapchete.io import copy
from mapchete.path import MPath
from mapchete.types import MPathLike

logger = logging.getLogger(__name__)


class MemoryTracker:
"""Tracks memory usage inside context."""

max_allocated: int = 0
total_allocated: int = 0
allocations: int = 0
output_file: Optional[MPath] = None

def __init__(
self,
output_file: Optional[MPathLike] = None,
raise_exc_multiple_trackers: bool = True,
):
try:
import memray
except ImportError: # pragma: no cover
raise ImportError("please install memray if you want to use this feature.")
self.output_file = MPath.from_inp(output_file) if output_file else None
self._exit_stack = ExitStack()
self._temp_dir = self._exit_stack.enter_context(TemporaryDirectory())
self._temp_file = str(
MPath(self._temp_dir) / f"{os. getpid()}-{uuid.uuid4().hex}.bin"
)
try:
self._memray_tracker = self._exit_stack.enter_context(
memray.Tracker(self._temp_file, follow_fork=True)
)
except RuntimeError as exc:
if raise_exc_multiple_trackers:
raise
self._memray_tracker = None
logger.exception(exc)

def __str__(self):
max_allocated = f"{self.max_allocated / 1024 / 1024:.2f}MB"
total_allocated = f"{self.total_allocated / 1024 / 1024:.2f}MB"
return f"<MemoryTracker max_allocated={max_allocated}, total_allocated={total_allocated}, allocations={self.allocations}>"

def __repr__(self):
return repr(str(self))

def __enter__(self):
return self

def __exit__(self, *args):
try:
try:
from memray import FileReader
except ImportError: # pragma: no cover
raise ImportError(
"please install memray if you want to use this feature."
)
# close memray.Tracker before attempting to read file
if self._memray_tracker:
self._memray_tracker.__exit__(*args)
reader = FileReader(self._temp_file)
allocations = list(
reader.get_high_watermark_allocation_records(merge_threads=True)
)
self.max_allocated = max(record.size for record in allocations)
self.total_allocated = sum(record.size for record in allocations)
self.allocations = len(allocations)
if self.output_file:
copy(self._temp_file, self.output_file, overwrite=True)
finally:
self._exit_stack.__exit__(*args)
# we need to set this to None, so MemoryTracker can be serialized
self._memray_tracker = None
Empty file.
1 change: 1 addition & 0 deletions mapchete/processing/profilers/time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from mapchete.timer import Timer
10 changes: 9 additions & 1 deletion test/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def test_execute_preprocessing_tasks(concurrency, preprocess_cache_raster_vector
@pytest.mark.parametrize(
"concurrency,dask_compute_graph",
[
("threads", False),
# ("threads", False),
("dask", False),
("dask", True),
("processes", False),
Expand All @@ -192,6 +192,14 @@ def test_execute_profiling(cleantopo_br_metatiling_1, concurrency, dask_compute_
):
assert isinstance(task_result, TaskResult)
assert task_result.profiling
for profiler in ["time", "memory"]:
assert profiler in task_result.profiling

assert task_result.profiling["time"]._elapsed > 0

assert task_result.profiling["memory"].max_allocated > 0
assert task_result.profiling["memory"].total_allocated > 0
assert task_result.profiling["memory"].allocations > 0


def test_convert_geodetic(cleantopo_br_tif, mp_tmpdir):
Expand Down

0 comments on commit fba1a1a

Please sign in to comment.