Skip to content

Commit

Permalink
phlop runtime process monitoring (#881)
Browse files Browse the repository at this point in the history
  • Loading branch information
PhilipDeegan authored Oct 16, 2024
1 parent bf121b3 commit 2fa9a75
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 26 deletions.
5 changes: 3 additions & 2 deletions pyphare/pyphare/cpp/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,12 @@ def log_runtime_config():
git_hash=get_git_hash(),
)

rank_info_dir = DOT_PHARE_DIR / "rank_info"
if cpp_lib.mpi_rank() == 0:
DOT_PHARE_DIR.mkdir(exist_ok=True, parents=True)
rank_info_dir.mkdir(exist_ok=True, parents=True)
cpp_lib.mpi_barrier()

rank_dir = DOT_PHARE_DIR / f"rank_{cpp_lib.mpi_rank()}"
rank_dir = rank_info_dir / f"{cpp_lib.mpi_rank()}"
rank_dir.mkdir(exist_ok=True)

with open(rank_dir / "runtime_config.json", "w") as f:
Expand Down
48 changes: 48 additions & 0 deletions pyphare/pyphare/simulator/monitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from pathlib import Path


def have_phlop():
from importlib.util import find_spec

try:
return find_spec("phlop.dict") is not None
except (ImportError, ModuleNotFoundError):
return False


def valdict(**kwargs):
if not have_phlop():
return dict

from phlop.dict import ValDict # pylint: disable=import-error

return ValDict(**kwargs)


_globals = valdict(stats_man=None)


def monitoring_yaml_file(cpplib):
path = Path(".phare") / "stats" / f"rank.{cpplib.mpi_rank()}.yaml"
path.parent.mkdir(exist_ok=True, parents=True)
return path


def setup_monitoring(cpplib, interval=10):
if not have_phlop():
return

from phlop.app import stats_man as sm # pylint: disable=import-error

_globals.stats_man = sm.AttachableRuntimeStatsManager(
valdict(yaml=monitoring_yaml_file(cpplib), interval=interval),
dict(rank=cpplib.mpi_rank()),
).start()


def monitoring_shutdown(cpplib):
if not have_phlop():
return

if _globals.stats_man:
_globals.stats_man.kill().join()
23 changes: 18 additions & 5 deletions pyphare/pyphare/simulator/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
#
#

import os
import datetime
import atexit
import time as timem
import numpy as np
import pyphare.pharein as ph
from pathlib import Path
from . import monitoring as mon


life_cycles = {}
SIM_MONITOR = os.getenv("PHARE_SIM_MON", "False").lower() in ("true", "1", "t")
SCOPE_TIMING = os.getenv("PHARE_SCOPE_TIMING", "False").lower() in ("true", "1", "t")


@atexit.register
Expand All @@ -24,6 +29,9 @@ def simulator_shutdown():
def make_cpp_simulator(dim, interp, nbrRefinedPart, hier):
from pyphare.cpp import cpp_lib

if SCOPE_TIMING:
Path(".phare/timings").mkdir(exist_ok=True)

make_sim = f"make_simulator_{dim}_{interp}_{nbrRefinedPart}"
return getattr(cpp_lib(), make_sim)(hier)

Expand Down Expand Up @@ -127,6 +135,7 @@ def initialize(self):

self.cpp_sim.initialize()
self._auto_dump() # first dump might be before first advance

return self
except:
import sys
Expand All @@ -140,7 +149,6 @@ def initialize(self):

def _throw(self, e):
import sys
from pyphare.cpp import cpp_lib

print_rank0(e)
sys.exit(1)
Expand Down Expand Up @@ -170,12 +178,19 @@ def times(self):
self.timeStep(),
)

def run(self, plot_times=False):
def run(self, plot_times=False, monitoring=None):
"""monitoring requires phlop"""
from pyphare.cpp import cpp_lib

self._check_init()

if monitoring is None: # check env
monitoring = SIM_MONITOR

if self.simulation.dry_run:
return self
if monitoring:
mon.setup_monitoring(cpp_lib())
perf = []
end_time = self.cpp_sim.endTime()
t = self.cpp_sim.currentTime()
Expand All @@ -197,6 +212,7 @@ def run(self, plot_times=False):
if plot_times:
plot_timestep_time(perf)

