Skip to content

Commit

Permalink
[coro_rdma][feat] add rdmapp and coro_rdma example
Browse files Browse the repository at this point in the history
  • Loading branch information
howardlau1999 committed Jan 21, 2025
1 parent c4fc58e commit 97f49a4
Show file tree
Hide file tree
Showing 11 changed files with 2,117 additions and 0 deletions.
17 changes: 17 additions & 0 deletions cmake/Find/Findibverbs.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
find_path(IBVERBS_INCLUDE_DIRS
NAMES infiniband/verbs.h
HINTS
${IBVERBS_INCLUDE_DIR}
${IBVERBS_ROOT_DIR}
${IBVERBS_ROOT_DIR}/include)

find_library(IBVERBS_LIBRARIES
NAMES ibverbs
HINTS
${IBVERBS_LIB_DIR}
${IBVERBS_ROOT_DIR}
${IBVERBS_ROOT_DIR}/lib)

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(ibverbs DEFAULT_MSG IBVERBS_INCLUDE_DIRS IBVERBS_LIBRARIES)
mark_as_advanced(IBVERBS_INCLUDE_DIR IBVERBS_LIBRARIES)
162 changes: 162 additions & 0 deletions include/ylt/standalone/rdmapp/cq.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#pragma once

#include <infiniband/verbs.h>

#include <array>
#include <memory>
#include <vector>

#include "device.h"
#include "error.h"
#include "fcntl.h"

namespace rdmapp {

class qp;
class cq;
typedef cq *cq_ptr;

class comp_channel {
struct ibv_comp_channel *comp_channel_;

public:
comp_channel(device_ptr device) {
comp_channel_ = ::ibv_create_comp_channel(device->ctx());
check_ptr(comp_channel_, "failed to create comp channel");
}

void set_non_blocking() {
int flags = ::fcntl(comp_channel_->fd, F_GETFL);
if (flags < 0) {
check_errno(errno, "failed to get flags");
}
int ret = ::fcntl(comp_channel_->fd, F_SETFL, flags | O_NONBLOCK);
if (ret < 0) {
check_errno(errno, "failed to set flags");
}
}

cq_ptr get_event() {
struct ibv_cq *cq;
void *ev_ctx;
check_rc(::ibv_get_cq_event(comp_channel_, &cq, &ev_ctx),
"failed to get event");
auto cq_obj_ptr = reinterpret_cast<cq_ptr>(ev_ctx);
return cq_obj_ptr;
}

int fd() const { return comp_channel_->fd; }

~comp_channel() {
if (comp_channel_ == nullptr) [[unlikely]] {
return;
}
if (auto rc = ::ibv_destroy_comp_channel(comp_channel_); rc != 0) {
}
else {
}
}

struct ibv_comp_channel *channel() const { return comp_channel_; }
};

typedef comp_channel *comp_channel_ptr;

/**
* @brief This class is an abstraction of a Completion Queue.
*
*/
class cq {
device_ptr device_;
struct ibv_cq *cq_;
friend class qp;

public:
cq(cq const &) = delete;
cq &operator=(cq const &) = delete;

/**
* @brief Construct a new cq object.
*
* @param device The device to use.
* @param num_cqe The number of completion entries to allocate.
* @param channel If not null, assign this cq to the completion channel
*/
cq(device_ptr device, size_t nr_cqe = 128, comp_channel_ptr channel = nullptr)
: device_(device) {
cq_ = ::ibv_create_cq(device->ctx_, nr_cqe, this,
channel ? channel->channel() : nullptr, 0);
check_ptr(cq_, "failed to create cq");
}

void request_notify() {
check_rc(::ibv_req_notify_cq(cq_, 0), "failed to request notify");
}

void ack_event(unsigned int nr_events = 1) {
::ibv_ack_cq_events(cq_, nr_events);
}

/**
* @brief Poll the completion queue.
*
* @param wc If any, this will be filled with a completion entry.
* @return true If there is a completion entry.
* @return false If there is no completion entry.
* @exception std::runtime_exception Error occured while polling the
* completion queue.
*/
bool poll(struct ibv_wc &wc) {
if (auto rc = ::ibv_poll_cq(cq_, 1, &wc); rc < 0) [[unlikely]] {
check_rc(-rc, "failed to poll cq");
}
else if (rc == 0) {
return false;
}
else {
return true;
}
return false;
}

/**
* @brief Poll the completion queue.
*
* @param wc_vec If any, this will be filled with completion entries up to the
* size of the vector.
* @return size_t The number of completion entries. 0 means no completion
* entry.
* @exception std::runtime_exception Error occured while polling the
* completion queue.
*/
size_t poll(std::vector<struct ibv_wc> &wc_vec) {
return poll(&wc_vec[0], wc_vec.size());
}

template <class It>
size_t poll(It wc, int count) {
int rc = ::ibv_poll_cq(cq_, count, wc);
if (rc < 0) {
throw_with("failed to poll cq: %s (rc=%d)", strerror(rc), rc);
}
return rc;
}

template <int N>
size_t poll(std::array<struct ibv_wc, N> &wc_array) {
return poll(&wc_array[0], N);
}

~cq() {
if (cq_ == nullptr) [[unlikely]] {
return;
}

if (auto rc = ::ibv_destroy_cq(cq_); rc != 0) [[unlikely]] {
}
else {
}
}
};

} // namespace rdmapp
63 changes: 63 additions & 0 deletions include/ylt/standalone/rdmapp/detail/serdes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#pragma once

#include <algorithm>
#include <cstdint>
#include <endian.h>
#include <netinet/in.h>
#include <type_traits>

#include <infiniband/verbs.h>

namespace rdmapp {
namespace detail {

static inline uint16_t ntoh(uint16_t const &value) { return ::be16toh(value); }

static inline uint32_t ntoh(uint32_t const &value) { return ::be32toh(value); }

static inline uint64_t ntoh(uint64_t const &value) { return ::be64toh(value); }

static inline uint16_t hton(uint16_t const &value) { return ::htobe16(value); }

static inline uint32_t hton(uint32_t const &value) { return ::htobe32(value); }

static inline uint64_t hton(uint64_t const &value) { return ::htobe64(value); }

template <class T, class It>
typename std::enable_if<std::is_integral<T>::value>::type
serialize(T const &value, It &it) {
T nvalue = hton(value);
std::copy_n(reinterpret_cast<uint8_t *>(&nvalue), sizeof(T), it);
}

template <class T, class It>
typename std::enable_if<std::is_same<T, union ibv_gid>::value>::type
serialize(T const &value, It &it) {
std::copy_n(reinterpret_cast<uint8_t const *>(&value), sizeof(T), it);
}

template <class T, class It>
typename std::enable_if<std::is_integral<T>::value>::type
deserialize(It &it, T &value) {
std::copy_n(it, sizeof(T), reinterpret_cast<uint8_t *>(&value));
it += sizeof(T);
value = ntoh(value);
}

template <class T, class It>
typename std::enable_if<std::is_same<T, void *>::value>::type
deserialize(It &it, T &value) {
std::copy_n(it, sizeof(T), reinterpret_cast<uint8_t *>(&value));
it += sizeof(T);
value = reinterpret_cast<void *>(ntoh(reinterpret_cast<uint64_t>(value)));
}

template <class T, class It>
typename std::enable_if<std::is_same<T, union ibv_gid>::value>::type
deserialize(It &it, T &value) {
std::copy_n(it, sizeof(T), reinterpret_cast<uint8_t *>(&value));
it += sizeof(T);
}

} // namespace detail
} // namespace rdmapp
Loading

0 comments on commit 97f49a4

Please sign in to comment.