Commit
Merge pull request #94 from do-jason/mpi_opt
MPI collective addition for some routines
biochem-fan authored Apr 3, 2023
2 parents 563d1bf + 947437d commit 561a8bf
Showing 8 changed files with 350 additions and 37 deletions.
5 changes: 5 additions & 0 deletions CMakeLists.txt
@@ -276,6 +276,11 @@ if(ALTCPU)
add_definitions(-DALTCPU)
endif()

# ----------------------------------------------------------------- MPI COLLECTIVE --
if(ALTCPU)
add_definitions(-DUSE_MPI_COLLECTIVE) # Adding MPI collective for some routines
endif()

# ---------------------------------------------------------------USE TEXTURES OR NOT--
if(NOT CudaTexture OR ALTCPU)
add_definitions(-DPROJECTOR_NO_TEXTURES)
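
The new block above defines USE_MPI_COLLECTIVE at configure time (only when ALTCPU is enabled, as written), and the source changes below use that macro to choose between the new collective code paths and the original point-to-point loops. A minimal sketch of the gating pattern, with an illustrative function and plain MPI calls rather than the relion_MPI_* wrappers:

#include <mpi.h>

// Sketch only: deliver one integer from rank 0 to every rank, selecting the
// collective or the point-to-point path at compile time via the new macro.
void share_value(int &value, int rank, int size)
{
#ifdef USE_MPI_COLLECTIVE
    // A single broadcast replaces the whole send/receive loop.
    MPI_Bcast(&value, 1, MPI_INT, 0, MPI_COMM_WORLD);
#else
    if (rank == 0)
        for (int dst = 1; dst < size; dst++)
            MPI_Send(&value, 1, MPI_INT, dst, 0, MPI_COMM_WORLD);
    else
        MPI_Recv(&value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
#endif
}
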
2 changes: 1 addition & 1 deletion src/acc/cpu/cpu_kernels/diff2.h
@@ -544,7 +544,7 @@ inline void diff2_coarse(
diffi[j] = 0.0;

#if _OPENMP > 201307 // For OpenMP 4.5 and later
#pragma omp simd reduction(+:diffi[:eulers_per_block])
#pragma omp simd reduction(+:diffi)
#endif
for (int tid=0; tid<block_sz; tid++) {
// This will generate masked SVML routines for Intel compiler
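
The hunk above only changes the spelling of the OpenMP SIMD reduction over the per-orientation accumulator diffi. A self-contained sketch of this kind of array reduction, using an illustrative accumulator and loop instead of the diff2_coarse() kernel:

#include <cstdio>

int main()
{
    const int nacc = 4;
    float acc[nacc] = {0.f, 0.f, 0.f, 0.f};

#if _OPENMP > 201307 // OpenMP 4.5 and later accept array sections in reduction()
    #pragma omp simd reduction(+:acc[:nacc])
#endif
    for (int i = 0; i < 1024; i++)
        acc[i % nacc] += 1.0f;

    std::printf("%g %g %g %g\n", acc[0], acc[1], acc[2], acc[3]);
    return 0;
}
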
4 changes: 2 additions & 2 deletions src/apps/CMakeLists.txt
@@ -444,8 +444,8 @@ if(OPENMP_FOUND)
target_link_libraries(relion_lib ${OpenMP_omp_LIBRARY})
endif()

set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c99")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
set(CMAKE_C_FLAGS "-std=c99 ${CMAKE_C_FLAGS}")
set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS}")

# Set this flag to activate bounds checking in stl-vectors (incl. strings)
# It is useful to do this periodically, as it catches
33 changes: 21 additions & 12 deletions src/ml_model.cpp
@@ -1798,15 +1798,7 @@ void MlWsumModel::initZeros()

}

//#define DEBUG_PACK
#ifdef DEBUG_PACK
#define MAX_PACK_SIZE 100000
#else
// Approximately 1024*1024*1024/8/2 ~ 0.5 Gb
#define MAX_PACK_SIZE 67101000
#endif

void MlWsumModel::pack(MultidimArray<RFLOAT> &packed)
unsigned long long MlWsumModel::getPackSize()
{
unsigned long long packed_size = 0;
int spectral_size = (ori_size / 2) + 1;
@@ -1823,15 +1815,32 @@ void MlWsumModel::pack(MultidimArray<RFLOAT> &packed)

// for all class-related stuff
// data is complex: multiply by two!
packed_size += nr_classes * nr_bodies * 2 * (unsigned long long)BPref[0].getSize();
packed_size += nr_classes * nr_bodies * (unsigned long long)BPref[0].getSize();
packed_size += nr_classes * nr_bodies * (unsigned long long)nr_directions;
packed_size += nr_classes * nr_bodies * 2 * (unsigned long long) BPref[0].getSize(); // BPref.data
packed_size += nr_classes * nr_bodies * (unsigned long long) BPref[0].getSize(); // BPref.weight
packed_size += nr_classes * nr_bodies * (unsigned long long) nr_directions; // pdf_directions

// for pdf_class
packed_size += nr_classes;

// for priors for each class
if (ref_dim==2)
packed_size += nr_classes*2;

return packed_size;
}

//#define DEBUG_PACK
#ifdef DEBUG_PACK
#define MAX_PACK_SIZE 100000
#else
// Approximately 1024*1024*1024/8/2 ~ 0.5 Gb
#define MAX_PACK_SIZE 67101000
#endif

void MlWsumModel::pack(MultidimArray<RFLOAT> &packed)
{
unsigned long long packed_size = getPackSize();

// Get memory for the packed array
packed.clear();
packed.resize(packed_size);
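
The size computation that used to sit at the top of pack() is factored out into getPackSize(), so a rank that has not packed anything itself can still allocate a correctly sized buffer before receiving packed sums over MPI; combineWeightedSumsTwoRandomHalves() below uses it exactly this way to resize Msum and Mpack. A minimal sketch of that usage, with plain MPI calls standing in for the relion_MPI_* wrappers:

#include <mpi.h>
#include <vector>

// Sketch only: size the receive buffer from the locally computed pack size,
// then join a broadcast of the packed weighted sums rooted at `root`.
void receive_packed_sums(unsigned long long pack_size, int root, MPI_Comm comm)
{
    std::vector<double> packed(pack_size, 0.0);
    // The int cast is only for this sketch; the diff above calls the
    // relion_MPI_Bcast wrapper instead of plain MPI_Bcast.
    MPI_Bcast(packed.data(), static_cast<int>(pack_size), MPI_DOUBLE, root, comm);
    // ... unpack into the weighted-sums model here ...
}
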
3 changes: 3 additions & 0 deletions src/ml_model.h
@@ -525,6 +525,9 @@ class MlWsumModel: public MlModel
// Initialize all weighted sums to zero (with resizing the BPrefs to current_size)
void initZeros();

// Return the current pack size
unsigned long long getPackSize();

// Pack entire structure into one large MultidimArray<RFLOAT> for reading/writing to disc
// To save memory, the model itself will be cleared after packing.
void pack(MultidimArray<RFLOAT> &packed);
92 changes: 90 additions & 2 deletions src/ml_optimiser_mpi.cpp
@@ -106,6 +106,12 @@ void MlOptimiserMpi::initialise()
}

grad_pseudo_halfsets = gradient_refine && !do_split_random_halves;
#ifdef _CUDA_ENABLED
/************************************************************************/
//Setup GPU related resources
int devCount, deviceAffinity;
bool is_split(false);

#ifdef _CUDA_ENABLED
/************************************************************************/
//Setup GPU related resources
@@ -546,6 +552,12 @@ void MlOptimiserMpi::initialiseWorkLoad()
// Get the same random number generator seed for all mpi nodes
if (random_seed == -1)
{
#ifdef USE_MPI_COLLECTIVE
if (node->isLeader())
random_seed = time(NULL);

node->relion_MPI_Bcast(&random_seed, 1, MPI_INT, 0, MPI_COMM_WORLD);
#else
if (node->isLeader())
{
random_seed = time(NULL);
@@ -557,6 +569,7 @@ void MlOptimiserMpi::initialiseWorkLoad()
MPI_Status status;
node->relion_MPI_Recv(&random_seed, 1, MPI_INT, 0, MPITAG_RANDOMSEED, MPI_COMM_WORLD, status);
}
#endif
}

// Also randomize random-number-generator for perturbations on the angles
@@ -574,23 +587,29 @@
{
// First do half-set 1
if (node->isLeader()) mydata.getNumberOfImagesPerGroup(mymodel.nr_particles_per_group, 1);
#ifdef USE_MPI_COLLECTIVE
node->relion_MPI_Bcast(&mymodel.nr_particles_per_group[0], mymodel.nr_particles_per_group.size(), MPI_LONG, 0, node->root_oddC);
#else
for (int follower = 1; follower < node->size; follower+=2)
{
MPI_Status status;
if (node->isLeader()) node->relion_MPI_Send(&mymodel.nr_particles_per_group[0], mymodel.nr_particles_per_group.size(), MPI_LONG, follower, MPITAG_METADATA, MPI_COMM_WORLD);
else if (node->rank == follower) node->relion_MPI_Recv(&mymodel.nr_particles_per_group[0], mymodel.nr_particles_per_group.size(), MPI_LONG, 0, MPITAG_METADATA, MPI_COMM_WORLD, status);
}

#endif

// Then do half-set 2
if (node->isLeader()) mydata.getNumberOfImagesPerGroup(mymodel.nr_particles_per_group, 2);
#ifdef USE_MPI_COLLECTIVE
node->relion_MPI_Bcast(&mymodel.nr_particles_per_group[0], mymodel.nr_particles_per_group.size(), MPI_LONG, 0, node->root_evenC);
#else
for (int follower = 2; follower < node->size; follower+=2)
{
MPI_Status status;
if (node->isLeader()) node->relion_MPI_Send(&mymodel.nr_particles_per_group[0], mymodel.nr_particles_per_group.size(), MPI_LONG, follower, MPITAG_METADATA, MPI_COMM_WORLD);
else if (node->rank == follower) node->relion_MPI_Recv(&mymodel.nr_particles_per_group[0], mymodel.nr_particles_per_group.size(), MPI_LONG, 0, MPITAG_METADATA, MPI_COMM_WORLD, status);
}

#endif
}
else
{
@@ -1689,6 +1708,24 @@ void MlOptimiserMpi::combineAllWeightedSums()
// Only combine weighted sums if there are more than one followers per subset!
if ((node->size - 1)/nr_halfsets > 1)
{
#ifdef USE_MPI_COLLECTIVE
if (!node->isLeader())
{
// First all followers pack up their wsum_model
wsum_model.pack(Mpack);
Msum.initZeros(Mpack);

if (do_split_random_halves)
node->relion_MPI_Allreduce(MULTIDIM_ARRAY(Mpack), MULTIDIM_ARRAY(Msum), MULTIDIM_SIZE(Mpack), MY_MPI_DOUBLE, MPI_SUM, node->splitC);
else
node->relion_MPI_Allreduce(MULTIDIM_ARRAY(Mpack), MULTIDIM_ARRAY(Msum), MULTIDIM_SIZE(Mpack), MY_MPI_DOUBLE, MPI_SUM, node->followerC);
#ifdef DEBUG
if (node->rank == 1) std::cerr << " MPI_Allreduce MULTIDIM_SIZE(Mpack)= "<< MULTIDIM_SIZE(Mpack) << std::endl;
#endif

wsum_model.unpack(Msum);
}
#else
// Loop over possibly multiple instances of Mpack of maximum size
int piece = 0;
int nr_pieces = 1;
@@ -1806,6 +1843,7 @@ void MlOptimiserMpi::combineAllWeightedSums()
} // end for piece

MPI_Barrier(MPI_COMM_WORLD);
#endif
}

#ifdef TIMING
Expand Down Expand Up @@ -1875,6 +1913,33 @@ void MlOptimiserMpi::combineWeightedSumsTwoRandomHalves()
MultidimArray<RFLOAT> Mpack, Msum;
MPI_Status status;

#ifdef USE_MPI_COLLECTIVE
// The leader does not have a wsum_model!
if (!node->isLeader())
{
if (node->rank == 1)
{
if (verb > 0) std::cout << " Combining two random halves ..."<< std::endl;
wsum_model.pack(Mpack);
Msum.resize(wsum_model.getPackSize());
node->relion_MPI_Recv(MULTIDIM_ARRAY(Msum), MULTIDIM_SIZE(Msum), MY_MPI_DOUBLE, 2, MPITAG_PACK, MPI_COMM_WORLD, status);
Mpack += Msum;
}
else if (node->rank == 2)
{
wsum_model.pack(Mpack);
node->relion_MPI_Send(MULTIDIM_ARRAY(Mpack), MULTIDIM_SIZE(Mpack), MY_MPI_DOUBLE, 1, MPITAG_PACK, MPI_COMM_WORLD);
}
else
Mpack.resize(wsum_model.getPackSize());

// rank one sends Mpack to everyone else
node->relion_MPI_Bcast(MULTIDIM_ARRAY(Mpack), MULTIDIM_SIZE(Mpack), MY_MPI_DOUBLE, 0, node->followerC);

// Everyone unpacks the new Mpack
wsum_model.unpack(Mpack);
}
#else
int piece = 0;
int nr_pieces = 1;
long int pack_size;
@@ -1937,6 +2002,7 @@ void MlOptimiserMpi::combineWeightedSumsTwoRandomHalves()
Mpack.clear();
}
}
#endif
}

void MlOptimiserMpi::maximization()
@@ -2309,6 +2375,27 @@ void MlOptimiserMpi::maximization()
{
if (!do_join_random_halves)
{
#ifdef USE_MPI_COLLECTIVE
if (!node->isLeader())
{
// first pass halfset1, second pass halfset2
int reconstruct_rank = 2 * (ith_recons % ((node->size-1)/2)) + node->myRandomSubset();

// This is MPI rank in splitC communicator(split random halves run)
int split_rank = node->getSplitRank(reconstruct_rank);

node->relion_MPI_Bcast(MULTIDIM_ARRAY(mymodel.Iref[ith_recons]), MULTIDIM_SIZE(mymodel.Iref[ith_recons]), MY_MPI_DOUBLE, split_rank, node->splitC);
node->relion_MPI_Bcast(MULTIDIM_ARRAY(mymodel.data_vs_prior_class[ith_recons]), MULTIDIM_SIZE(mymodel.data_vs_prior_class[ith_recons]), MY_MPI_DOUBLE, split_rank, node->splitC);
node->relion_MPI_Bcast(MULTIDIM_ARRAY(mymodel.fourier_coverage_class[ith_recons]), MULTIDIM_SIZE(mymodel.fourier_coverage_class[ith_recons]), MY_MPI_DOUBLE, split_rank, node->splitC);
node->relion_MPI_Bcast(MULTIDIM_ARRAY(mymodel.sigma2_class[ith_recons]), MULTIDIM_SIZE(mymodel.sigma2_class[ith_recons]), MY_MPI_DOUBLE, split_rank, node->splitC);

if (do_grad)
{
node->relion_MPI_Bcast(MULTIDIM_ARRAY(mymodel.Igrad1[ith_recons]), MULTIDIM_SIZE(mymodel.Igrad1[ith_recons]), MY_MPI_DOUBLE, split_rank, node->splitC);
node->relion_MPI_Bcast(MULTIDIM_ARRAY(mymodel.Igrad2[ith_recons]), MULTIDIM_SIZE(mymodel.Igrad2[ith_recons]), MY_MPI_DOUBLE, split_rank, node->splitC);
}
}
#else
MPI_Status status;
// Make sure I am sending from the rank where the reconstruction was done (see above) to all other followers of this subset
// Loop twice through this, as each class was reconstructed by two different followers!!
@@ -2367,6 +2454,7 @@ void MlOptimiserMpi::maximization()
}
}
}
#endif
// No one should continue until we're all here
MPI_Barrier(MPI_COMM_WORLD);

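
The changes above replace loops of paired relion_MPI_Send / relion_MPI_Recv calls with single collectives: an MPI_Allreduce over the follower (or split-half) communicator sums the packed weighted sums on all followers at once, and MPI_Bcast distributes the random seed, the per-group particle counts and the reconstructed classes. A minimal sketch of the reduction step, assuming an already packed buffer and an illustrative communicator name:

#include <mpi.h>
#include <vector>

// Sketch only: every follower contributes its packed sums and receives the
// element-wise total in one call, instead of circulating pieces rank by rank.
void combine_weighted_sums(std::vector<double> &pack, MPI_Comm follower_comm)
{
    std::vector<double> total(pack.size(), 0.0);
    MPI_Allreduce(pack.data(), total.data(), static_cast<int>(pack.size()),
                  MPI_DOUBLE, MPI_SUM, follower_comm);
    pack.swap(total); // every rank now holds the combined sums and can unpack
}
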
2 more changed files not shown.
