From 92881bc50e980785e392a05318951a84accb27d7 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Mon, 10 May 2021 16:17:40 +0100 Subject: [PATCH 1/4] add memory monitor --- ceci/monitor.py | 101 ++++++++++++++++++++++++++++++++++++++++++++++++ ceci/stage.py | 16 ++++++++ 2 files changed, 117 insertions(+) create mode 100644 ceci/monitor.py diff --git a/ceci/monitor.py b/ceci/monitor.py new file mode 100644 index 0000000..3318441 --- /dev/null +++ b/ceci/monitor.py @@ -0,0 +1,101 @@ +import time +import psutil +import threading +import datetime + + +class MemoryMonitor: + """ + A monitor which reports on memory usage by this process throughout the lifetime of + the process. + + The monitor is designed to be run in a thread, which is done automatically in the + start_in_thread method, and will then continue until either the main thread ends + or the stop method is called from another thread. + + To print out different process information you could subclass it and override the + log method. + """ + + def __init__(self, interval=30): + """Create a memory monitor. + + Parameters + ---------- + interval: float or int + The interval in seconds between each report. + Default is 30 seconds + """ + self.should_continue = True + self.interval = interval + self.process = psutil.Process() + + @classmethod + def start_in_thread(cls, *args, **kwargs): + """Create a new thread and run the memory monitor in it + + For parameters, see the init method; all arguments sent to this method are + passed directly to it. + + Returns + ------- + monitor: MemoryMonitor + The monitor, already running in its own thread + """ + monitor = cls(*args, **kwargs) + thread = threading.Thread(target=monitor.run) + thread.start() + return monitor + + def stop(self): + """Stop the monitor. + + The monitor will complete its current sleep interval and then end. 
+ + Parameters + ---------- + None + + Returns + ------- + None + """ + self.should_continue = False + + @staticmethod + def log(p): + """Print memory usage information to screen + + By default this method prints the process run time and memory usage + + Parameters + ---------- + p: Process + A psutil process + """ + mem = p.memory_info() + # report time since start of process + dt = datetime.timedelta(seconds=time.time() - p.create_time()) + + # Various memory + rss = mem.rss / 1e9 + vms = mem.vms / 1e9 + avail = psutil.virtual_memory().available / 1e9 + + # For now I don't use the python logging mechanism, but + # at some point should probably switch to that. + print( + f"MemoryMonitor Time: {dt} Physical mem: {rss:.3f} GB " + f"Virtual mem: {vms:.3f} GB " + f"Available mem: {avail:.1f} GB" + ) + + def _run(self): + # there are two ways to stop the monitor - it is automatically + # ended if the main thread completes. And it can be stopped + # manually using the stop method. Check for both these. + while threading.main_thread().is_alive(): + if not self.should_continue: + break + self.log(p) + time.sleep(self.interval) diff --git a/ceci/stage.py b/ceci/stage.py index 0d5248d..29b81b7 100644 --- a/ceci/stage.py +++ b/ceci/stage.py @@ -5,6 +5,7 @@ import cProfile from .errors import * +from .monitor import MemoryMonitor SERIAL = "serial" MPI_PARALLEL = "mpi" @@ -356,6 +357,12 @@ def _parse_command_line(cls): type=str, help="Profile the stage using the python cProfile tool", ) + parser.add_argument( + "--memmon", + type=int, + default=0, + help="Report memory use. 
Argument gives interval in seconds between reports", + ) args = parser.parse_args() return args @@ -383,6 +390,9 @@ def execute(cls, args): profile = cProfile.Profile() profile.enable() + if args.memmon: + monitor = MemoryMonitor.start_in_thread(interval=args.memmon) + try: stage.run() except Exception as error: @@ -394,6 +404,12 @@ def execute(cls, args): pdb.post_mortem() else: raise + finally: + if args.memmon: + monitor.stop() + + # The default finalization renames any output files to their + # final location, but subclasses can override to do other things too try: stage.finalize() except Exception as error: From 34c1b85bb164cbf87c8a1a839bf7766f6ae05d8d Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Mon, 10 May 2021 16:32:40 +0100 Subject: [PATCH 2/4] add test and make it work --- ceci/monitor.py | 4 ++-- tests/test_monitor.py | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) create mode 100644 tests/test_monitor.py diff --git a/ceci/monitor.py b/ceci/monitor.py index 3318441..e0d0048 100644 --- a/ceci/monitor.py +++ b/ceci/monitor.py @@ -43,7 +43,7 @@ def start_in_thread(cls, *args, **kwargs): The monitor, already running in its own thread """ monitor = cls(*args, **kwargs) - thread = threading.Thread(target=monitor.run) + thread = threading.Thread(target=monitor._run) thread.start() return monitor @@ -97,5 +97,5 @@ def _run(self): while threading.main_thread().is_alive(): if not self.should_continue: break - self.log(p) + self.log(self.process) time.sleep(self.interval) diff --git a/tests/test_monitor.py b/tests/test_monitor.py new file mode 100644 index 0000000..148832e --- /dev/null +++ b/tests/test_monitor.py @@ -0,0 +1,20 @@ +import time +from ceci.monitor import MemoryMonitor +# should already be installed on CI as we need HDF5 +import numpy as np + + +def test_monitor_report(capsys): + monitor = MemoryMonitor.start_in_thread(interval=1) + time.sleep(2) + x = np.random.uniform(size=10_000_000) + time.sleep(2) + monitor.stop() + captured = 
capsys.readouterr() + lines = [line for line in captured.out.split("\n") if line.startswith("MemoryMonitor Time")] + assert len(lines) >= 3 + + # check that + gb1 = float(lines[0].split()[5]) + gb2 = float(lines[-1].split()[5]) + assert gb2 - gb1 >= x.nbytes / 1e9 / 1.01 # avoid edge case \ No newline at end of file From 6217fd0f9adc487df697cec8615aa604c792f63a Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Mon, 10 May 2021 16:34:08 +0100 Subject: [PATCH 3/4] improve comments --- tests/test_monitor.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 148832e..06be7b6 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -14,7 +14,8 @@ def test_monitor_report(capsys): lines = [line for line in captured.out.split("\n") if line.startswith("MemoryMonitor Time")] assert len(lines) >= 3 - # check that + # check that memory usage is large enough gb1 = float(lines[0].split()[5]) gb2 = float(lines[-1].split()[5]) - assert gb2 - gb1 >= x.nbytes / 1e9 / 1.01 # avoid edge case \ No newline at end of file + # avoid a rounding error by removing an extra 1% + assert gb2 - gb1 >= x.nbytes / 1.01e9 \ No newline at end of file From de236617d580b9942c5735daf91204c5e7775e40 Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Mon, 10 May 2021 16:36:25 +0100 Subject: [PATCH 4/4] add command line test --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 77a6f13..4bad725 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,6 +38,9 @@ jobs: ceci tests/test.yml ceci --dry-run tests/test.yml pytest --cov=ceci + # add a test with the memory monitor and profiling switched on + python3 -m ceci_example PZEstimationPipe --DM=./tests/inputs/dm.txt --fiducial_cosmology=./tests/inputs/fiducial_cosmology.txt --config=./tests/config.yml --photoz_pdfs=./tests/outputs/photoz_pdfs.txt --memmon=1 
--cprofile=profile.stats + - name: Upload coverage to Codecov uses: codecov/codecov-action@v1