Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize the monitor class #20

Merged
merged 7 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions c++/mpi/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ namespace mpi {
* @details Calls `MPI_Comm_split` with the given color and key arguments. See the MPI documentation for more details,
* e.g. <a href="https://docs.open-mpi.org/en/v5.0.x/man-openmpi/man3/MPI_Comm_split.3.html">open-mpi docs</a>.
*
* @warning This allocates a new communicator object. Make sure to call `free` on the returned communicator when
* it is no longer needed.
*
* @return If mpi::has_env is true, return the split `MPI_Comm` object wrapped in a new mpi::communicator, otherwise
* return a default constructed mpi::communicator.
*/
Expand All @@ -98,6 +101,40 @@ namespace mpi {
return {};
}

/**
* @brief Duplicate the communicator.
*
* @details Calls `MPI_Comm_dup` to duplicate the communicator. See the MPI documentation for more details, e.g.
* <a href="https://docs.open-mpi.org/en/v5.0.x/man-openmpi/man3/MPI_Comm_dup.3.html">open-mpi docs</a>.
*
* @warning This allocates a new communicator object. Make sure to call `free` on the returned communicator when
* it is no longer needed.
*
* @return If mpi::has_env is true, return the duplicated `MPI_Comm` object wrapped in a new mpi::communicator,
* otherwise return a default constructed mpi::communicator.
*/
[[nodiscard]] communicator duplicate() const {
if (has_env) {
communicator c;
MPI_Comm_dup(_com, &c._com);
return c;
} else
return {};
}

/**
* @brief Free the communicator.
*
* @details Calls `MPI_Comm_free` to mark the communicator for deallocation. See the MPI documentation for more
* details, e.g. <a href="https://docs.open-mpi.org/en/v5.0.x/man-openmpi/man3/MPI_Comm_free.3.html">open-mpi docs
* </a>.
*
* Does nothing, if mpi::has_env is false.
*/
void free() {
if (has_env) { MPI_Comm_free(&_com); }
}

/**
* @brief If mpi::has_env is true, `MPI_Abort` is called with the given error code, otherwise std::abort is called.
* @param error_code The error code to pass to `MPI_Abort`.
Expand Down
263 changes: 168 additions & 95 deletions c++/mpi/monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

/**
* @file
* @brief Provides a class for monitoring and communicating exceptions and other errors of
* individual processes.
* @brief Provides a class for monitoring and communicating events across multiple processes.
*/

#pragma once
Expand All @@ -33,41 +32,54 @@
namespace mpi {

/**
* @ingroup err_handling
* @brief Constructed on top of an MPI communicator, this class helps to monitor and communicate
* exceptions and other errors of individual processes.
* @ingroup event_handling
* @brief Constructed on top of an MPI communicator, this class helps to monitor and communicate events across
* multiple processes.
*
* @details The root process (process with rank 0) monitors all other processes. If a process encounters
* an error, it sends an emergeny stop request to the root process which forwards it to all the other
* processes.
* @details The root process (rank == 0) monitors all other processes. If a process encounters an event, it sends a
* message to the root process by calling monitor::report_local_event. The root process then broadcasts this
* information to all other processes.
*
* It can be used to check
* - if an event has occurred on any process (monitor::event_on_any_rank) or
* - if an event has occurred on all processes (monitor::event_on_all_ranks).
*
* It uses a duplicate communicator to not interfere with other MPI communications. The communicator is freed in the
* `finalize_communications` function (which is called in the destructor if not called before).
*/
class monitor {
// Future struct for the non-blocking send/receive done on the root process.
// Future struct for non-blocking MPI communication.
struct future {
// MPI request for the non-blocking receive on the root process.
// MPI request of the non-blocking MPI call.
MPI_Request request{};

// 0 means that no error has occurred, 1 means that an error has occurred.
int node_stop = 0;
// 0 means that no event has occurred, 1 means that an event has occurred.
int event = 0;
};

// MPI communicator.
mpi::communicator com;
mpi::communicator comm;

// Future objects stored on the root process for every non-root process.
// Future objects stored on the root process for local events on non-root processes.
std::vector<future> root_futures;

// MPI request for broadcasting the emergency stop to all non-root processes.
MPI_Request req_ibcast{};
// MPI request for the broadcasting done on the root process in case an event has occurred on any rank.
MPI_Request req_ibcast_any{};

// MPI request for the broadcasting done on the root process in case an event has occurred on all ranks.
MPI_Request req_ibcast_all{};

// MPI request for sending the emergency stop request to the root process.
// MPI request for the sending done on non-root processes.
MPI_Request req_isent{};

// Set to 1, if the process has encountered a local error and requested an emergency stop.
int local_stop = 0;
// Set to 1, if a local event has occurred on this process.
int local_event = 0;

// Set to 1, if the process has received an emergency stop broadcasted by the root process.
int global_stop = 0;
// Set to 1, if an event has occurred on any process.
int any_event = 0;

// Set to 1, if an event has occurred on all processes.
int all_events = 0;

// Set to true, if finialize_communications() has been called.
bool finalized = false;
Expand All @@ -76,20 +88,25 @@ namespace mpi {
/**
* @brief Construct a monitor on top of a given mpi::communicator.
*
* @details The root process performs a non-blocking receive for every non-root process and waits for
* a non-root process to send an emergency stop request. Non-root processes make a non-blocking broadcast
* call and wait for the root process to broadcast any emergency stop request it has received.
* @details The communicator is duplicated to not interfere with other MPI communications.
*
* The root process (rank == 0) performs a non-blocking receive for every non-root process and waits for a
* non-root process to send a message that an event has occurred.
*
* Non-root processes make two non-blocking broadcast calls and wait for the root process to broadcast a message in
* case an event has occurred on any or on all processes.
*
* @param c mpi::communicator.
*/
monitor(mpi::communicator c) : com(c) {
if (com.rank() == 0) {
monitor(mpi::communicator c) : comm(c.duplicate()) {
if (comm.rank() == 0) {
root_futures.resize(c.size() - 1);
for (int rank = 1; rank < c.size(); ++rank) {
MPI_Irecv(&(root_futures[rank - 1].node_stop), 1, MPI_INT, rank, 0, MPI_COMM_WORLD, &(root_futures[rank - 1].request));
MPI_Irecv(&(root_futures[rank - 1].event), 1, MPI_INT, rank, rank, comm.get(), &(root_futures[rank - 1].request));
}
} else {
MPI_Ibcast(&global_stop, 1, MPI_INT, 0, MPI_COMM_WORLD, &req_ibcast);
MPI_Ibcast(&any_event, 1, MPI_INT, 0, comm.get(), &req_ibcast_any);
MPI_Ibcast(&all_events, 1, MPI_INT, 0, comm.get(), &req_ibcast_all);
}
}

Expand All @@ -103,106 +120,162 @@ namespace mpi {
~monitor() { finalize_communications(); }

/**
* @brief Request an emergency stop.
* @brief Report a local event to the root process (rank == 0).
*
* @details This function can be called on any process in case a local event has occurred.
*
* On the root process, it immediately broadcasts to all other processes that an event has occurred and further
* checks if all other processes have reported an event as well. If so, it additionally broadcasts to all processes
* that an event has occurred on all processes.
*
* @details This function can be called on any process in case a local error has occurred. On the
* root process, it sets its `local_stop` and `global_stop` members to 1 and broadcasts `global_stop`
* to all non-root processes. On non-root processes, it sets `local_stop` to 1 and sends it to the
* root process.
* On non-root processes, it sends a message to the root process that a local event has occurred.
*/
void request_emergency_stop() {
EXPECTS(!finalized);
void report_local_event() {
// prevent sending multiple signals
if (local_stop) { return; }

// an error has occurred
local_stop = 1;
if (com.rank() == 0) {
// root broadcasts the global_stop variable
global_stop = 1;
MPI_Ibcast(&global_stop, 1, MPI_INT, 0, MPI_COMM_WORLD, &req_ibcast);
if (local_event or finalized) { return; }

// a local event has occurred
local_event = 1;
if (comm.rank() == 0) {
// on root process, check all other nodes and perform necessary broadcasts
root_check_nodes_and_bcast();
} else {
// non-root sends the local_stop variable to root
MPI_Isend(&local_stop, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &req_isent);
// on non-root processes, let the root process know about the local event
MPI_Isend(&local_event, 1, MPI_INT, 0, comm.rank(), comm.get(), &req_isent);
}
}

/**
* @brief Check if an emergency stop has been requested.
* @brief Check if an event has occurred on any process.
*
* @details This function can be called on any process to check if an event has occurred somewhere.
*
* It returns true, if
* - a local event has occurred or
* - if an event has occurred on some other process which has already been reported to the root process and
* broadcasted to all other processes.
*
* @details This function can be called on any process to check if an emergency has occurred somewhere.
* It first checks, if its `local_stop` or `global_stop` members are set to 1 and returns `true` in case
* one of them is. Otherwise, on the root process, it calls `root_check_nodes_and_bcast()` to check if
* some other process has sent an emergency message and to possibly forward the received signal.
* On non-root processes, it checks if the root process has broadcasted an emergency stop, which it has
* received from some other process.
* On the root process (rank == 0), it checks the status of all non-root processes and performs the necessary
* broadcasts in case they have not been done yet.
*
* @return True, if an emergency stop has been requested. Otherwise, it returns false.
* @return True, if an event has occurred on any process.
*/
[[nodiscard]] bool emergency_occured() {
// if final_communications() has already been called, global_stop == 0 if no error has occurred, otherwise it is 1
if (finalized) return global_stop;
[[nodiscard]] bool event_on_any_rank() {
// if final_communications() has already been called, any_event == 0 if no event has occurred, otherwise it is 1
if (finalized) return any_event;

// either a local error has occurred or some other process has requested an emergency stop
if (global_stop or local_stop) return true;
// if a local event has occurred, we return true
if (local_event) return true;

if (com.rank() == 0) {
// root checks if some other process has requested an emergency stop
// on the root process, we first check the status of all non-root processes, perform the necessary broadcasts and
// return true if an event has occurred
if (comm.rank() == 0) {
root_check_nodes_and_bcast();
return any_event;
}
return global_stop;

// on non-root processes, we check the status of the corresponding broadcast and return true if an event has
// occurred
MPI_Status status;
int has_received = 0;
MPI_Test(&req_ibcast_any, &has_received, &status);
return has_received and any_event;
}

/**
* @brief Check if an event has occurred on all processes.
*
* @details This function can be called on any process to check if an event has occurred on all processes.
*
* It returns true, if an event has occurred on all processes which has already been reported to the root process
* and broadcasted to all other processes.
*
* On the root process (rank == 0), it checks the status of all non-root processes and performs the necessary
* broadcasts in case it has not been done yet.
*
* @return True, if an event has occurred on all processes.
*/
[[nodiscard]] bool event_on_all_ranks() {
// if final_communications() has already been called, all_events == 0 if an event has not occurred on every
// process, otherwise it is 1
if (finalized) return all_events;

// on the root process, we first check the status of all non-root processes, perform the necessary broadcasts and
// return true if an event has occurred on all of them
if (comm.rank() == 0) {
root_check_nodes_and_bcast();
return all_events;
}

// on non-root processes, we check the status of the broadcast and return true if an event has occurred on all
// processes
MPI_Status status;
int has_received = 0;
MPI_Test(&req_ibcast_all, &has_received, &status);
return has_received and all_events;
}

/**
* @brief Finalize all pending communications.
*
* @details At the end of this function, all processes have completed their work or have had a local
* emergency stop. The member `global_stop` is guaranteed to be the same on all processes when this
* function returns.
* @details At the end of this function, all MPI communications have been completed and the values of the member
* variables will not change anymore due to some member function calls.
*
* Furthermore, it frees the used communicator.
*/
void finalize_communications() {
// prevent multiple calls
if (finalized) return;
if (com.rank() == 0) {
// root just listens to the other processes and bcasts the global_stop until everyone is done
while (root_check_nodes_and_bcast()) { usleep(100); } // 100 us (micro seconds)
// all others node have finished
// if the root has never emitted the ibcast, we do it now
if (not global_stop) { MPI_Ibcast(&global_stop, 1, MPI_INT, 0, MPI_COMM_WORLD, &req_ibcast); }

if (comm.rank() == 0) {
// on root process, wait for all non-root processes to finish their MPI_Isend calls
while (root_check_nodes_and_bcast()) {
usleep(100); // 100 us (micro seconds)
}
// and perform broadcasts in case they have not been done yet
if (not any_event) { MPI_Ibcast(&any_event, 1, MPI_INT, 0, comm.get(), &req_ibcast_any); }
if (not all_events) { MPI_Ibcast(&all_events, 1, MPI_INT, 0, comm.get(), &req_ibcast_all); }
} else {
// on non-root node: either Isend was done when local_stop was set to 1 during request_emergency_stop,
// or it has to happen now, i.e, work is done, and fine.
if (not local_stop) { MPI_Isend(&local_stop, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &req_isent); }
// on non-root processes, perform MPI_Isend call in case it has not been done yet
if (not local_event) { MPI_Isend(&local_event, 1, MPI_INT, 0, comm.rank(), comm.get(), &req_isent); }
}
// all nodes wait for the ibcast of the global_stop to be complete
MPI_Status status;
MPI_Wait(&req_ibcast, &status);

// all nodes wait for the broadcasts to be completed
MPI_Status status_any, status_all;
MPI_Wait(&req_ibcast_any, &status_any);
MPI_Wait(&req_ibcast_all, &status_all);

// free the communicator
comm.free();
finalized = true;
}

private:
/**
* @brief Check if any non-root process has sent a stop request. If so, broadcast to all other processes
* in case it has not been done yet.
*
* @return True, if at least one process has not finished the `MPI_Isend` of the `local_stop` variable to
* the root process. Otherwise, it returns false.
*/
// Root process checks the status of all non-root processes, performs necessary broadcasts and returns a boolean
// that is true if at least one non-root process has not performed its MPI_Isend call yet.
bool root_check_nodes_and_bcast() {
EXPECTS(!finalized);
EXPECTS(com.rank() == 0);
// loop over all non-root processes
bool some_nodes_are_still_running = false;
for (auto &[request, node_stop] : root_futures) {
// check for an emergency stop request
EXPECTS(comm.rank() == root);
bool any = false;
bool all = true;
bool finished = true;
for (auto &[request, event] : root_futures) {
Thoemi09 marked this conversation as resolved.
Show resolved Hide resolved
MPI_Status status;
int comm_received = 0;
MPI_Test(&request, &comm_received, &status);
// for the first time an emergency stop has been requested -> root calls request_emergency_stop()
// to broadcast to all other processes
if (comm_received and (not global_stop) and node_stop) request_emergency_stop(); // the root requires the stop now. It also stops itself...
some_nodes_are_still_running |= (not comm_received);
int has_received = 0;
MPI_Test(&request, &has_received, &status);
any |= (has_received and event);
all &= (has_received and event);
finished &= has_received;
Thoemi09 marked this conversation as resolved.
Show resolved Hide resolved
}
if (not any_event and (any or local_event)) {
any_event = 1;
MPI_Ibcast(&any_event, 1, MPI_INT, 0, comm.get(), &req_ibcast_any);
}
if (not all_events and all and local_event) {
all_events = 1;
MPI_Ibcast(&all_events, 1, MPI_INT, 0, comm.get(), &req_ibcast_all);
}
return some_nodes_are_still_running;
return not finished;
}
};

Expand Down
Loading
Loading