Skip to content

Commit

Permalink
continue with the refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
lucafedeli88 committed Jul 29, 2024
1 parent 3f33a53 commit a144b99
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 68 deletions.
21 changes: 5 additions & 16 deletions Source/Diagnostics/ReducedDiags/LoadBalanceCosts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,33 +109,22 @@ void LoadBalanceCosts::ComputeDiags (int step)
m_data.resize(dataSize, 0.0_rt);
m_data.assign(dataSize, 0.0_rt);

// read in WarpX costs to local copy; compute if using `Heuristic` update
amrex::Vector<std::unique_ptr<amrex::LayoutData<amrex::Real> > > costs;

costs.resize(nLevels);
for (int lev = 0; lev < nLevels; ++lev)
{
costs[lev] = std::make_unique<LayoutData<Real>>(*WarpX::getCosts(lev));
}

if (LoadBalance::get_instance().get_update_algo() == CostsUpdateAlgo::Heuristic)
{
warpx.ComputeCostsHeuristic(costs);
}

// keep track of correct index in array over all boxes on all levels
// shift index for m_data
int shift_m_data = 0;

// save data
for (int lev = 0; lev < nLevels; ++lev)
{
// read in WarpX costs
const auto& costs_at_lev = load_balance.get_costs(lev);

const amrex::DistributionMapping& dm = warpx.DistributionMap(lev);
const MultiFab & Ex = warpx.getField(FieldType::Efield_aux, lev,0);
for (MFIter mfi(Ex, false); mfi.isValid(); ++mfi)
{
const Box& tbx = mfi.tilebox();
m_data[shift_m_data + mfi.index()*m_nDataFields + 0] = (*costs[lev])[mfi.index()];
m_data[shift_m_data + mfi.index()*m_nDataFields + 0] = (*costs_at_lev)[mfi.index()];
m_data[shift_m_data + mfi.index()*m_nDataFields + 1] = dm[mfi.index()];
m_data[shift_m_data + mfi.index()*m_nDataFields + 2] = lev;
m_data[shift_m_data + mfi.index()*m_nDataFields + 3] = tbx.loVect()[0];
Expand All @@ -158,7 +147,7 @@ void LoadBalanceCosts::ComputeDiags (int step)
}

// we looped through all the boxes on level lev, update the shift index
shift_m_data += m_nDataFields*(costs[lev]->size());
shift_m_data += m_nDataFields*(costs_at_lev->size());
}

// parallel reduce to IO proc and get data over all procs
Expand Down
3 changes: 3 additions & 0 deletions Source/Evolve/WarpXEvolve.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ WarpX::Evolve (int numsteps)
ExecutePythonCallback("afterstep");

