-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathibv_transmit.h
175 lines (146 loc) · 4.4 KB
/
ibv_transmit.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/* Copyright 2015-2016, 2020 SKA South Africa
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef UDPREPLAY_IBV_TRANSMIT_H
#define UDPREPLAY_IBV_TRANSMIT_H
#include <config.h>
#if HAVE_IBV
#include <memory>
#include <vector>
#include <deque>
#include <array>
#include <cstdint>
#include <cstring>
#include <boost/noncopyable.hpp>
#include <boost/asio.hpp>
#include <rdma/rdma_cma.h>
#include <infiniband/verbs.h>
#include <ifaddrs.h>
#include <sys/types.h>
#include <sys/mman.h>
#include "common.h"
// Fixed-size container for the six octets of a hardware (MAC) address.
using mac_address = std::array<std::uint8_t, 6>;
// Stateless deleter that hands an ifaddrs list to freeifaddrs, for use as
// the deleter type of a smart pointer.
struct freeifaddrs_deleter
{
    void operator()(ifaddrs *ifa) const
    {
        ::freeifaddrs(ifa);
    }
};
// Stateless deleter that passes an ibv_mr to ibv_dereg_mr. Any status
// returned by the call is ignored.
struct mr_deleter
{
    void operator()(ibv_mr *mr) const
    {
        static_cast<void>(ibv_dereg_mr(mr));
    }
};
// Stateless deleter that passes an ibv_cq to ibv_destroy_cq. Any status
// returned by the call is ignored.
struct cq_deleter
{
    void operator()(ibv_cq *cq) const
    {
        static_cast<void>(ibv_destroy_cq(cq));
    }
};
// Stateless deleter that passes an ibv_qp to ibv_destroy_qp. Any status
// returned by the call is ignored.
struct qp_deleter
{
    void operator()(ibv_qp *qp) const
    {
        static_cast<void>(ibv_destroy_qp(qp));
    }
};
// Stateless deleter that passes an ibv_pd to ibv_dealloc_pd. Any status
// returned by the call is ignored.
struct pd_deleter
{
    void operator()(ibv_pd *pd) const
    {
        static_cast<void>(ibv_dealloc_pd(pd));
    }
};
// Stateless deleter that passes an rdma_event_channel to
// rdma_destroy_event_channel.
struct event_channel_deleter
{
    void operator()(rdma_event_channel *event_channel) const
    {
        rdma_destroy_event_channel(event_channel);
    }
};
// Stateless deleter that passes an rdma_cm_id to rdma_destroy_id. Any status
// returned by the call is ignored.
struct cm_id_deleter
{
    void operator()(rdma_cm_id *cm_id) const
    {
        static_cast<void>(rdma_destroy_id(cm_id));
    }
};
// Deleter that releases a memory mapping by calling munmap on the pointer.
//
// munmap needs the length of the mapping as well as its address, so the
// length is carried inside the deleter. T is the pointee type of the smart
// pointer the deleter is attached to.
template<typename T>
class mmap_deleter
{
private:
    // Length in bytes handed to munmap. Initialised to zero so that a
    // default-constructed deleter never reads an indeterminate value.
    std::size_t size = 0;

public:
    // Default construction is required so that an empty smart pointer using
    // this deleter can be created; the stored length is then zero.
    mmap_deleter() = default;

    // Store the length of the mapping that will later be released.
    // Intentionally left implicit to stay compatible with existing callers.
    mmap_deleter(std::size_t size) : size(size) {}

    // Unmap `size` bytes starting at ptr. The result of munmap is ignored.
    void operator()(T *ptr) const
    {
        munmap(ptr, size);
    }
};
// Holds the packets handed to add_packet as a sequence of frames, together
// with the slabs of registered memory it owns. All member functions are
// defined out of line.
class ibv_collector
{
private:
    // A block of mmap-released memory together with its memory registration.
    struct slab
    {
        // data is declared before mr, so on destruction mr is released first
        // (ibv_dereg_mr) and only then is the memory unmapped.
        std::unique_ptr<std::uint8_t[], mmap_deleter<std::uint8_t>> data;
        std::unique_ptr<ibv_mr, mr_deleter> mr;
        // Presumably the size of data in bytes and the number of bytes
        // already handed out - confirm against the out-of-line definitions.
        // Zero-initialised so no path can observe an indeterminate value.
        std::size_t capacity = 0;
        std::size_t used = 0;

        slab(ibv_pd *pd, std::size_t capacity);
    };

    // Per-packet send descriptor. Non-copyable; presumably because wr is
    // set up to point at sge - confirm against add_packet.
    struct frame : public boost::noncopyable
    {
        ibv_sge sge{};
        ibv_send_wr wr{};
        // Value-initialised for consistency with sge and wr above.
        std::size_t packet_size = 0;
        duration timestamp{};
    };

    // Non-owning: the collector stores a raw pointer and has no deleter for it.
    ibv_pd *pd;
    boost::asio::ip::udp::endpoint src_endpoint;
    mac_address src_mac;
    std::uint8_t ttl;
    // frame is non-copyable, which rules out std::vector as the container.
    std::deque<frame> frames;
    std::vector<slab> slabs;
    std::size_t slab_size;
    std::size_t total_bytes = 0;

public:
    // slab_size defaults to 64 MiB (64 * 1024 * 1024).
    explicit ibv_collector(
        ibv_pd *pd,
        const boost::asio::ip::udp::endpoint &src_endpoint,
        const mac_address &src_mac,
        std::uint8_t ttl,
        std::size_t slab_size = 64 * 1024 * 1024);

    void add_packet(const packet &pkt);

    // Read-only queries; idx presumably indexes frames - confirm against the
    // definitions.
    std::size_t num_packets() const;
    std::size_t packet_size(std::size_t idx) const;
    duration packet_timestamp(std::size_t idx) const;
    std::size_t bytes() const;

    // Mutable access to a stored frame.
    frame &get_frame(std::size_t idx);
};
// Transmitter built on RDMA CM / ibverbs resources. It owns the verbs objects
// and an ibv_collector; member functions are defined out of line.
class ibv_transmit
{
public:
    static constexpr int depth = 256;
    static constexpr int batch_size = 16;

private:
    // Members are destroyed in reverse declaration order, so the order below
    // is significant. cq is declared BEFORE qp so that qp is destroyed first:
    // ibv_destroy_cq is documented to fail while a queue pair is still
    // associated with the completion queue (assuming qp is created against
    // cq - confirm in the constructor).
    std::unique_ptr<rdma_event_channel, event_channel_deleter> event_channel;
    std::unique_ptr<rdma_cm_id, cm_id_deleter> cm_id;
    std::unique_ptr<ibv_pd, pd_deleter> pd;
    std::unique_ptr<ibv_cq, cq_deleter> cq;
    std::unique_ptr<ibv_qp, qp_deleter> qp;
    boost::asio::ip::udp::socket socket; // only to allocate a port number
    std::size_t slots = depth;
    // Declared after pd, so the collector (and the memory regions it owns)
    // is destroyed before the protection domain.
    std::unique_ptr<ibv_collector> collector;
    std::uint32_t rate_limit_kbps = 0;
    bool set_rate_limit = false;
    std::uint32_t max_burst_size = 0;

    // port_num defaults to -1; presumably meaning "do not set a port" -
    // confirm against the definition.
    void modify_state(ibv_qp_state state, int port_num = -1);
    void wait_for_wc(std::size_t min_slots);

public:
    typedef ibv_collector collector_type;

    ibv_transmit(const options &opts, boost::asio::io_service &io_service);

    // Returns the owned collector; collector must be non-null when called.
    collector_type &get_collector() { return *collector; }

    void send_packets(std::size_t first, std::size_t last, time_point start);
    void flush();
    bool handles_rate_limit() const;
};
// Free-function form of the rate-limit query for an ibv_transmit; defined out
// of line. Presumably forwards to ibv_transmit::handles_rate_limit - confirm
// against the definition.
bool handles_rate_limit(const ibv_transmit &transmitter);
#endif // HAVE_IBV
#endif // UDPREPLAY_IBV_TRANSMIT_H