mon.monitoring_shutdown(cpp_lib())
return self.reset()

def _auto_dump(self):
Expand Down Expand Up @@ -263,13 +279,10 @@ def _log_to_file(self):
DATETIME_FILES - logfile with starting datetime timestamp per rank
NONE - no logging files, display to cout
"""
import os

if "PHARE_LOG" not in os.environ:
os.environ["PHARE_LOG"] = "RANK_FILES"
from pyphare.cpp import cpp_lib

if os.environ["PHARE_LOG"] != "NONE" and cpp_lib().mpi_rank() == 0:
from pathlib import Path

Path(".log").mkdir(exist_ok=True)
4 changes: 4 additions & 0 deletions src/amr/solvers/solver_ppc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ void SolverPPC<HybridModel, AMR_Types>::fillMessengerInfo(
template<typename HybridModel, typename AMR_Types>
void SolverPPC<HybridModel, AMR_Types>::saveState_(level_t& level, ModelViews_t& views)
{
PHARE_LOG_SCOPE(1, "SolverPPC::saveState_");

for (auto& state : views)
{
std::stringstream ss;
Expand All @@ -232,6 +234,8 @@ void SolverPPC<HybridModel, AMR_Types>::saveState_(level_t& level, ModelViews_t&
template<typename HybridModel, typename AMR_Types>
void SolverPPC<HybridModel, AMR_Types>::restoreState_(level_t& level, ModelViews_t& views)
{
PHARE_LOG_SCOPE(1, "SolverPPC::restoreState_");

for (auto& state : views)
{
std::stringstream ss;
Expand Down
2 changes: 1 addition & 1 deletion src/hdf5/detail/h5/h5_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class HighFiveFile
fapl.add(HighFive::MPIOFileAccess{MPI_COMM_WORLD, MPI_INFO_NULL});
#else
std::cout << "WARNING: PARALLEL HDF5 not available" << std::endl;
if (core::mpi_size() > 1)
if (core::mpi::size() > 1)
{
throw std::runtime_error("HDF5 NOT PARALLEL!");
}
Expand Down
2 changes: 1 addition & 1 deletion src/phare/phare.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class SamraiLifeCycle
PHARE_WITH_PHLOP( //
if (auto e = core::get_env("PHARE_SCOPE_TIMING", "false"); e == "1" || e == "true")
phlop::ScopeTimerMan::INSTANCE()
.file_name(".phare_times." + std::to_string(core::mpi::rank()) + ".txt")
.file_name(".phare/timings/rank." + std::to_string(core::mpi::rank()) + ".txt")
.init(); //
)
}
Expand Down
4 changes: 2 additions & 2 deletions src/simulator/simulator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ std::string Simulator<_dimension, _interp_order, _nbRefinedPart>::to_str()
template<std::size_t _dimension, std::size_t _interp_order, std::size_t _nbRefinedPart>
void Simulator<_dimension, _interp_order, _nbRefinedPart>::initialize()
{
PHARE_LOG_SCOPE(1, "Simulator::initialize");

try
{
if (isInitialized)
Expand Down Expand Up @@ -414,8 +416,6 @@ double Simulator<_dimension, _interp_order, _nbRefinedPart>::advance(double dt)

try
{
PHARE_LOG_SCOPE(1, "Simulator::advance");

dt_new = integrator_->advance(dt);
currentTime_ = startTime_ + ((*timeStamper) += dt);
}
Expand Down
8 changes: 5 additions & 3 deletions tests/functional/harris/harris_2d_lb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

mpl.use("Agg")

SCOPE_TIMING = os.getenv("PHARE_SCOPE_TIMING", "True").lower() in ("true", "1", "t")
LOAD_BALANCE = os.getenv("LOAD_BALANCE", "True").lower() in ("true", "1", "t")

cpp = cpp_lib()
Expand Down Expand Up @@ -171,12 +172,12 @@ def plot(diag_dir):
for c in ["x", "y", "z"]:
run.GetB(time).plot(
filename=plot_file_for_qty(f"b{c}", time),
qty=f"B{c}",
qty=f"{c}",
plot_patches=True,
)
run.GetJ(time).plot(
filename=plot_file_for_qty("jz", time),
qty="Jz",
qty="z",
plot_patches=True,
vmin=-2,
vmax=2,
Expand All @@ -200,7 +201,8 @@ def test_run(self):
Simulator(config()).run().reset()
if cpp.mpi_rank() == 0:
plot(diag_dir)
m_plotting.plot_run_timer_data(diag_dir, cpp.mpi_rank())
if SCOPE_TIMING:
m_plotting.plot_run_timer_data(diag_dir, cpp.mpi_rank())
cpp.mpi_barrier()
return self

Expand Down
67 changes: 56 additions & 11 deletions tools/python3/phloping.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@
# parsing PHARE scope funtion timers
#

import sys
import argparse
import numpy as np
from dataclasses import dataclass, field

from pyphare.pharesee.run import Run
from pyphare.pharesee.hierarchy import hierarchy_from

from phlop.timing.scope_timer import ScopeTimerFile as phScopeTimerFile
from phlop.timing.scope_timer import file_parser as phfile_parser

from phlop.timing import scope_timer as st

substeps_per_finer_level = 4


@dataclass
class ScopeTimerFile(phScopeTimerFile):
class ScopeTimerFile(st.ScopeTimerFile):
run: Run
rank: str
advances: list = field(default_factory=lambda: [])
Expand Down Expand Up @@ -124,20 +122,67 @@ def normalised_times_for_L(self, ilvl):
"""
Normalise substep time against particle count for that level
at the most recent coarse time, no refined timesteps
Particle counts may include init dump, so be one bigger.
"""
times = self.advance_times_for_L(ilvl)
counts = len(self.particles_per_level_per_time_step[ilvl])

# trim init particle count for lvl
Li_times = (
self.particles_per_level_per_time_step[ilvl]
if counts == len(times)
else self.particles_per_level_per_time_step[ilvl][1:]
)
if ilvl == 0:
return times / self.particles_per_level_per_time_step[0]
return times / Li_times
substeps = self.steps_per_coarse_timestep_for_L(ilvl)
norm_times = times.copy()
return (
norm_times.reshape(int(times.shape[0] / substeps), substeps)
/ self.particles_per_level_per_time_step[ilvl].reshape(
self.particles_per_level_per_time_step[ilvl].shape[0], 1
)
/ Li_times.reshape(Li_times.shape[0], 1)
).reshape(times.shape[0])


def file_parser(run, rank, times_filepath):
supe = phfile_parser(times_filepath)
supe = st.file_parser(times_filepath)
return ScopeTimerFile(supe.id_keys, supe.roots, run, str(rank))


def write_root_as_csv(scope_timer_file, outfile, headers=None, regex=None):
from contextlib import redirect_stdout

with open(outfile, "w") as f:
with redirect_stdout(f):
print_root_as_csv(scope_timer_file, headers, regex)


def print_root_as_csv(scope_timer_file, n_parts, headers=None, regex=None):
stf = scope_timer_file # alias
stf = file_parser(stf) if isinstance(stf, str) else stf

if headers:
print(",".join(headers))
for root in stf.roots:
s = stf(root.k)
if regex and regex not in s:
continue
bits = s.split(",")
print(f"{s}{root.t},{root.t/n_parts}")


def print_variance_across(scope_timer_filepath=None):
if scope_timer_filepath is None: # assume cli
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--file", default=None, help="timer file")
scope_timer_filepath = parser.parse_args().file
if not scope_timer_filepath:
parser.print_help()
sys.exit(1)
st.print_variance_across(scope_timer_filepath)


if __name__ == "__main__":
if len(sys.argv) > 1:
fn = sys.argv[1]
sys.argv = [sys.argv[0]] + sys.argv[2:]
globals()[fn]()
2 changes: 1 addition & 1 deletion tools/python3/plotting.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def plot_run_timer_data(diag_dir=None, rank=0):
parser.add_argument("-d", "--dir", default=".", help="Diagnostics directory")
diag_dir = parser.parse_args().dir
run = Run(diag_dir)
res = phloping.file_parser(run, rank, Path(f".phare_times.{rank}.txt"))
res = phloping.file_parser(run, rank, Path(f".phare/timings/rank.{rank}.txt"))
fig, ax = plt.subplots()
L0X = res.time_steps_for_L(0)
ax.plot(L0X, res.normalised_times_for_L(0), ":", label="L0 times", color="black")
Expand Down

0 comments on commit 2fa9a75

Please sign in to comment.