/// reduced diags
//
// TODO: conditionally recompute weights
//
if (reduced_diags->m_plot_rd != 0)
{
reduced_diags->LoadBalance();
Expand Down
3 changes: 3 additions & 0 deletions Source/Initialization/WarpXInitData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,9 @@ WarpX::InitData ()
multi_diags->FilterComputePackFlush(istep[0] - 1);

// Write reduced diagnostics before the first iteration.
//
// TODO: conditionally recompute weights
//
if (reduced_diags->m_plot_rd != 0)
{
reduced_diags->ComputeDiags(istep[0] - 1);
Expand Down
20 changes: 17 additions & 3 deletions Source/LoadBalance/LoadBalance.H
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ namespace warpx::load_balance
SpaceFillingCurve
};

struct LoadBalanceResult
{
amrex::DistributionMapping dm;

// These store efficiency (meaning, the average 'cost' over all ranks,
// normalized to max cost) for current and proposed distribution mappings
amrex::Real currentEfficiency = 0.0;
amrex::Real proposedEfficiency = 0.0;
};

class CostTracker
{
public:
Expand Down Expand Up @@ -76,9 +86,11 @@ namespace warpx::load_balance

void set_efficiency (int lev, amrex::Real val);

[[nodiscard]]
amrex::Real get_efficiency (int lev) const;

[[nodiscard]]
amrex::Real get_efficiency_ratio_threshold () const;

[[nodiscard]]
const std::unique_ptr<amrex::LayoutData<amrex::Real>>& get_costs (int lev) const;

Expand All @@ -101,8 +113,11 @@ namespace warpx::load_balance
*/
void compute_costs_if_heuristic (
int finest_level,
const amrex::Vector<std::array< std::unique_ptr<amrex::MultiFab>, 3 > >& efield_ref,
const MultiParticleContainer& mypc_ref);

LoadBalanceResult compute_new_distribution_mapping(int lev) const;

protected:
LoadBalance (){};
~ LoadBalance (){}
Expand Down Expand Up @@ -140,8 +155,7 @@ namespace warpx::load_balance
* here means the average cost per MPI rank. */
amrex::Real m_efficiency_ratio_threshold = 0.0;

/** Current load balance efficiency for each level. */
amrex::Vector<amrex::Real> m_efficiency;
std::vector<amrex::Real> m_efficiency;

/** Weight factor for cells in `Heuristic` costs update.
* Default values on GPU are determined from single-GPU tests on Summit.
Expand Down
42 changes: 39 additions & 3 deletions Source/LoadBalance/LoadBalance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <AMReX_Gpu.H>
#include <AMReX_GpuAtomic.H>
#include <AMReX_ParallelDescriptor.H>
#include <AMReX_ParmParse.H>

#include <string>
Expand Down Expand Up @@ -298,6 +299,14 @@ amrex::Real LoadBalance::get_efficiency (const int lev) const
return m_efficiency[lev];
}

[[nodiscard]]
amrex::Real LoadBalance::get_efficiency_ratio_threshold () const
{
WARPX_ALWAYS_ASSERT_WITH_MESSAGE(m_initialized,
"LoadBalance must be initialized before calling get_efficiency_ratio_threshold");
return m_efficiency_ratio_threshold;
}

void LoadBalance::reset_costs (const int finest_level)
{
WARPX_ALWAYS_ASSERT_WITH_MESSAGE(m_initialized,
Expand Down Expand Up @@ -350,6 +359,7 @@ void LoadBalance::allocate (const int lev,

void LoadBalance::compute_costs_if_heuristic (
const int finest_level,
const amrex::Vector<std::array< std::unique_ptr<amrex::MultiFab>, 3 > >& efield_ref,
const MultiParticleContainer& mypc_ref)
{
WARPX_ALWAYS_ASSERT_WITH_MESSAGE(m_initialized,
Expand All @@ -374,11 +384,37 @@ void LoadBalance::compute_costs_if_heuristic (
}

// Cell loop
MultiFab* Ex = Efield_fp[lev][0].get();
for (MFIter mfi(*Ex, false); mfi.isValid(); ++mfi)
const auto Ex = efield_ref[lev][0].get();
for (amrex::MFIter mfi(*Ex, false); mfi.isValid(); ++mfi)
{
const Box& gbx = mfi.growntilebox();
const amrex::Box& gbx = mfi.growntilebox();
(*m_costs[lev])[mfi.index()] += m_costs_heuristic_cells_wt*gbx.numPts();
}
}
}

LoadBalanceResult LoadBalance::compute_new_distribution_mapping(int lev) const
{
using namespace amrex;

const Real nboxes = m_costs[lev]->size();
const Real nprocs = ParallelContext::NProcsSub();
const int nmax = static_cast<int>(std::ceil(nboxes/nprocs*m_knapsack_factor));

// Compute the new distribution mapping
LoadBalanceResult res;

const bool broadcast_to_all_false = false;
res.dm = (m_strategy == LoadBalanceStrategy::SpaceFillingCurve)?
DistributionMapping::makeSFC(*m_costs[lev],
res.currentEfficiency, res.proposedEfficiency,
broadcast_to_all_false,
ParallelDescriptor::IOProcessorNumber()):
DistributionMapping::makeKnapSack(*m_costs[lev],
res.currentEfficiency, res.proposedEfficiency,
nmax,
broadcast_to_all_false,
ParallelDescriptor::IOProcessorNumber());

return res;
}
66 changes: 20 additions & 46 deletions Source/Parallelization/WarpXRegrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,78 +73,58 @@ WarpX::LoadBalance ()
WARPX_PROFILE_REGION("LoadBalance");
WARPX_PROFILE("WarpX::LoadBalance()");

AMREX_ALWAYS_ASSERT(!costs.empty());
AMREX_ALWAYS_ASSERT(costs[0] != nullptr);

#ifdef AMREX_USE_MPI
if (LoadBalance::get_instance().get_update_algo() == CostsUpdateAlgo::Heuristic)
{
// compute the costs on a per-rank basis
ComputeCostsHeuristic(costs);
}

// By default, do not do a redistribute; this toggles to true if RemakeLevel
// is called for any level
int loadBalancedAnyLevel = false;

const int nLevels = finestLevel();

auto& load_balance = LoadBalance::get_instance();
load_balance.compute_costs_if_heuristic(
nLevels, Efield_fp, *mypc);

for (int lev = 0; lev <= nLevels; ++lev)
{
int doLoadBalance = false;

// Compute the new distribution mapping
DistributionMapping newdm;
const amrex::Real nboxes = costs[lev]->size();
const amrex::Real nprocs = ParallelContext::NProcsSub();
const int nmax = static_cast<int>(std::ceil(nboxes/nprocs*load_balance_knapsack_factor));
// These store efficiency (meaning, the average 'cost' over all ranks,
// normalized to max cost) for current and proposed distribution mappings
amrex::Real currentEfficiency = 0.0;
amrex::Real proposedEfficiency = 0.0;

newdm = (load_balance_with_sfc)
? DistributionMapping::makeSFC(*costs[lev],
currentEfficiency, proposedEfficiency,
false,
ParallelDescriptor::IOProcessorNumber())
: DistributionMapping::makeKnapSack(*costs[lev],
currentEfficiency, proposedEfficiency,
nmax,
false,
ParallelDescriptor::IOProcessorNumber());
auto load_balance_result = load_balance.compute_new_distribution_mapping(lev);

// As specified in the above calls to makeSFC and makeKnapSack, the new
// distribution mapping is NOT communicated to all ranks; the loadbalanced
// dm is up-to-date only on root, and we can decide whether to broadcast
if ((load_balance_efficiency_ratio_threshold > 0.0)
if ((load_balance.get_efficiency_ratio_threshold() > 0.0)
&& (ParallelDescriptor::MyProc() == ParallelDescriptor::IOProcessorNumber()))
{
doLoadBalance = (proposedEfficiency > load_balance_efficiency_ratio_threshold*currentEfficiency);
doLoadBalance = (load_balance_result.proposedEfficiency > load_balance_result.currentEfficiency);
}

ParallelDescriptor::Bcast(&doLoadBalance, 1,
ParallelDescriptor::IOProcessorNumber());
ParallelDescriptor::IOProcessorNumber());

if (doLoadBalance)
{
Vector<int> pmap;
if (ParallelDescriptor::MyProc() == ParallelDescriptor::IOProcessorNumber())
{
pmap = newdm.ProcessorMap();
pmap = load_balance_result.dm.ProcessorMap();
} else
{
const auto& costs = load_balance.get_costs(lev);
const Real nboxes = costs->size();
pmap.resize(static_cast<std::size_t>(nboxes));
}
ParallelDescriptor::Bcast(pmap.data(), pmap.size(), ParallelDescriptor::IOProcessorNumber());

if (ParallelDescriptor::MyProc() != ParallelDescriptor::IOProcessorNumber())
{
newdm = DistributionMapping(pmap);
load_balance_result.dm = DistributionMapping(pmap);
}

RemakeLevel(lev, t_new[lev], boxArray(lev), newdm);
RemakeLevel(lev, t_new[lev], boxArray(lev), load_balance_result.dm);

// Record the load balance efficiency
setLoadBalanceEfficiency(lev, proposedEfficiency);
load_balance.set_efficiency(lev, load_balance_result.proposedEfficiency);
}

loadBalancedAnyLevel = loadBalancedAnyLevel || doLoadBalance;
Expand Down Expand Up @@ -375,16 +355,10 @@ WarpX::RemakeLevel (int lev, Real /*time*/, const BoxArray& ba, const Distributi
// Re-initialize the lattice element finder with the new ba and dm.
m_accelerator_lattice[lev]->InitElementFinder(lev, ba, dm);

if (costs[lev] != nullptr)
{
costs[lev] = std::make_unique<LayoutData<Real>>(ba, dm);
const auto iarr = costs[lev]->IndexArray();
for (const auto& i : iarr)
{
(*costs[lev])[i] = 0.0;
setLoadBalanceEfficiency(lev, -1);
}
}
auto& load_balance = LoadBalance::get_instance();

load_balance.set_costs(lev, 0.0);
load_balance.set_efficiency(lev, -1);

SetDistributionMap(lev, dm);

Expand Down

0 comments on commit a144b99

Please sign in to comment.