diff --git a/c++/mpi/communicator.hpp b/c++/mpi/communicator.hpp index 23b531c3..a1898398 100644 --- a/c++/mpi/communicator.hpp +++ b/c++/mpi/communicator.hpp @@ -86,6 +86,9 @@ namespace mpi { * @details Calls `MPI_Comm_split` with the given color and key arguments. See the MPI documentation for more details, * e.g. open-mpi docs. * + * @warning This allocates a new communicator object. Make sure to call `free` on the returned communicator when + * it is no longer needed. + * * @return If mpi::has_env is true, return the split `MPI_Comm` object wrapped in a new mpi::communicator, otherwise * return a default constructed mpi::communicator. */ @@ -98,6 +101,40 @@ namespace mpi { return {}; } + /** + * @brief Duplicate the communicator. + * + * @details Calls `MPI_Comm_dup` to duplicate the communicator. See the MPI documentation for more details, e.g. + * open-mpi docs. + * + * @warning This allocates a new communicator object. Make sure to call `free` on the returned communicator when + * it is no longer needed. + * + * @return If mpi::has_env is true, return the duplicated `MPI_Comm` object wrapped in a new mpi::communicator, + * otherwise return a default constructed mpi::communicator. + */ + [[nodiscard]] communicator duplicate() const { + if (has_env) { + communicator c; + MPI_Comm_dup(_com, &c._com); + return c; + } else + return {}; + } + + /** + * @brief Free the communicator. + * + * @details Calls `MPI_Comm_free` to mark the communicator for deallocation. See the MPI documentation for more + * details, e.g. open-mpi docs + * . + * + * Does nothing, if mpi::has_env is false. + */ + void free() { + if (has_env) { MPI_Comm_free(&_com); } + } + /** * @brief If mpi::has_env is true, `MPI_Abort` is called with the given error code, otherwise std::abort is called. * @param error_code The error code to pass to `MPI_Abort`. diff --git a/c++/mpi/monitor.hpp b/c++/mpi/monitor.hpp index d61ccc1b..bac88e57 100644 --- a/c++/mpi/monitor.hpp +++ b/c++/mpi/monitor.hpp @@ -16,8 +16,7 @@ /** * @file - * @brief Provides a class for monitoring and communicating exceptions and other errors of - * individual processes. + * @brief Provides a class for monitoring and communicating events across multiple processes. */ #pragma once @@ -33,41 +32,54 @@ namespace mpi { /** - * @ingroup err_handling - * @brief Constructed on top of an MPI communicator, this class helps to monitor and communicate - * exceptions and other errors of individual processes. + * @ingroup event_handling + * @brief Constructed on top of an MPI communicator, this class helps to monitor and communicate events across + * multiple processes. * - * @details The root process (process with rank 0) monitors all other processes. If a process encounters - * an error, it sends an emergeny stop request to the root process which forwards it to all the other - * processes. + * @details The root process (rank == 0) monitors all other processes. If a process encounters an event, it sends a + * message to the root process by calling monitor::report_local_event. The root process then broadcasts this + * information to all other processes. + * + * It can be used to check + * - if an event has occurred on any process (monitor::event_on_any_rank) or + * - if an event has occurred on all processes (monitor::event_on_all_ranks). + * + * It uses a duplicate communicator to not interfere with other MPI communications. The communicator is freed in the + * `finalize_communications` function (which is called in the destructor if not called before). */ class monitor { - // Future struct for the non-blocking send/receive done on the root process. + // Future struct for non-blocking MPI communication. struct future { - // MPI request for the non-blocking receive on the root process. + // MPI request of the non-blocking MPI call. MPI_Request request{}; - // 0 means that no error has occurred, 1 means that an error has occurred. - int node_stop = 0; + // 0 means that no event has occurred, 1 means that an event has occurred. + int event = 0; }; // MPI communicator. - mpi::communicator com; + mpi::communicator comm; - // Future objects stored on the root process for every non-root process. + // Future objects stored on the root process for local events on non-root processes. std::vector root_futures; - // MPI request for broadcasting the emergency stop to all non-root processes. - MPI_Request req_ibcast{}; + // MPI request for the broadcasting done on the root process in case an event has occurred on any rank. + MPI_Request req_ibcast_any{}; + + // MPI request for the broadcasting done on the root process in case an event has occurred on all ranks. + MPI_Request req_ibcast_all{}; - // MPI request for sending the emergency stop request to the root process. + // MPI request for the sending done on non-root processes. MPI_Request req_isent{}; - // Set to 1, if the process has encountered a local error and requested an emergency stop. - int local_stop = 0; + // Set to 1, if a local event has occurred on this process. + int local_event = 0; - // Set to 1, if the process has received an emergency stop broadcasted by the root process. - int global_stop = 0; + // Set to 1, if an event has occurred on any process. + int any_event = 0; + + // Set to 1, if an event has occurred on all processes. + int all_events = 0; // Set to true, if finialize_communications() has been called. bool finalized = false; @@ -76,20 +88,25 @@ namespace mpi { /** * @brief Construct a monitor on top of a given mpi::communicator. * - * @details The root process performs a non-blocking receive for every non-root process and waits for - * a non-root process to send an emergency stop request. Non-root processes make a non-blocking broadcast - * call and wait for the root process to broadcast any emergency stop request it has received. + * @details The communicator is duplicated to not interfere with other MPI communications. + * + * The root process (rank == 0) performs a non-blocking receive for every non-root process and waits for a + * non-root process to send a message that an event has occurred. + * + * Non-root processes make two non-blocking broadcast calls and wait for the root process to broadcast a message in + * case an event has occurred on any or on all processes. * * @param c mpi::communicator. */ - monitor(mpi::communicator c) : com(c) { - if (com.rank() == 0) { + monitor(mpi::communicator c) : comm(c.duplicate()) { + if (comm.rank() == 0) { root_futures.resize(c.size() - 1); for (int rank = 1; rank < c.size(); ++rank) { - MPI_Irecv(&(root_futures[rank - 1].node_stop), 1, MPI_INT, rank, 0, MPI_COMM_WORLD, &(root_futures[rank - 1].request)); + MPI_Irecv(&(root_futures[rank - 1].event), 1, MPI_INT, rank, rank, comm.get(), &(root_futures[rank - 1].request)); } } else { - MPI_Ibcast(&global_stop, 1, MPI_INT, 0, MPI_COMM_WORLD, &req_ibcast); + MPI_Ibcast(&any_event, 1, MPI_INT, 0, comm.get(), &req_ibcast_any); + MPI_Ibcast(&all_events, 1, MPI_INT, 0, comm.get(), &req_ibcast_all); } } @@ -103,106 +120,162 @@ namespace mpi { ~monitor() { finalize_communications(); } /** - * @brief Request an emergency stop. + * @brief Report a local event to the root process (rank == 0). + * + * @details This function can be called on any process in case a local event has occurred. + * + * On the root process, it immediately broadcasts to all other processes that an event has occurred and further + * checks if all other processes have reported an event as well. If so, it additionally broadcasts to all processes + * that an event has occurred on all processes. * - * @details This function can be called on any process in case a local error has occurred. On the - * root process, it sets its `local_stop` and `global_stop` members to 1 and broadcasts `global_stop` - * to all non-root processes. On non-root processes, it sets `local_stop` to 1 and sends it to the - * root process. + * On non-root processes, it sends a message to the root process that a local event has occurred. */ - void request_emergency_stop() { - EXPECTS(!finalized); + void report_local_event() { // prevent sending multiple signals - if (local_stop) { return; } - - // an error has occurred - local_stop = 1; - if (com.rank() == 0) { - // root broadcasts the global_stop variable - global_stop = 1; - MPI_Ibcast(&global_stop, 1, MPI_INT, 0, MPI_COMM_WORLD, &req_ibcast); + if (local_event or finalized) { return; } + + // a local event has occurred + local_event = 1; + if (comm.rank() == 0) { + // on root process, check all other nodes and perform necessary broadcasts + root_check_nodes_and_bcast(); } else { - // non-root sends the local_stop variable to root - MPI_Isend(&local_stop, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &req_isent); + // on non-root processes, let the root process know about the local event + MPI_Isend(&local_event, 1, MPI_INT, 0, comm.rank(), comm.get(), &req_isent); } } /** - * @brief Check if an emergency stop has been requested. + * @brief Check if an event has occurred on any process. + * + * @details This function can be called on any process to check if an event has occurred somewhere. + * + * It returns true, if + * - a local event has occurred or + * - if an event has occurred on some other process which has already been reported to the root process and + * broadcasted to all other processes. * - * @details This function can be called on any process to check if an emergency has occurred somewhere. - * It first checks, if its `local_stop` or `global_stop` members are set to 1 and returns `true` in case - * one of them is. Otherwise, on the root process, it calls `root_check_nodes_and_bcast()` to check if - * some other process has sent an emergency message and to possibly forward the received signal. - * On non-root processes, it checks if the root process has broadcasted an emergency stop, which it has - * received from some other process. + * On the root process (rank == 0), it checks the status of all non-root processes and performs the necessary + * broadcasts in case they have not been done yet. * - * @return True, if an emergency stop has been requested. Otherwise, it returns false. + * @return True, if an event has occurred on any process. */ - [[nodiscard]] bool emergency_occured() { - // if final_communications() has already been called, global_stop == 0 if no error has occurred, otherwise it is 1 - if (finalized) return global_stop; + [[nodiscard]] bool event_on_any_rank() { + // if final_communications() has already been called, any_event == 0 if no event has occurred, otherwise it is 1 + if (finalized) return any_event; - // either a local error has occurred or some other process has requested an emergency stop - if (global_stop or local_stop) return true; + // if a local event has occurred, we return true + if (local_event) return true; - if (com.rank() == 0) { - // root checks if some other process has requested an emergency stop + // on the root process, we first check the status of all non-root processes, perform the necessary broadcasts and + // return true if an event has occurred + if (comm.rank() == 0) { root_check_nodes_and_bcast(); + return any_event; } - return global_stop; + + // on non-root processes, we check the status of the corresponding broadcast and return true if an event has + // occurred + MPI_Status status; + int has_received = 0; + MPI_Test(&req_ibcast_any, &has_received, &status); + return has_received and any_event; + } + + /** + * @brief Check if an event has occurred on all processes. + * + * @details This function can be called on any process to check if an event has occurred on all processes. + * + * It returns true, if an event has occurred on all processes which has already been reported to the root process + * and broadcasted to all other processes. + * + * On the root process (rank == 0), it checks the status of all non-root processes and performs the necessary + * broadcasts in case it has not been done yet. + * + * @return True, if an event has occurred on all processes. + */ + [[nodiscard]] bool event_on_all_ranks() { + // if final_communications() has already been called, all_events == 0 if an event has not occurred on every + // process, otherwise it is 1 + if (finalized) return all_events; + + // on the root process, we first check the status of all non-root processes, perform the necessary broadcasts and + // return true if an event has occurred on all of them + if (comm.rank() == 0) { + root_check_nodes_and_bcast(); + return all_events; + } + + // on non-root processes, we check the status of the broadcast and return true if an event has occurred on all + // processes + MPI_Status status; + int has_received = 0; + MPI_Test(&req_ibcast_all, &has_received, &status); + return has_received and all_events; } /** * @brief Finalize all pending communications. * - * @details At the end of this function, all processes have completed their work or have had a local - * emergency stop. The member `global_stop` is guaranteed to be the same on all processes when this - * function returns. + * @details At the end of this function, all MPI communications have been completed and the values of the member + * variables will not change anymore due to some member function calls. + * + * Furthermore, it frees the used communicator. */ void finalize_communications() { + // prevent multiple calls if (finalized) return; - if (com.rank() == 0) { - // root just listens to the other processes and bcasts the global_stop until everyone is done - while (root_check_nodes_and_bcast()) { usleep(100); } // 100 us (micro seconds) - // all others node have finished - // if the root has never emitted the ibcast, we do it now - if (not global_stop) { MPI_Ibcast(&global_stop, 1, MPI_INT, 0, MPI_COMM_WORLD, &req_ibcast); } + + if (comm.rank() == 0) { + // on root process, wait for all non-root processes to finish their MPI_Isend calls + while (root_check_nodes_and_bcast()) { + usleep(100); // 100 us (micro seconds) + } + // and perform broadcasts in case they have not been done yet + if (not any_event) { MPI_Ibcast(&any_event, 1, MPI_INT, 0, comm.get(), &req_ibcast_any); } + if (not all_events) { MPI_Ibcast(&all_events, 1, MPI_INT, 0, comm.get(), &req_ibcast_all); } } else { - // on non-root node: either Isend was done when local_stop was set to 1 during request_emergency_stop, - // or it has to happen now, i.e, work is done, and fine. - if (not local_stop) { MPI_Isend(&local_stop, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &req_isent); } + // on non-root processes, perform MPI_Isend call in case it has not been done yet + if (not local_event) { MPI_Isend(&local_event, 1, MPI_INT, 0, comm.rank(), comm.get(), &req_isent); } } - // all nodes wait for the ibcast of the global_stop to be complete - MPI_Status status; - MPI_Wait(&req_ibcast, &status); + + // all nodes wait for the broadcasts to be completed + MPI_Status status_any, status_all; + MPI_Wait(&req_ibcast_any, &status_any); + MPI_Wait(&req_ibcast_all, &status_all); + + // free the communicator + comm.free(); finalized = true; } private: - /** - * @brief Check if any non-root process has sent a stop request. If so, broadcast to all other processes - * in case it has not been done yet. - * - * @return True, if at least one process has not finished the `MPI_Isend` of the `local_stop` variable to - * the root process. Otherwise, it returns false. - */ + // Root process checks the status of all non-root processes, performs necessary broadcasts and returns a boolean + // that is true if at least one non-root process has not performed its MPI_Isend call yet. bool root_check_nodes_and_bcast() { EXPECTS(!finalized); - EXPECTS(com.rank() == 0); - // loop over all non-root processes - bool some_nodes_are_still_running = false; - for (auto &[request, node_stop] : root_futures) { - // check for an emergency stop request + EXPECTS(comm.rank() == root); + bool any = false; + bool all = true; + bool finished = true; + for (auto &[request, rank_event] : root_futures) { MPI_Status status; - int comm_received = 0; - MPI_Test(&request, &comm_received, &status); - // for the first time an emergency stop has been requested -> root calls request_emergency_stop() - // to broadcast to all other processes - if (comm_received and (not global_stop) and node_stop) request_emergency_stop(); // the root requires the stop now. It also stops itself... - some_nodes_are_still_running |= (not comm_received); + int rank_received = 0; + MPI_Test(&request, &rank_received, &status); + any |= (rank_received and rank_event); + all &= (rank_received and rank_event); + finished &= rank_received; + } + if (not any_event and (any or local_event)) { + any_event = 1; + MPI_Ibcast(&any_event, 1, MPI_INT, 0, comm.get(), &req_ibcast_any); + } + if (not all_events and all and local_event) { + all_events = 1; + MPI_Ibcast(&all_events, 1, MPI_INT, 0, comm.get(), &req_ibcast_all); } - return some_nodes_are_still_running; + return not finished; } }; diff --git a/doc/DoxygenLayout.xml b/doc/DoxygenLayout.xml index 240145da..ecd0a1ce 100644 --- a/doc/DoxygenLayout.xml +++ b/doc/DoxygenLayout.xml @@ -84,7 +84,7 @@ - + diff --git a/doc/documentation.md b/doc/documentation.md index 2722befd..68775462 100644 --- a/doc/documentation.md +++ b/doc/documentation.md @@ -68,9 +68,10 @@ Most users probably won't need to use this functionality directly. We refer the interested reader to [TRIQS/nda](https://github.com/TRIQS/nda/blob/unstable/c%2B%2B/nda/mpi/reduce.hpp) for more details. -## Error handling +## Event handling -@ref err_handling provides the mpi::monitor class which can be used to communicate and handle errors across multiple processes. +@ref event_handling provides the mpi::monitor class which can be used to communicate and handle events across multiple +processes. @ref ex2 shows a simple use case. diff --git a/doc/ex2.md b/doc/ex2.md index 024c8c17..7d2d7287 100644 --- a/doc/ex2.md +++ b/doc/ex2.md @@ -17,35 +17,35 @@ int main(int argc, char *argv[]) { // initialize monitor mpi::monitor monitor(world); - // in case a stop has been requested, print some info and return true + // in case an event has occurred, print some info and return true auto stop = [&monitor, world](int i) { bool res = false; - if (monitor.emergency_occured()) { - std::cerr << "Processor " << world.rank() << ": After " << i << " steps an emergency stop has been received.\n"; + if (monitor.event_on_any_rank()) { + std::cerr << "Processor " << world.rank() << ": After " << i << " steps an event has been communicated.\n"; res = true; } return res; }; - // loop as long as no stop has been requested - int rank_to_req = 3; + // loop as long as no event has occurred + int event_rank = 3; for (int i = 0; i < 1000000; ++i) { - // request a stop on processor 3 - if (world.rank() == rank_to_req) { - std::cerr << "Processor " << rank_to_req << ": Emergency stop requested.\n"; - monitor.request_emergency_stop(); + // report a local event on the event_rank + if (world.rank() == event_rank) { + std::cerr << "Processor " << event_rank << ": Local event reported.\n"; + monitor.report_local_event(); } // should we stop the loop? if (stop(i)) break; } - // check if all processes finished without an error + // check if all processes finished the loop if (world.rank() == 0) { - if (monitor.emergency_occured()) { - std::cout << "Oh no! An error occurred somewhere.\n"; + if (monitor.event_on_any_rank()) { + std::cout << "Oh no! An event occurred somewhere and the loop has not been finished on all processes.\n"; } else { - std::cout << "No worries, all processes finished without an error.\n"; + std::cout << "No worries, all processes have finished the loop.\n"; } } } @@ -54,24 +54,24 @@ int main(int argc, char *argv[]) { Output (running with `-n 12`): ``` -Processor 3: Emergency stop requested. -Processor 3: After 0 steps an emergency stop has been received. -Processor 2: After 5950 steps an emergency stop has been received. -Processor 4: After 10475 steps an emergency stop has been received. -Processor 5: After 7379 steps an emergency stop has been received. -Processor 6: After 8366 steps an emergency stop has been received. -Processor 7: After 1302 steps an emergency stop has been received. -Processor 8: After 1155 steps an emergency stop has been received. -Processor 9: After 14445 steps an emergency stop has been received. -Processor 11: After 9287 steps an emergency stop has been received. -Processor 0: After 0 steps an emergency stop has been received. -Processor 1: After 7443 steps an emergency stop has been received. -Processor 10: After 1321 steps an emergency stop has been received. -Oh no! An error occurred somewhere. +Processor 3: Local event reported. +Processor 3: After 0 steps an event has been communicated. +Processor 4: After 8428 steps an event has been communicated. +Processor 0: After 0 steps an event has been communicated. +Processor 8: After 10723 steps an event has been communicated. +Processor 5: After 10426 steps an event has been communicated. +Processor 6: After 12172 steps an event has been communicated. +Processor 7: After 9014 steps an event has been communicated. +Processor 1: After 400 steps an event has been communicated. +Processor 2: After 1646 steps an event has been communicated. +Processor 11: After 12637 steps an event has been communicated. +Processor 10: After 9120 steps an event has been communicated. +Processor 9: After 1 steps an event has been communicated. +Oh no! An event occurred somewhere and the loop has not been finished on all processes. ``` Output (running with `-n 3`): ``` -No worries, all processes finished without an error. +No worries, all processes have finished the loop. ``` \ No newline at end of file diff --git a/doc/groups.dox b/doc/groups.dox index 568c6471..ccfb102d 100644 --- a/doc/groups.dox +++ b/doc/groups.dox @@ -54,10 +54,10 @@ */ /** - * @defgroup err_handling Error handling - * @brief Communicate and handle errors across multiple processes. + * @defgroup event_handling Event handling + * @brief Communicate and handle events across multiple processes. * - * @details A typical use case for the mpi::monitor class could be: + * @details A typical use case for the mpi::monitor class could be to monitor and communicate exceptions: * * @code{.cpp} * // initialize monitor @@ -65,19 +65,19 @@ * ... * * // loop as long as everything is fine - * while (!monitor.emergency_occured()) { + * while (!monitor.event_on_any_rank()) { * try { * // do some work * ... * } catch (my_exception const &e) { - * // send an emergency stop request - * monitor.request_emergency_stop(); + * // report an exception + * monitor.report_local_event(); * } * } * - * // finalize communications and check if the computation finished due to an error + * // finalize communications and check if the computation finished due to an exception * monitor.finalize_communications(); - * if (!monitor.emergency_occured()) { + * if (!monitor.event_on_any_rank()) { * // do some clean up and maybe stop the program * ... * } diff --git a/test/c++/mpi_monitor.cpp b/test/c++/mpi_monitor.cpp index dab028a4..8deb909b 100644 --- a/test/c++/mpi_monitor.cpp +++ b/test/c++/mpi_monitor.cpp @@ -16,73 +16,81 @@ #include #include +#include #include #include +#include #include #include // in micro second = 1 milli second const int delta_tau_sleep = 1000; -// Monitor all nodes while some of them might fail. +// Monitor all nodes while some of them might report an event. // -// c: MPI communicator -// fastest_node: rank of the fastest node -// rank_failing: ranks of the nodes that will fail -// iteration_failure: iteration at which the nodes will fail -bool test(mpi::communicator c, int fastest_node, std::vector rank_failing, int iteration_failure = 3) { +// c: MPI communicator. +// fastest_node: Rank of the fastest node. +// rank_reporting: Ranks of the nodes that will report an event. +// all_events: If true, the all_events_occurred() function will be used instead of some_event_occurred(). +// iteration_event: Iteration at which the nodes will report an event. +bool test_monitor(mpi::communicator c, int fastest_node, std::vector rank_reporting, bool all_events = false, int iteration_event = 3) { const int niter = 10; const int size = c.size(); int sleeptime = delta_tau_sleep * (((c.rank() - fastest_node + size) % size) + 1); - bool will_fail = std::any_of(rank_failing.cbegin(), rank_failing.cend(), [&c](int i) { return i == c.rank(); }); + bool will_fail = std::any_of(rank_reporting.cbegin(), rank_reporting.cend(), [&c](int i) { return i == c.rank(); }); std::cerr << "Node " << c.rank() << ": sleeptime " << sleeptime << std::endl; mpi::monitor monitor{c}; + auto events_occurred = [all_events, &monitor]() { return all_events ? monitor.event_on_all_ranks() : monitor.event_on_any_rank(); }; - for (int i = 0; (!monitor.emergency_occured()) and (i < niter); ++i) { + for (int i = 0; (!events_occurred()) and (i < niter); ++i) { usleep(sleeptime); - std::cerr << "Node " << c.rank() << "is in iteration " << i << std::endl; - if (will_fail and (i >= iteration_failure)) { + std::cerr << "Node " << c.rank() << " is in iteration " << i << std::endl; + if (will_fail and (i >= iteration_event)) { std::cerr << "Node " << c.rank() << " is failing" << std::endl; - monitor.request_emergency_stop(); - monitor.request_emergency_stop(); // 2nd call should not resend MPI message + monitor.report_local_event(); + monitor.report_local_event(); // 2nd call should not resend MPI message } if (i == niter - 1) { std::cerr << "Node " << c.rank() << " has done all tasks" << std::endl; } } monitor.finalize_communications(); std::cerr << "Ending on node " << c.rank() << std::endl; - return not monitor.emergency_occured(); + return not events_occurred(); } -TEST(MPI, MonitorNoFailure) { - // no failure +TEST(MPI, MonitorNoEvent) { + // no event usleep(1000); mpi::communicator world; for (int i = 0; i < world.size(); ++i) { world.barrier(); if (world.rank() == 0) std::cerr << "***\nNode " << i << " is the fastest" << std::endl; - EXPECT_TRUE(test(world, i, {})); + EXPECT_TRUE(test_monitor(world, i, {})); + world.barrier(); + EXPECT_TRUE(test_monitor(world, i, {}, true)); world.barrier(); } } -TEST(MPI, MonitorOneFailureOnRoot) { - // root node fails +TEST(MPI, MonitorOneEventOnRoot) { + // one event on root node usleep(1000); mpi::communicator world; for (int i = 0; i < world.size(); ++i) { world.barrier(); if (world.rank() == 0) std::cerr << "***\nNode " << i << " is the fastest" << std::endl; - EXPECT_EQ(test(world, i, {0}), false); + EXPECT_EQ(test_monitor(world, i, {0}), false); + world.barrier(); + EXPECT_EQ(test_monitor(world, i, {0}, true), world.size() > 1); world.barrier(); } usleep(1000); } -TEST(MPI, MonitorOneFailureOnNonRoot) { - // one non-root node fails +TEST(MPI, MonitorOneEventOnNonRoot) { + // one event on non-root node usleep(1000); mpi::communicator world; if (world.size() < 2) { @@ -92,15 +100,17 @@ TEST(MPI, MonitorOneFailureOnNonRoot) { world.barrier(); if (world.rank() == 0) std::cerr << "***\nNode " << i << " is the fastest" << std::endl; bool has_failure = (world.size() > 1 ? false : true); // No failure if only rank 0 exists - EXPECT_EQ(test(world, i, {1}), has_failure); + EXPECT_EQ(test_monitor(world, i, {1}), has_failure); + world.barrier(); + EXPECT_EQ(test_monitor(world, i, {1}, true), world.size() > 1); world.barrier(); } } usleep(1000); } -TEST(MPI, MonitorTwoFailuresWithRoot) { - // two nodes fail including the root process +TEST(MPI, MonitorTwoEventsWithRoot) { + // two events on nodes including the root process usleep(1000); mpi::communicator world; if (world.size() < 2) { @@ -109,15 +119,17 @@ TEST(MPI, MonitorTwoFailuresWithRoot) { for (int i = 0; i < world.size(); ++i) { world.barrier(); if (world.rank() == 0) std::cerr << "***\nNode " << i << " is the fastest" << std::endl; - EXPECT_EQ(test(world, i, {0, 1}), false); + EXPECT_EQ(test_monitor(world, i, {0, 1}), false); + world.barrier(); + EXPECT_EQ(test_monitor(world, i, {0, 1}, true), world.size() > 2); world.barrier(); } } usleep(1000); } -TEST(MPI, MonitorTwoFailuresWithoutRoot) { - // two nodes fail excluding the root process +TEST(MPI, MonitorTwoEventsWithoutRoot) { + // two events on nodes excluding the root process usleep(1000); mpi::communicator world; if (world.size() < 3) { @@ -126,11 +138,54 @@ TEST(MPI, MonitorTwoFailuresWithoutRoot) { for (int i = 0; i < world.size(); ++i) { world.barrier(); if (world.rank() == 0) std::cerr << "***\nNode " << i << " is the fastest" << std::endl; - EXPECT_EQ(test(world, i, {1, 2}), false); + EXPECT_EQ(test_monitor(world, i, {1, 2}), false); + world.barrier(); + EXPECT_EQ(test_monitor(world, i, {1, 2}, true), world.size() > 2); world.barrier(); } } usleep(1000); } +TEST(MPI, MonitorAllEvents) { + // events on all nodes + usleep(1000); + mpi::communicator world; + std::vector rank_reporting(world.size()); + std::iota(rank_reporting.begin(), rank_reporting.end(), 0); + for (int i = 0; i < world.size(); ++i) { + world.barrier(); + if (world.rank() == 0) std::cerr << "***\nNode " << i << " is the fastest" << std::endl; + EXPECT_FALSE(test_monitor(world, i, rank_reporting)); + world.barrier(); + EXPECT_FALSE(test_monitor(world, i, rank_reporting, true)); + world.barrier(); + } + usleep(1000); +} + +TEST(MPI, MultipleMonitors) { + // test multiple monitors + usleep(1000); + mpi::communicator world; + mpi::monitor monitor1{world}; + mpi::monitor monitor2{world}; + mpi::monitor monitor3{world}; + if (world.rank() == 0) { + monitor3.report_local_event(); + } + monitor2.report_local_event(); + monitor1.finalize_communications(); + monitor2.finalize_communications(); + monitor3.finalize_communications(); + EXPECT_FALSE(monitor1.event_on_any_rank()); + EXPECT_FALSE(monitor1.event_on_all_ranks()); + EXPECT_TRUE(monitor2.event_on_any_rank()); + EXPECT_TRUE(monitor2.event_on_all_ranks()); + EXPECT_TRUE(monitor3.event_on_any_rank()); + if (world.size() == 1) EXPECT_TRUE(monitor3.event_on_all_ranks()); + else EXPECT_FALSE(monitor3.event_on_all_ranks()); + usleep(1000); +} + MPI_TEST_MAIN;