Skip to content

Commit

Permalink
Merge pull request #4 from parcollet/monitor
Browse files Browse the repository at this point in the history
Monitor
  • Loading branch information
Wentzell authored Sep 22, 2020
2 parents 8572d7b + 57b35a8 commit dbf2ac1
Show file tree
Hide file tree
Showing 7 changed files with 399 additions and 1 deletion.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
compile_commands.json
doc/cpp2rst_generated
*.pyc
.*.swp
.*.swo
.*.swn
127 changes: 127 additions & 0 deletions c++/mpi/macros.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) 2019-2020 Simons Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.txt
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _CCQ_MACROS_GUARD_H
#define _CCQ_MACROS_GUARD_H

// CCQ, TRIQS general macros
// GUARD IT do not use pragma once
// hence one can simply include them in every projects

// --- Stringify macros -----

#define AS_STRING(...) AS_STRING2(__VA_ARGS__)
#define AS_STRING2(...) #__VA_ARGS__

#define PRINT(X) std::cerr << AS_STRING(X) << " = " << X << " at " << __FILE__ << ":" << __LINE__ << '\n'
#define NDA_PRINT(X) std::cerr << AS_STRING(X) << " = " << X << " at " << __FILE__ << ":" << __LINE__ << '\n'

// --- Concept macros -----

#if (__cplusplus > 201703L)

// C++20
// REQUIRES17 : only in 17, same for 20
// REQUIRES : in both

#define AUTO(X) X auto
#define CONCEPT(X) X

#define REQUIRES17(...)
#define REQUIRES requires
#define REQUIRES20 requires

// C++20 explicit(bool) : degrade it NOTHING in c++17, we can not check easily
#define EXPLICIT explicit

// WARNING : it is critical for our doctools to have REQUIRES as requires, NOT a (...) with __VA_ARGS__
// It is the same effect, but raises unnecessary complications in traversing the AST in libtooling with macros.

#else

// C++17 backward compat mode

#define AUTO(X) auto
#define CONCEPT(X) typename
#define REQUIRES20(...)

#define EXPLICIT(...)

#ifdef __clang__
#define REQUIRES17(...) __attribute__((enable_if(__VA_ARGS__, AS_STRING(__VA_ARGS__))))
#define REQUIRES(...) __attribute__((enable_if(__VA_ARGS__, AS_STRING(__VA_ARGS__))))
#elif __GNUC__
// with the -fconcepts TS only. A degraded concept mode, not exactly the C++20. We return to C++17 + basic require
#define REQUIRES17 requires
#define REQUIRES requires

#endif

#endif

// -----------------------------------------------------------

#define FORCEINLINE __inline__ __attribute__((always_inline))

#ifdef NDEBUG

#define EXPECTS(X)
#define ASSERT(X)
#define ENSURES(X)
#define EXPECTS_WITH_MESSAGE(X, ...)
#define ASSERT_WITH_MESSAGE(X, ...)
#define ENSURES_WITH_MESSAGE(X, ...)

#else

#include <iostream>

#define EXPECTS(X) \
if (!(X)) { \
std::cerr << "Precondition " << AS_STRING(X) << " violated at " << __FILE__ << ":" << __LINE__ << "\n"; \
std::terminate(); \
}
#define ASSERT(X) \
if (!(X)) { \
std::cerr << "Assertion " << AS_STRING(X) << " violated at " << __FILE__ << ":" << __LINE__ << "\n"; \
std::terminate(); \
}
#define ENSURES(X) \
if (!(X)) { \
std::cerr << "Postcondition " << AS_STRING(X) << " violated at " << __FILE__ << ":" << __LINE__ << "\n"; \
std::terminate(); \
}

#define EXPECTS_WITH_MESSAGE(X, ...) \
if (!(X)) { \
std::cerr << "Precondition " << AS_STRING(X) << " violated at " << __FILE__ << ":" << __LINE__ << "\n"; \
std::cerr << "Error message : " << __VA_ARGS__ << std::endl; \
std::terminate(); \
}
#define ASSERT_WITH_MESSAGE(X, ...) \
if (!(X)) { \
std::cerr << "Assertion " << AS_STRING(X) << " violated at " << __FILE__ << ":" << __LINE__ << "\n"; \
std::cerr << "Error message : " << __VA_ARGS__ << std::endl; \
std::terminate(); \
}
#define ENSURES_WITH_MESSAGE(X, ...) \
if (!(X)) { \
std::cerr << "Postcondition " << AS_STRING(X) << " violated at " << __FILE__ << ":" << __LINE__ << "\n"; \
std::cerr << "Error message : " << __VA_ARGS__ << std::endl; \
std::terminate(); \
}

