diff --git a/.vscode/settings.json b/.vscode/settings.json
index 28c3106eb..706be5cce 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,3 +1,87 @@
 {
-    "cmake.configureOnOpen": true
+    "cmake.configureOnOpen": true,
+    "files.associations": {
+        "*.ipp": "cpp",
+        "ratio": "cpp",
+        "array": "cpp",
+        "functional": "cpp",
+        "tuple": "cpp",
+        "type_traits": "cpp",
+        "utility": "cpp",
+        "variant": "cpp",
+        "cctype": "cpp",
+        "clocale": "cpp",
+        "cmath": "cpp",
+        "csignal": "cpp",
+        "cstdarg": "cpp",
+        "cstddef": "cpp",
+        "cstdio": "cpp",
+        "cstdlib": "cpp",
+        "cstring": "cpp",
+        "ctime": "cpp",
+        "cwchar": "cpp",
+        "cwctype": "cpp",
+        "any": "cpp",
+        "atomic": "cpp",
+        "strstream": "cpp",
+        "bit": "cpp",
+        "*.tcc": "cpp",
+        "bitset": "cpp",
+        "charconv": "cpp",
+        "chrono": "cpp",
+        "cinttypes": "cpp",
+        "codecvt": "cpp",
+        "compare": "cpp",
+        "complex": "cpp",
+        "concepts": "cpp",
+        "condition_variable": "cpp",
+        "cstdint": "cpp",
+        "deque": "cpp",
+        "forward_list": "cpp",
+        "list": "cpp",
+        "map": "cpp",
+        "set": "cpp",
+        "string": "cpp",
+        "unordered_map": "cpp",
+        "unordered_set": "cpp",
+        "vector": "cpp",
+        "exception": "cpp",
+        "algorithm": "cpp",
+        "iterator": "cpp",
+        "memory": "cpp",
+        "memory_resource": "cpp",
+        "numeric": "cpp",
+        "optional": "cpp",
+        "random": "cpp",
+        "regex": "cpp",
+        "source_location": "cpp",
+        "string_view": "cpp",
+        "system_error": "cpp",
+        "fstream": "cpp",
+        "future": "cpp",
+        "initializer_list": "cpp",
+        "iomanip": "cpp",
+        "iosfwd": "cpp",
+        "iostream": "cpp",
+        "istream": "cpp",
+        "limits": "cpp",
+        "mutex": "cpp",
+        "new": "cpp",
+        "numbers": "cpp",
+        "ostream": "cpp",
+        "ranges": "cpp",
+        "semaphore": "cpp",
+        "span": "cpp",
+        "sstream": "cpp",
+        "stdexcept": "cpp",
+        "stop_token": "cpp",
+        "streambuf": "cpp",
+        "thread": "cpp",
+        "cfenv": "cpp",
+        "typeindex": "cpp",
+        "typeinfo": "cpp",
+        "valarray": "cpp",
+        "stacktrace": "cpp",
+        "__nullptr": "cpp"
+    }
 }
diff --git a/include/NeoFOAM/core/executor/CPUExecutor.hpp b/include/NeoFOAM/core/executor/CPUExecutor.hpp
index dda793733..b7cb49894 100644
--- a/include/NeoFOAM/core/executor/CPUExecutor.hpp
+++ b/include/NeoFOAM/core/executor/CPUExecutor.hpp
@@ -19,6 +19,7 @@ class CPUExecutor
 public:

     using exec = Kokkos::Serial;
+    using space = exec::memory_space;

     CPUExecutor();
     ~CPUExecutor();
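The new `space` alias exposes the executor's Kokkos memory space, which is what the templated buffers below key on. A minimal sketch of how such an alias can be consumed; the `allocateBuffer` helper is hypothetical, not part of this change:

```cpp
#include <cstddef>
#include <Kokkos_Core.hpp>

// Hypothetical helper: allocate a raw communication buffer in the memory
// space advertised by an executor, e.g. CPUExecutor::space above.
template<typename MemorySpace>
Kokkos::View<char*, MemorySpace> allocateBuffer(std::size_t bytes)
{
    return Kokkos::View<char*, MemorySpace>("commBuffer", bytes);
}

// allocateBuffer<Kokkos::Serial::memory_space>(1024) yields a host-resident view.
```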
diff --git a/include/NeoFOAM/core/mpi/fullDuplexCommBuffer.hpp b/include/NeoFOAM/core/mpi/fullDuplexCommBuffer.hpp
index 468c26380..d1c228dc5 100644
--- a/include/NeoFOAM/core/mpi/fullDuplexCommBuffer.hpp
+++ b/include/NeoFOAM/core/mpi/fullDuplexCommBuffer.hpp
@@ -2,9 +2,10 @@
 // SPDX-FileCopyrightText: 2023 NeoFOAM authors
 #pragma once

-#include
-#include
 #include
+#include
+#include
+#include

 #include "NeoFOAM/core/mpi/environment.hpp"
 #include "NeoFOAM/core/mpi/halfDuplexCommBuffer.hpp"
@@ -18,11 +19,13 @@ namespace mpi

 /**
  * @class FullDuplexCommBuffer
  * @brief A buffer for full-duplex communication in a distributed system using MPI.
+ * @tparam memorySpace The memory space to use for the communication buffers.
  *
  * The FullDuplexCommBuffer class facilitates efficient, non-blocking, point-to-point data
  * exchange between MPI ranks, allowing for simultaneous send and receive operations. It
  * manages two HalfDuplexCommBuffer instances: one for sending data and one for receiving data.
  */
+template<MemorySpace memorySpace>
 class FullDuplexCommBuffer
 {
 public:
@@ -59,8 +62,8 @@ class FullDuplexCommBuffer
     template<typename valueType>
     void initComm(std::string commName)
     {
-        send_.initComm<valueType>(commName);
-        receive_.initComm<valueType>(commName);
+        send_.template initComm<valueType>(commName);
+        receive_.template initComm<valueType>(commName);
     }

     /**
@@ -72,7 +75,7 @@ class FullDuplexCommBuffer
     template<typename valueType>
     std::span<valueType> getSend(const int rank)
     {
-        return send_.get<valueType>(rank);
+        return send_.template get<valueType>(rank);
     }

     /**
@@ -84,7 +87,7 @@ class FullDuplexCommBuffer
     template<typename valueType>
     std::span<const valueType> getSend(const int rank) const
     {
-        return send_.get<valueType>(rank);
+        return send_.template get<valueType>(rank);
     }

     /**
@@ -96,7 +99,7 @@ class FullDuplexCommBuffer
     template<typename valueType>
     std::span<valueType> getReceive(const int rank)
     {
-        return receive_.get<valueType>(rank);
+        return receive_.template get<valueType>(rank);
     }

     /**
@@ -108,7 +111,7 @@ class FullDuplexCommBuffer
     template<typename valueType>
     std::span<const valueType> getReceive(const int rank) const
     {
-        return receive_.get<valueType>(rank);
+        return receive_.template get<valueType>(rank);
     }

     /**
@@ -124,7 +127,7 @@ class FullDuplexCommBuffer
     /**
      * @brief Check if the communication is complete.
      * @return True if the communication is complete, false otherwise.
      */
-    inline bool isComplete() { return send_.isComplete() && receive_.isComplete(); }
+    inline bool isComplete() { return !send_.isActive() && !receive_.isActive(); }

     /**
      * @brief Blocking wait for the communication to complete.
@@ -146,10 +149,26 @@ class FullDuplexCommBuffer

 private:

-    HalfDuplexCommBuffer send_;    /**< The send buffer. */
-    HalfDuplexCommBuffer receive_; /**< The receive buffer. */
+    HalfDuplexCommBuffer<memorySpace> send_;    /**< The send buffer. */
+    HalfDuplexCommBuffer<memorySpace> receive_; /**< The receive buffer. */
 };

+// using BufferVariant = std::variant<FullDuplexCommBuffer<Kokkos::HostSpace>,
+// #ifdef KOKKOS_ENABLE_CUDA
+//     FullDuplexCommBuffer<Kokkos::CudaSpace>, FullDuplexCommBuffer<Kokkos::CudaUVMSpace>,
+// #endif
+// #ifdef KOKKOS_ENABLE_HIP
+//     FullDuplexCommBuffer<Kokkos::HIPSpace>,
+// #endif
+// #ifdef KOKKOS_ENABLE_OPENMPTARGET
+//     FullDuplexCommBuffer<Kokkos::Experimental::OpenMPTargetSpace>,
+// #endif
+// #ifdef KOKKOS_ENABLE_SYCL
+//     FullDuplexCommBuffer<Kokkos::Experimental::SYCLDeviceUSMSpace>,
+//     FullDuplexCommBuffer<Kokkos::Experimental::SYCLHostUSMSpace>,
+//     FullDuplexCommBuffer<Kokkos::Experimental::SYCLSharedUSMSpace>,
+// #endif
+//     FullDuplexCommBuffer<Kokkos::SharedSpace>>;
+
 }
 }
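For orientation, here is the shape of one exchange cycle with the templated full-duplex buffer. This is a sketch only: the `startComm()` member is assumed from the surrounding class (it is not shown in the hunks above), and construction of the buffer is elided.

```cpp
#include <span>
#include <Kokkos_Core.hpp>
#include "NeoFOAM/core/mpi/fullDuplexCommBuffer.hpp"

void exchange(NeoFOAM::mpi::FullDuplexCommBuffer<Kokkos::HostSpace>& buffer, int commSize)
{
    buffer.initComm<int>("cellExchange");
    for (int rank = 0; rank < commSize; ++rank)
    {
        std::span<int> out = buffer.getSend<int>(rank);
        // ... fill out with the values destined for this rank ...
    }
    buffer.startComm(); // assumed API: posts receives and sends
    buffer.waitComplete();
    for (int rank = 0; rank < commSize; ++rank)
    {
        std::span<int> in = buffer.getReceive<int>(rank);
        // ... unpack in ...
    }
    buffer.finaliseComm();
}
```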
diff --git a/include/NeoFOAM/core/mpi/halfDuplexCommBuffer.hpp b/include/NeoFOAM/core/mpi/halfDuplexCommBuffer.hpp
index 41baaee97..7147656fd 100644
--- a/include/NeoFOAM/core/mpi/halfDuplexCommBuffer.hpp
+++ b/include/NeoFOAM/core/mpi/halfDuplexCommBuffer.hpp
@@ -8,6 +8,7 @@

 #include
 #include
+#include <Kokkos_Core.hpp>

 #include "NeoFOAM/core/error.hpp"
 #include "NeoFOAM/core/mpi/environment.hpp"
@@ -35,6 +36,16 @@ inline int bufferHash(const std::string& str)
     return (static_cast<int>(tag) % maxTagValue) + 10;
 }

+
+template<typename Type>
+concept MemorySpace = requires {
+    typename Type::execution_space;
+    {
+        Type::is_memory_space
+    } -> std::convertible_to<bool>;
+    requires Type::is_memory_space == true;
+};
+
 /**
  * @class HalfDuplexCommBuffer
  * @brief A data buffer for half-duplex communication in a distributed system using MPI.
  *
  * The HalfDuplexCommBuffer class provides a re-usable communication buffer,
  * capable of handling various data types. The buffer does not shrink once initialized, minimizing
  * memory reallocation and improving memory efficiency. The class operates in a half-duplex mode,
  * meaning it is either sending or receiving data at any given time.
+ *
+ * States and state transitions:
+ * 1. Initialized: the buffer has been initialized for a communication, with a name and data type.
+ * 2. Active: the buffer is actively sending or receiving data.
+ *
+ * These states can be queried using the isCommInit() and isActive() functions. However, isActive()
+ * may only be called once the buffer is initialized, whereas isCommInit() can be called at any
+ * time.
+ *
+ * The states are changed through the following functions:
+ * 1. initComm(): sets the buffer to the initialized state.
+ * 2. send() & receive(): set the buffer to the active state.
+ * 3. waitComplete(): on return, the buffer is in the inactive state.
+ * 4. finaliseComm(): sets the buffer back to the uninitialized state.
+ *
+ * It is critical that, once the data has been copied out of the buffer, the buffer is set back to
+ * the uninitialized state so it can be reused.
  */
+template<MemorySpace memorySpace>
 class HalfDuplexCommBuffer
 {
@@ -103,7 +132,8 @@ class HalfDuplexCommBuffer
             "Rank size mismatch. " << rankCommSize.size() << " vs. " << mpiEnviron_.sizeRank()
         );
         typeSize_ = sizeof(valueType);
-        rankOffset_.resize(rankCommSize.size() + 1);
+        Kokkos::resize(rankOffsetSpace_, rankCommSize.size() + 1);
+        Kokkos::resize(rankOffsetHost_, rankCommSize.size() + 1);
         request_.resize(rankCommSize.size(), MPI_REQUEST_NULL);
         updateDataSize([&](const int rank) { return rankCommSize[rank]; }, sizeof(valueType));
     }
@@ -133,31 +163,101 @@ class HalfDuplexCommBuffer
     inline const std::string& getCommName() const { return commName_; }

     /**
-     * @brief Check if the communication is complete.
-     * @return true if the communication is complete else false.
+     * @brief Check whether the buffer is still actively communicating.
+     * @return true if communication is still in progress, else false.
      */
-    bool isComplete();
+    bool isActive()
+    {
+        NF_DEBUG_ASSERT(isCommInit(), "Communication buffer is not initialised.");
+        bool isActive = false;
+        for (auto& request : request_)
+        {
+            if (request != MPI_REQUEST_NULL)
+            {
+                int flag = 0;
+                int err = MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
+                NF_DEBUG_ASSERT(err == MPI_SUCCESS, "MPI_Test failed.");
+                if (!flag)
+                {
+                    isActive = true;
+                    break;
+                }
+            }
+        }
+        return isActive;
+    }

     /**
      * @brief Post send for data to begin sending to all ranks this rank communicates with.
      */
-    void send();
+    void send()
+    {
+        NF_DEBUG_ASSERT(isCommInit(), "Communication buffer is not initialised.");
+        NF_DEBUG_ASSERT(!isActive(), "Communication buffer is already actively sending.");
+        for (auto rank = 0; rank < mpiEnviron_.sizeRank(); ++rank)
+        {
+            if (rankOffsetHost_(rank + 1) - rankOffsetHost_(rank) == 0) continue;
+            isend(
+                rankBuffer_.data() + rankOffsetHost_(rank),
+                rankOffsetHost_(rank + 1) - rankOffsetHost_(rank),
+                rank,
+                tag_,
+                mpiEnviron_.comm(),
+                &request_[rank]
+            );
+        }
+    }

     /**
      * @brief Post receive for data to begin receiving from all ranks this rank communicates
      * with.
      */
-    void receive();
+    void receive()
+    {
+        NF_DEBUG_ASSERT(isCommInit(), "Communication buffer is not initialised.");
+        NF_DEBUG_ASSERT(!isActive(), "Communication buffer is already actively receiving.");
+        for (auto rank = 0; rank < mpiEnviron_.sizeRank(); ++rank)
+        {
+            if (rankOffsetHost_(rank + 1) - rankOffsetHost_(rank) == 0) continue;
+            irecv(
+                rankBuffer_.data() + rankOffsetHost_(rank),
+                rankOffsetHost_(rank + 1) - rankOffsetHost_(rank),
+                rank,
+                tag_,
+                mpiEnviron_.comm(),
+                &request_[rank]
+            );
+        }
+    }

     /**
      * @brief Blocking wait for the communication to finish.
      */
-    void waitComplete();
+    void waitComplete()
+    {
+        NF_DEBUG_ASSERT(isCommInit(), "Communication buffer is not initialised.");
+        while (isActive())
+        {
+            // todo deadlock prevention.
+            // wait for the communication to finish.
+        }
+    }

     /**
      * @brief Finalise the communication.
      */
-    void finaliseComm();
+    void finaliseComm()
+    {
+        NF_DEBUG_ASSERT(isCommInit(), "Communication buffer is not initialised.");
+        NF_DEBUG_ASSERT(!isActive(), "Cannot finalise while buffer is active.");
+        for (auto& request : request_)
+            NF_DEBUG_ASSERT(
+                request == MPI_REQUEST_NULL, "MPI_Request not null, communication not complete."
+            );
+        tag_ = -1;
+        commName_ = "unassigned";
+    }

     /**
      * @brief Get a span of the buffer data for a given rank.
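The `isActive()`/`waitComplete()` pair above is a standard non-blocking completion poll. A self-contained illustration of the `MPI_Test` idiom it relies on; names here are illustrative, not NeoFOAM API:

```cpp
#include <vector>
#include <mpi.h>

// MPI_Test is non-blocking: it sets flag to true and resets the request to
// MPI_REQUEST_NULL once the transfer has finished, so completed requests are
// skipped on subsequent polls.
bool anyPending(std::vector<MPI_Request>& requests)
{
    for (MPI_Request& request : requests)
    {
        if (request == MPI_REQUEST_NULL) continue; // nothing in flight
        int flag = 0;
        MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
        if (!flag) return true; // this transfer is still in progress
    }
    return false;
}
```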
@@ -172,8 +272,8 @@ class HalfDuplexCommBuffer
         NF_DEBUG_ASSERT(isCommInit(), "Communication buffer is not initialised.");
         NF_DEBUG_ASSERT(typeSize_ == sizeof(valueType), "Data type (size) mismatch.");
         return std::span(
-            reinterpret_cast<valueType*>(rankBuffer_.data() + rankOffset_[rank]),
-            (rankOffset_[rank + 1] - rankOffset_[rank]) / sizeof(valueType)
+            reinterpret_cast<valueType*>(rankBuffer_.data() + rankOffsetHost_(rank)),
+            (rankOffsetHost_(rank + 1) - rankOffsetHost_(rank)) / sizeof(valueType)
         );
     }

@@ -190,8 +290,8 @@ class HalfDuplexCommBuffer
         NF_DEBUG_ASSERT(isCommInit(), "Communication buffer is not initialised.");
         NF_DEBUG_ASSERT(typeSize_ == sizeof(valueType), "Data type (size) mismatch.");
         return std::span(
-            reinterpret_cast<const valueType*>(rankBuffer_.data() + rankOffset_[rank]),
-            (rankOffset_[rank + 1] - rankOffset_[rank]) / sizeof(valueType)
+            reinterpret_cast<const valueType*>(rankBuffer_.data() + rankOffsetHost_(rank)),
+            (rankOffsetHost_(rank + 1) - rankOffsetHost_(rank)) / sizeof(valueType)
        );
     }

@@ -202,9 +302,12 @@ class HalfDuplexCommBuffer
     std::size_t typeSize_ {sizeof(char)}; /*< The data type currently stored in the buffer. */
     MPIEnvironment mpiEnviron_;           /*< The MPI environment. */
     std::vector<MPI_Request> request_;    /*< The MPI request for communication with each rank. */
-    std::vector<char> rankBuffer_;        /*< The buffer data for all ranks. Never shrinks. */
-    std::vector<std::size_t>
-        rankOffset_; /*< The offset (in bytes) for a rank data in the buffer. */
+    Kokkos::View<char*, memorySpace>
+        rankBuffer_; /*< The buffer data for all ranks. Never shrinks. */
+    Kokkos::View<std::size_t*, memorySpace>
+        rankOffsetSpace_; /*< The offsets (in bytes) of each rank's data, in the buffer's memory space. */
+    Kokkos::View<std::size_t*, Kokkos::HostSpace>
+        rankOffsetHost_; /*< Host copy of the offsets, used for the MPI communication. */

     /**
      * @brief Set the data type for the buffer.
@@ -217,8 +320,8 @@ class HalfDuplexCommBuffer
         );
         if (0 == (typeSize_ - sizeof(valueType))) return;
         updateDataSize(
-            [rankOffset = rankOffset_, typeSize = typeSize_](const int rank)
-            { return (rankOffset[rank + 1] - rankOffset[rank]) / typeSize; },
+            [rankOffset = rankOffsetHost_, typeSize = typeSize_](const int rank)
+            { return (rankOffset(rank + 1) - rankOffset(rank)) / typeSize; },
             sizeof(valueType)
         );

@@ -236,13 +339,17 @@ class HalfDuplexCommBuffer
     template<typename func>
     void updateDataSize(func rankSize, std::size_t newSize)
     {
         std::size_t dataSize = 0;
+
+        // This works because rankOffsetHost_ is guaranteed to be on host.
         for (auto rank = 0; rank < mpiEnviron_.sizeRank(); ++rank)
         {
-            rankOffset_[rank] = dataSize;
+            rankOffsetHost_(rank) = dataSize;
             dataSize += rankSize(rank) * newSize;
         }
-        rankOffset_.back() = dataSize;
-        if (rankBuffer_.size() < dataSize) rankBuffer_.resize(dataSize); // we never size down.
+        rankOffsetHost_(mpiEnviron_.sizeRank()) = dataSize;
+
+        if (rankBuffer_.size() < dataSize) Kokkos::resize(rankBuffer_, dataSize);
+        Kokkos::deep_copy(rankOffsetSpace_, rankOffsetHost_);
     }
 };
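The two offset views follow the usual Kokkos host-mirror pattern: `updateDataSize()` computes an exclusive prefix sum in the host view, then deep-copies it into the (possibly device-resident) memory space. A standalone sketch of that pattern, with illustrative names:

```cpp
#include <cstddef>
#include <vector>
#include <Kokkos_Core.hpp>

// Build per-rank byte offsets on host, then mirror them into MemorySpace so
// kernels running there can index the buffer directly.
template<typename MemorySpace>
void buildOffsets(
    Kokkos::View<std::size_t*, MemorySpace>& offsetsSpace,
    Kokkos::View<std::size_t*, Kokkos::HostSpace>& offsetsHost,
    const std::vector<std::size_t>& bytesPerRank
)
{
    Kokkos::resize(offsetsHost, bytesPerRank.size() + 1);
    Kokkos::resize(offsetsSpace, bytesPerRank.size() + 1);
    offsetsHost(0) = 0; // exclusive prefix sum over per-rank byte counts
    for (std::size_t rank = 0; rank < bytesPerRank.size(); ++rank)
    {
        offsetsHost(rank + 1) = offsetsHost(rank) + bytesPerRank[rank];
    }
    Kokkos::deep_copy(offsetsSpace, offsetsHost); // now usable inside kernels
}
```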
diff --git a/include/NeoFOAM/mesh/unstructured/communicator.hpp b/include/NeoFOAM/mesh/unstructured/communicator.hpp
index c1d44f701..3a1a70b0b 100644
--- a/include/NeoFOAM/mesh/unstructured/communicator.hpp
+++ b/include/NeoFOAM/mesh/unstructured/communicator.hpp
@@ -44,7 +44,7 @@ class Communicator
 {
 public:

-    using bufferType = mpi::FullDuplexCommBuffer;
+    using bufferType = mpi::FullDuplexCommBuffer<Kokkos::HostSpace>;

     /**
      * @brief Default constructor.
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 764582cbe..712fbbc5f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -8,8 +8,7 @@ include(GNUInstallDirs)

 target_sources(
   NeoFOAM
-  PRIVATE "core/mpi/halfDuplexCommBuffer.cpp"
-          "core/primitives/vector.cpp"
+  PRIVATE "core/primitives/vector.cpp"
           "core/time.cpp"
           "core/dictionary.cpp"
           "core/kokkos.cpp"
diff --git a/src/core/mpi/halfDuplexCommBuffer.cpp b/src/core/mpi/halfDuplexCommBuffer.cpp
deleted file mode 100644
index 77e794db3..000000000
--- a/src/core/mpi/halfDuplexCommBuffer.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-// SPDX-License-Identifier: MIT
-// SPDX-FileCopyrightText: 2023 NeoFOAM authors
-
-#include "NeoFOAM/core/mpi/halfDuplexCommBuffer.hpp"
-
-namespace NeoFOAM
-{
-
-namespace mpi
-{
-
-bool HalfDuplexCommBuffer::isComplete()
-{
-    NF_DEBUG_ASSERT(isCommInit(), "Communication buffer is not initialised.");
-    int flag;
-    for (auto& request : request_)
-    {
-        int err = MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
-        NF_DEBUG_ASSERT(err == MPI_SUCCESS, "MPI_Test failed.");
-        if (!flag) return false;
-    }
-    return static_cast<bool>(flag);
-}
-
-void HalfDuplexCommBuffer::send()
-{
-    NF_DEBUG_ASSERT(isCommInit(), "Communication buffer is not initialised.");
-    NF_DEBUG_ASSERT(isComplete(), "Communication buffer is already active.");
-    for (auto rank = 0; rank < mpiEnviron_.sizeRank(); ++rank)
-    {
-        if (rankOffset_[rank + 1] - rankOffset_[rank] == 0) continue;
-        isend(
-            rankBuffer_.data() + rankOffset_[rank],
-            rankOffset_[rank + 1] - rankOffset_[rank],
-            rank,
-            tag_,
-            mpiEnviron_.comm(),
-            &request_[rank]
-        );
-    }
-}
-
-void HalfDuplexCommBuffer::receive()
-{
-    NF_DEBUG_ASSERT(isCommInit(), "Communication buffer is not initialised.");
-    NF_DEBUG_ASSERT(isComplete(), "Communication buffer is already active.");
-    for (auto rank = 0; rank < mpiEnviron_.sizeRank(); ++rank)
-    {
-        if (rankOffset_[rank + 1] - rankOffset_[rank] == 0) continue;
-        irecv(
-            rankBuffer_.data() + rankOffset_[rank],
-            rankOffset_[rank + 1] - rankOffset_[rank],
-            rank,
-            tag_,
-            mpiEnviron_.comm(),
-            &request_[rank]
-        );
-    }
-}
-
-void HalfDuplexCommBuffer::waitComplete()
-{
-    NF_DEBUG_ASSERT(isCommInit(), "Communication buffer is not initialised.");
-    while (!isComplete())
-    {
-        // todo deadlock prevention.
-        // wait for the communication to finish.
-    }
-}
-
-void HalfDuplexCommBuffer::finaliseComm()
-{
-    NF_DEBUG_ASSERT(isCommInit(), "Communication buffer is not initialised.");
-    NF_DEBUG_ASSERT(isComplete(), "Cannot finalise while buffer is active.");
-    for (auto& request : request_)
-        NF_DEBUG_ASSERT(
-            request == MPI_REQUEST_NULL, "MPI_Request not null, communication not complete."
-        );
-    tag_ = -1;
-    commName_ = "unassigned";
-}
-
-}
-
-} // namespace NeoFoam
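Deleting this translation unit is a consequence of the templating, not an optimization: member definitions of a class template must be visible at each point of instantiation, so they move into the header. The only way to keep a separate `.cpp` would be explicit instantiation for every supported space, sketched here for illustration:

```cpp
// Hypothetical alternative to the header-only approach: explicit
// instantiations in a .cpp, one per enabled memory space.
template class NeoFOAM::mpi::HalfDuplexCommBuffer<Kokkos::HostSpace>;
#ifdef KOKKOS_ENABLE_CUDA
template class NeoFOAM::mpi::HalfDuplexCommBuffer<Kokkos::CudaSpace>;
#endif
```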
diff --git a/test/core/mpi/halfDuplexCommBuffer.cpp b/test/core/mpi/halfDuplexCommBuffer.cpp
index f732f203d..a51739fdb 100644
--- a/test/core/mpi/halfDuplexCommBuffer.cpp
+++ b/test/core/mpi/halfDuplexCommBuffer.cpp
@@ -5,6 +5,8 @@
 #include
 #include

+#include <Kokkos_Core.hpp>
+
 #include "NeoFOAM/core/mpi/halfDuplexCommBuffer.hpp"
 #include "NeoFOAM/core/mpi/environment.hpp"

@@ -16,7 +18,7 @@ TEST_CASE("halfDuplexBuffer")
     MPIEnvironment mpiEnviron;
     std::vector<std::size_t> rankCommSize(mpiEnviron.sizeRank(), 1);

-    HalfDuplexCommBuffer buffer(mpiEnviron, rankCommSize);
+    HalfDuplexCommBuffer<Kokkos::HostSpace> buffer(mpiEnviron, rankCommSize);

     SECTION("Default Constructor")
     {
-        HalfDuplexCommBuffer buffer2;
+        HalfDuplexCommBuffer<Kokkos::HostSpace> buffer2;
         REQUIRE_FALSE(buffer2.isCommInit());
     }

-    SECTION("Parameterized Constructor") { REQUIRE_FALSE(buffer.isCommInit()); }
+    SECTION("Parameterized Constructor")
+    {
+        HalfDuplexCommBuffer<Kokkos::HostSpace> buffer(mpiEnviron, rankCommSize);
+        REQUIRE_FALSE(buffer.isCommInit());
+    }

     SECTION("Init and Finalise")
     {
+        REQUIRE(false == buffer.isCommInit());
+        REQUIRE("unassigned" == buffer.getCommName());
         buffer.initComm<int>("Init Comm");
-        REQUIRE(buffer.isCommInit());
-        REQUIRE(true == buffer.isComplete());
-        REQUIRE(buffer.getCommName() == "Init Comm");
+        REQUIRE(true == buffer.isCommInit());
+        REQUIRE(false == buffer.isActive());
+        REQUIRE("Init Comm" == buffer.getCommName());
         buffer.finaliseComm();
-        REQUIRE(buffer.getCommName() == "unassigned");
-        REQUIRE(true == buffer.isComplete());
-        REQUIRE(!buffer.isCommInit());
+        REQUIRE("unassigned" == buffer.getCommName());
+        REQUIRE(false == buffer.isCommInit());
     }

     SECTION("Set Comm Rank Size")
     {
@@ -66,12 +75,20 @@ TEST_CASE("halfDuplexBuffer")
             data[0] = rank;
         }

+        REQUIRE(false == send.isActive());
+        REQUIRE(false == receive.isActive());
+
         send.send();
         receive.receive();

+        // We can't assert that isActive() is true here: that would race against completion.
+
         send.waitComplete();
         receive.waitComplete();

+        REQUIRE(false == send.isActive());
+        REQUIRE(false == receive.isActive());
+
         for (int rank = 0; rank < mpiEnviron.sizeRank(); ++rank)
         {
             auto data = receive.get<int>(rank);
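For completeness, the round trip this (truncated) section is driving at, sketched in the test's own terms; `mpiEnviron.rank()` is assumed as the usual rank accessor:

```cpp
// Every rank sends one int to every rank, with HostSpace buffers throughout.
HalfDuplexCommBuffer<Kokkos::HostSpace> send(mpiEnviron, rankCommSize);
HalfDuplexCommBuffer<Kokkos::HostSpace> receive(mpiEnviron, rankCommSize);

send.initComm<int>("roundTrip");
receive.initComm<int>("roundTrip");
for (int rank = 0; rank < mpiEnviron.sizeRank(); ++rank)
{
    send.get<int>(rank)[0] = rank; // payload: the destination's own id
}

send.send();
receive.receive();
send.waitComplete();
receive.waitComplete();

for (int rank = 0; rank < mpiEnviron.sizeRank(); ++rank)
{
    // each peer sent us our own rank id
    REQUIRE(receive.get<int>(rank)[0] == mpiEnviron.rank()); // rank(): assumed accessor
}

send.finaliseComm();
receive.finaliseComm();
```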