Skip to content

Commit

Permalink
Merge pull request #54 from LSSTDESC/monitoring
Browse files Browse the repository at this point in the history
Monitoring
  • Loading branch information
joezuntz authored Jul 19, 2021
2 parents c6a1f56 + de23661 commit 836a16f
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ jobs:
ceci tests/test.yml
ceci --dry-run tests/test.yml
pytest --cov=ceci
# add a test with the memory monitor and profiling switched on
python3 -m ceci_example PZEstimationPipe --DM=./tests/inputs/dm.txt --fiducial_cosmology=./tests/inputs/fiducial_cosmology.txt --config=./tests/config.yml --photoz_pdfs=./tests/outputs/photoz_pdfs.txt --memmon=1 --cprofile=profile.stats
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
Expand Down
101 changes: 101 additions & 0 deletions ceci/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import time
import psutil
import threading
import datetime


class MemoryMonitor:
"""
A monitor which reports on memory usage by this process throughout the lifetime of
a process.
The monitor is designed to be run in a thread, which is done automatically in the
start_in_thread method, and will then continue until either the main thread ends
or the stop method is called from another thread.
To print out different process information you could use subclass and override the
log method.
"""

def __init__(self, interval=30):
"""Create a memory monitor.
Parameters
----------
interval: float or int
The interval in seconds between each report.
Default is 30 seconds
"""
self.should_continue = True
self.interval = interval
self.process = psutil.Process()

@classmethod
def start_in_thread(cls, *args, **kwargs):
"""Create a new thread and run the memory monitor in it
For parameters, see the init method; all arguments sent to this method are
passed directly to it.
Returns
-------
monitor: MemoryMonitor
The monitor, already running in its own thread
"""
monitor = cls(*args, **kwargs)
thread = threading.Thread(target=monitor._run)
thread.start()
return monitor

def stop(self):
"""Stop the monitor.
The monitor will complete its current sleep interval and then end.
Parameters
----------
None
Returns
-------
None
"""
self.should_continue = False

@staticmethod
def log(p):
"""Print memory usage information to screen
By default this method prints the p
Parameters
----------
p: Process
A psutil process
"""
mem = p.memory_info()
# report time since start of process
dt = datetime.timedelta(seconds=time.time() - p.create_time())

# Various memory
rss = mem.rss / 1e9
vms = mem.vms / 1e9
avail = psutil.virtual_memory().available / 1e9

# For now I don't use the python logging mechanism, but
# at some point should probably switch to that.
print(
f"MemoryMonitor Time: {dt} Physical mem: {rss:.3f} GB "
f"Virtual mem: {vms:.3f} GB "
f"Available mem: {avail:.1f} GB"
)

def _run(self):
# there are two ways to stop the monitor - it is automatically
# ended if the main thread completes. And it can be stopped
# manually using the stop method. Check for both these.
while threading.main_thread().is_alive():
if not self.should_continue:
break
self.log(self.process)
time.sleep(self.interval)
16 changes: 16 additions & 0 deletions ceci/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import cProfile

from .errors import *
from .monitor import MemoryMonitor

SERIAL = "serial"
MPI_PARALLEL = "mpi"
Expand Down Expand Up @@ -356,6 +357,12 @@ def _parse_command_line(cls):
type=str,
help="Profile the stage using the python cProfile tool",
)
parser.add_argument(
"--memmon",
type=int,
default=0,
help="Report memory use. Argument gives interval in seconds between reports",
)
args = parser.parse_args()
return args

Expand Down Expand Up @@ -383,6 +390,9 @@ def execute(cls, args):
profile = cProfile.Profile()
profile.enable()

if args.memmon:
monitor = MemoryMonitor(interval=args.memmon)

try:
stage.run()
except Exception as error:
Expand All @@ -394,6 +404,12 @@ def execute(cls, args):
pdb.post_mortem()
else:
raise
finally:
if args.memmon:
monitor.stop()

# The default finalization renames any output files to their
# final location, but subclasses can override to do other things too
try:
stage.finalize()
except Exception as error:
Expand Down
21 changes: 21 additions & 0 deletions tests/test_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import time
from ceci.monitor import MemoryMonitor
# should already be installed on CI as we need HDF5
import numpy as np


def test_monitor_report(capsys):
monitor = MemoryMonitor.start_in_thread(interval=1)
time.sleep(2)
x = np.random.uniform(size=10_000_000)
time.sleep(2)
monitor.stop()
captured = capsys.readouterr()
lines = [line for line in captured.out.split("\n") if line.startswith("MemoryMonitor Time")]
assert len(lines) >= 3

# check that memory usage is large enough
gb1 = float(lines[0].split()[5])
gb2 = float(lines[-1].split()[5])
# avoid a rounding error by removing an extra 1%
assert gb2 - gb1 >= x.nbytes / 1.01e9

0 comments on commit 836a16f

Please sign in to comment.