#endif

#endif
147 changes: 147 additions & 0 deletions c++/mpi/monitor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright (c) 2019 Simons Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.txt
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include "mpi/mpi.hpp"
#include "mpi/macros.hpp"
#include <vector>
#include <algorithm>
#include <unistd.h>

namespace mpi {

/**
* Contructed on top on a mpi communicator, this class
* monitors and syncronizes failure states of nodes, due to e.g. exceptions.
*
* Usage :
* monitor M{comm};
*
* on a node when failure occurs : request_emergency_stop.
*
* on all nodes : emergency_occured -> bool tells where a node a requested an emergency stop
*
* finalize_communications() cleans the monitor object and returns a bool : true iif computation finished normally (no emergency stop).
*/
class monitor {

mpi::communicator com;

struct future {
MPI_Request request;
int value = 0;
};
std::vector<future> root_futures; // communication of local_stop from the nodes to the root. On root only.
MPI_Request req_ibcast, req_isent; // request for the ibcast and isent. On all nodes.

int local_stop = 0; // = 1 if the node has requested an emergency stop. Local to the node. (No bool in MPI.)
int global_stop = 0; // = 1 if any node has requested an emergency stop. Always the same on all nodes.
bool finalized = false; // the finalized() method should be called once,
// and can not just be the desctructor since we use the returned value

public:
/// Constructs on top on a mpi communicator
monitor(mpi::communicator c) : com(c) {
if (com.rank() == 0) { // on root
// Register an async recv for the variable local_stop from every non-root nodes
// The associated send will be issued from each node:
// * with value = 1 in case of emergency stop
// * with value = 0 during finalization
root_futures.resize(c.size() - 1);
for (int rank = 1; rank < c.size(); ++rank) { // the index of root_futures is rank - 1 since there is none for rank = 0
MPI_Irecv(&(root_futures[rank - 1].value), 1, MPI_INT, rank, 0, MPI_COMM_WORLD, &(root_futures[rank - 1].request));
}
} else { // not root
// Register the receive of the ibcast of global_stop that root will issue (in case of emergency stop or during fianlize)
MPI_Ibcast(&global_stop, 1, MPI_INT, 0, MPI_COMM_WORLD, &req_ibcast);
}
}

monitor(monitor const &) = delete;
monitor &operator=(monitor const &) = delete;

~monitor() { finalize_communications(); }

/// Request an emergency stop of all nodes contained in the mpi communicator.
/// It send the message to the root.
/// It the local node is the root, immedeatly sends the global ibcast to all nodes can check if to stop
void request_emergency_stop() {
EXPECTS(!finalized);
if (local_stop) { return; } // prevent sending signal multiple times
local_stop = 1;
if (com.rank() == 0) { // root
global_stop = 1;
MPI_Ibcast(&global_stop, 1, MPI_INT, 0, MPI_COMM_WORLD, &req_ibcast);
} else { // non root
MPI_Isend(&local_stop, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &req_isent);
}
}

/// Any node: should the calculation should be stopped?
[[nodiscard]] bool emergency_occured() {
if (finalized) return global_stop;
if (global_stop or local_stop) return true;
if (com.rank() == 0) { // root test whether other nodes have emergency stop and communicates signal
root_check_nodes_and_bcast();
} else { // other nodes just listen to the root bcast to see if an emergency stop broadcast has occured
MPI_Status status;
int flag;
MPI_Test(&req_ibcast, &flag, &status);
// if flag, then global_stop is now what was bcasted by the root
}
return global_stop;
}

/// Finalize the monitor.
/// At end of this functions, all nodes have completed their work, or have had an emergency stop.
/// The global_stop result is guaranteed to be the same on all nodes.
void finalize_communications() {
if (finalized) return;
if (com.rank() == 0) {
// the root is done computing, it just listens to the other nodes and bcast the global_stop until everyone is done.
while (root_check_nodes_and_bcast()) { usleep(100); } // 100 us (micro seconds)
// all others node have finished
// if the root has never emitted the ibcast, we do it and wait it, since we can not cancel it FIXME (why ??).
if (!global_stop) { MPI_Ibcast(&global_stop, 1, MPI_INT, 0, MPI_COMM_WORLD, &req_ibcast); }
} else {
// on non-root node: either Isend was done when local_stop was set (during request_emergency_stop),
// or it has to happen now, sending local_stop = 0, i.e, work is done, and fine.
if (!local_stop) { MPI_Isend(&local_stop, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &req_isent); }
}
// All nodes wait for the ibcast of global_stop to be complete.
MPI_Status status;
MPI_Wait(&req_ibcast, &status);
finalized = true;
}

private:
// ROOT ONLY.
// looks at all irecv that have arrived, and if they are a stop request, bcast the global stop.
// returns true iff there are still running nodes.
bool root_check_nodes_and_bcast() {
EXPECTS(!finalized);
EXPECTS(com.rank() == 0); // root only
bool some_nodes_are_still_running = false;
for (auto &f : root_futures) {
MPI_Status status;
int flag;
MPI_Test(&(f.request), &flag, &status);
if (flag and (not global_stop) and (f.value > 0)) request_emergency_stop(); // the root requires the stop now. It also stops itself...
some_nodes_are_still_running |= (flag == 0);
}
return some_nodes_are_still_running;
}
};

} // namespace mpi
2 changes: 2 additions & 0 deletions c++/mpi/mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ namespace mpi {
return c;
}

void abort(int error_code) { MPI_Abort(_com, error_code); }

#ifdef BOOST_MPI_HPP
// Conversion to and from boost communicator, Keep for backward compatibility
inline operator boost::mpi::communicator() const { return boost::mpi::communicator(_com, boost::mpi::comm_duplicate); }
Expand Down
3 changes: 2 additions & 1 deletion test/c++/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ foreach(test ${all_tests})
target_link_libraries(${test_name} ${PROJECT_NAME}::${PROJECT_NAME}_c openmp ${PROJECT_NAME}_warnings gtest_main)
set_property(TARGET ${test_name} PROPERTY RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${test_dir})
set(test_bin ${CMAKE_CURRENT_BINARY_DIR}/${test_dir}/${test_name})
add_test(NAME ${test_name}_np2 COMMAND ${MPIEXEC_EXECUTABLE} ${MPIEXEC_NUMPROC_FLAG} 4 ${MPIEXEC_PREFLAGS} ${test_bin} ${MPIEXEC_POSTFLAGS} WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${test_dir})
add_test(NAME ${test_name}_np1 COMMAND ${MPIEXEC_EXECUTABLE} ${MPIEXEC_NUMPROC_FLAG} 1 ${MPIEXEC_PREFLAGS} ${test_bin} ${MPIEXEC_POSTFLAGS} WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${test_dir})
add_test(NAME ${test_name}_np2 COMMAND ${MPIEXEC_EXECUTABLE} ${MPIEXEC_NUMPROC_FLAG} 2 ${MPIEXEC_PREFLAGS} ${test_bin} ${MPIEXEC_POSTFLAGS} WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${test_dir})
add_test(NAME ${test_name}_np4 COMMAND ${MPIEXEC_EXECUTABLE} ${MPIEXEC_NUMPROC_FLAG} 4 ${MPIEXEC_PREFLAGS} ${test_bin} ${MPIEXEC_POSTFLAGS} WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${test_dir})
# Run clang-tidy if found
if(CLANG_TIDY_EXECUTABLE)
Expand Down
3 changes: 3 additions & 0 deletions test/c++/comm_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ TEST(Comm, split) {
communicator world;
int rank = world.rank();

// Skip test if only one rank in communicator
if (world.size() == 1) return;

ASSERT_TRUE(2 == world.size() or 4 == world.size());

int colors[] = {0, 2, 1, 1};
Expand Down
Loading

0 comments on commit dbf2ac1

Please sign in to comment.