• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 5998177654

28 Aug 2023 09:34AM UTC coverage: 77.911% (+2.9%) from 74.976%
5998177654

push

github

web-flow
Merge pull request #246 from ska-sa/meson

Switch to Meson build system

5446 of 6990 relevant lines covered (77.91%)

54679.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/common_ibv.cpp
1
/* Copyright 2016-2020 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <spead2/common_features.h>
22
#if SPEAD2_USE_IBV
23
#include <cerrno>
24
#include <cstring>
25
#include <cassert>
26
#include <cstdlib>
27
#include <memory>
28
#include <atomic>
29
#include <algorithm>
30
#include <system_error>
31
#include <boost/asio.hpp>
32
#include <spead2/common_logging.h>
33
#include <spead2/common_ibv.h>
34
#include <spead2/common_semaphore.h>
35
#include <spead2/common_endian.h>
36
#include <spead2/common_raw_packet.h>
37
#include <infiniband/verbs.h>
38
#include <rdma/rdma_cma.h>
39

40
namespace spead2
41
{
42

43
namespace detail
44
{
45

46
/* While compilation requires a relatively modern rdma-core, at runtime we
47
 * may be linked to an old version. The ABI for ibv_create_flow and
48
 * ibv_destroy_flow changed at around v12 or v13.
49
 *
50
 * We can work around it by mimicking the internals of verbs.h and
51
 * selecting the correct slot.
52
 */
53
/* Mirror of the leading slots of rdma-core's verbs_context, pinned to the
 * layout we need. The layout of this struct must match the library ABI
 * exactly, so do not reorder or resize members.
 */
struct verbs_context_fixed
{
    int (*ibv_destroy_flow)(ibv_flow *flow);        // new-ABI slot
    int (*old_ibv_destroy_flow)(ibv_flow *flow);    // pre-v12/13 ABI slot
    ibv_flow * (*ibv_create_flow)(ibv_qp *qp, ibv_flow_attr *flow_attr);        // new-ABI slot
    ibv_flow * (*old_ibv_create_flow)(ibv_qp *qp, ibv_flow_attr *flow_attr);    // pre-v12/13 ABI slot
    void (*padding[6])(void);   // other function slots we don't use
    std::uint64_t has_comp_mask;
    std::size_t sz;             // size of the real verbs_context; used for a sanity check
    ibv_context context;        // the embedded public context (last member)
};
64

65
// Returns the wrapping verbs_context_fixed if it seems like there is one,
66
// otherwise NULL.
67
// Returns the wrapping verbs_context_fixed if it seems like there is one,
// otherwise NULL.
static const verbs_context_fixed *get_verbs_context_fixed(ibv_context *ctx)
{
    // Only contexts created through the extended ABI are embedded in a
    // verbs_context structure.
    if (ctx == NULL || ctx->abi_compat != __VERBS_ABI_IS_EXTENDED)
        return NULL;
    // The public ibv_context is the last member; walk back to the wrapper.
    const char *base = (const char *) ctx - offsetof(verbs_context_fixed, context);
    const verbs_context_fixed *wrapper = (const verbs_context_fixed *) base;
    // Sanity check: the wrapper must be big enough to contain the slots we use.
    return (wrapper->sz >= sizeof(*wrapper)) ? wrapper : NULL;
}
78

79
/* Call ibv_create_flow, falling back to the pre-v12/13 ABI slot when linked
 * against an old rdma-core at runtime.
 *
 * On failure returns NULL with errno set.
 */
static ibv_flow *wrap_ibv_create_flow(ibv_qp *qp, ibv_flow_attr *flow_attr)
{
    errno = 0;
    ibv_flow *flow = ibv_create_flow(qp, flow_attr);
    if (!flow && (errno == 0 || errno == EOPNOTSUPP))
    {
        const verbs_context_fixed *vctx = get_verbs_context_fixed(qp->context);
        /* get_verbs_context_fixed can return NULL (non-extended ABI or failed
         * sanity check); guard before dereferencing.
         */
        if (vctx && vctx->old_ibv_create_flow && !vctx->ibv_create_flow)
            flow = vctx->old_ibv_create_flow(qp, flow_attr);
        else if (errno == 0)
            errno = EOPNOTSUPP;  // old versions of ibv_create_flow neglect to set errno
    }
    return flow;
}
93

94
/* Call ibv_destroy_flow, falling back to the pre-v12/13 ABI slot when linked
 * against an old rdma-core at runtime. Returns 0 on success.
 */
static int wrap_ibv_destroy_flow(ibv_flow *flow)
{
    errno = 0;
    int result = ibv_destroy_flow(flow);
    /* While ibv_destroy_flow is supposed to return an errno on failure, the
     * header files have in the past returned negated error numbers.
     */
    if (result != 0)
    {
        if (std::abs(result) == ENOSYS || std::abs(result) == EOPNOTSUPP)
        {
            const verbs_context_fixed *vctx = get_verbs_context_fixed(flow->context);
            /* get_verbs_context_fixed can return NULL (non-extended ABI or
             * failed sanity check); guard before dereferencing.
             */
            if (vctx && vctx->old_ibv_destroy_flow && !vctx->ibv_destroy_flow)
                result = vctx->old_ibv_destroy_flow(flow);
        }
    }
    return result;
}
112

113
// Smart-pointer deleter for ibv_flow. Goes through the wrapper so that old
// rdma-core ABIs are handled; any error from destruction is ignored.
void ibv_flow_deleter::operator()(ibv_flow *flow)
{
    wrap_ibv_destroy_flow(flow);
}
117

118
} // namespace detail
119

120
/// Create an RDMA CM event channel, throwing std::system_error on failure.
rdma_event_channel_t::rdma_event_channel_t()
{
    // rdma_create_event_channel reports failure via errno; clear any stale value.
    errno = 0;
    rdma_event_channel *channel = rdma_create_event_channel();
    if (channel == nullptr)
        throw_errno("rdma_create_event_channel failed");
    reset(channel);
}
128

129
/// Create an RDMA CM identifier on @a event_channel, throwing on failure.
rdma_cm_id_t::rdma_cm_id_t(const rdma_event_channel_t &event_channel, void *context, rdma_port_space ps)
{
    rdma_cm_id *id = nullptr;
    errno = 0;
    if (rdma_create_id(event_channel.get(), &id, context, ps) < 0)
        throw_errno("rdma_create_id failed");
    reset(id);
}
138

139
/// Bind the CM identifier to a local address, throwing on failure or if the
/// address does not correspond to an RDMA-capable device.
void rdma_cm_id_t::bind_addr(const boost::asio::ip::address &addr)
{
    assert(get());
    // Port 0: we only care about the address, not a specific UDP port.
    boost::asio::ip::udp::endpoint endpoint(addr, 0);
    errno = 0;
    if (rdma_bind_addr(get(), endpoint.data()) < 0)
        throw_errno("rdma_bind_addr failed");
    // Binding can succeed without attaching an RDMA device (e.g. a plain
    // Ethernet interface); treat that as an error.
    if (get()->verbs == nullptr)
        throw_errno("rdma_bind_addr did not bind to an RDMA device", ENODEV);
}
150

151
/// Query the attributes of the device bound to this CM identifier.
/// @throws std::system_error if the query fails
ibv_device_attr rdma_cm_id_t::query_device() const
{
    assert(get());
    ibv_device_attr attr = {};   // zero-initialise all fields
    int status = ibv_query_device(get()->verbs, &attr);
    if (status != 0)
        throw_errno("ibv_query_device failed", status);
    return attr;
}
161

162
/// Query extended device attributes. If @a input is NULL, a zeroed input
/// structure is used (requesting default behaviour).
/// @throws std::system_error if the query fails
ibv_device_attr_ex rdma_cm_id_t::query_device_ex(const struct ibv_query_device_ex_input *input) const
{
    assert(get());
    ibv_query_device_ex_input default_input = {};
    if (input == nullptr)
        input = &default_input;   // caller asked for defaults
    ibv_device_attr_ex attr = {};
    int status = ibv_query_device_ex(get()->verbs, input, &attr);
    if (status != 0)
        throw_errno("ibv_query_device_ex failed", status);
    return attr;
}
178

179
#if SPEAD2_USE_MLX5DV
180
bool rdma_cm_id_t::mlx5dv_is_supported() const
×
181
{
182
    assert(get());
×
183
    try
184
    {
185
        return spead2::mlx5dv_is_supported(get()->verbs->device);
×
186
    }
187
    catch (std::system_error &)
×
188
    {
189
        return false;
×
190
    }
×
191
}
192

193
/// Query mlx5 direct-verbs context information for the bound device.
/// @throws std::system_error if the query fails
mlx5dv_context rdma_cm_id_t::mlx5dv_query_device() const
{
    assert(get());
    mlx5dv_context attr = {};
    // TODO: set other flags if they're defined (will require configure-time
    // detection).
    attr.comp_mask = MLX5DV_CONTEXT_MASK_STRIDING_RQ | MLX5DV_CONTEXT_MASK_CLOCK_INFO_UPDATE;
    int status = spead2::mlx5dv_query_device(get()->verbs, &attr);
    if (status != 0)
        throw_errno("mlx5dv_query_device failed", status);
    return attr;
}
206
#endif  // SPEAD2_USE_MLX5DV
207

208
/// Open a verbs context on @a device, throwing std::system_error on failure.
ibv_context_t::ibv_context_t(struct ibv_device *device)
{
    ibv_context *context = ibv_open_device(device);
    if (context == nullptr)
        throw_errno("ibv_open_device failed");
    reset(context);
}
215

216
/// Open a verbs context for the device that owns local address @a addr.
///
/// @throws std::system_error if no RDMA device matches the address or if
/// any of the underlying verbs/rdma_cm calls fail.
ibv_context_t::ibv_context_t(const boost::asio::ip::address &addr)
{
    /* Use rdma_cm_id_t to get an existing device context, then
     * query it for its GUID and find the corresponding device.
     */
    rdma_event_channel_t event_channel;
    rdma_cm_id_t cm_id(event_channel, nullptr, RDMA_PS_UDP);
    cm_id.bind_addr(addr);
    ibv_device_attr attr = cm_id.query_device();

    /* Hold the device list in a RAII wrapper so that it is released on every
     * exit path (previously each error path had to free it by hand).
     */
    struct device_list_deleter
    {
        void operator()(ibv_device **devices) const { ibv_free_device_list(devices); }
    };
    std::unique_ptr<ibv_device *, device_list_deleter> devices(ibv_get_device_list(nullptr));
    if (!devices)
        throw_errno("ibv_get_device_list failed");

    // Match the bound device by GUID.
    ibv_device *device = nullptr;
    for (ibv_device **d = devices.get(); *d != nullptr; d++)
        if (ibv_get_device_guid(*d) == attr.node_guid)
        {
            device = *d;
            break;
        }
    if (device == nullptr)
        throw_errno("no matching device found", ENOENT);

    ibv_context *ctx = ibv_open_device(device);
    if (!ctx)
        throw_errno("ibv_open_device failed");
    reset(ctx);
}
253

254
/// Create a completion channel on the device bound to @a cm_id.
/// @throws std::system_error on failure
ibv_comp_channel_t::ibv_comp_channel_t(const rdma_cm_id_t &cm_id)
{
    errno = 0;
    ibv_comp_channel *channel = ibv_create_comp_channel(cm_id->verbs);
    if (channel == nullptr)
        throw_errno("ibv_create_comp_channel failed");
    reset(channel);
}
262

263
// Wrap the completion channel's file descriptor in an asio stream descriptor
// so it can be waited on from an io_service event loop.
boost::asio::posix::stream_descriptor ibv_comp_channel_t::wrap(
    boost::asio::io_service &io_service) const
{
    assert(get());
    return wrap_fd(io_service, get()->fd);
}
269

270
/// Retrieve one completion event from the channel.
///
/// @retval true  an event was retrieved into @a cq / @a context
/// @retval false no event was pending (non-blocking fd returned EAGAIN)
/// @throws std::system_error on any other failure
bool ibv_comp_channel_t::get_event(ibv_cq **cq, void **context)
{
    assert(get());
    errno = 0;
    if (ibv_get_cq_event(get(), cq, context) < 0)
    {
        // EAGAIN simply means nothing is ready yet.
        if (errno == EAGAIN)
            return false;
        throw_errno("ibv_get_cq_event failed");
    }
    return true;
}
284

285
/// Create a completion queue associated with a completion channel.
/// @throws std::system_error on failure
ibv_cq_t::ibv_cq_t(
    const rdma_cm_id_t &cm_id, int cqe, void *context,
    const ibv_comp_channel_t &comp_channel, int comp_vector)
{
    errno = 0;
    ibv_cq *new_cq = ibv_create_cq(cm_id->verbs, cqe, context, comp_channel.get(), comp_vector);
    if (new_cq == nullptr)
        throw_errno("ibv_create_cq failed");
    reset(new_cq);
}
295

296
/// Create a completion queue with no completion channel (polled mode).
/// @throws std::system_error on failure
ibv_cq_t::ibv_cq_t(const rdma_cm_id_t &cm_id, int cqe, void *context)
{
    errno = 0;
    ibv_cq *new_cq = ibv_create_cq(cm_id->verbs, cqe, context, nullptr, 0);
    if (new_cq == nullptr)
        throw_errno("ibv_create_cq failed");
    reset(new_cq);
}
304

305
void ibv_cq_t::req_notify(bool solicited_only)
×
306
{
307
    assert(get());
×
308
    int status = ibv_req_notify_cq(get(), int(solicited_only));
×
309
    if (status != 0)
×
310
        throw_errno("ibv_req_notify_cq failed", status);
×
311
}
×
312

313
int ibv_cq_t::poll(int num_entries, ibv_wc *wc)
×
314
{
315
    assert(get());
×
316
    int received = ibv_poll_cq(get(), num_entries, wc);
×
317
    if (received < 0)
×
318
        throw_errno("ibv_poll_cq failed");
×
319
    return received;
×
320
}
321

322
// Acknowledge @a nevents completion-channel events previously received for
// this CQ (required before the CQ may be destroyed).
void ibv_cq_t::ack_events(unsigned int nevents)
{
    assert(get());
    ibv_ack_cq_events(get(), nevents);
}
327

328
/// Create an extended completion queue; stored internally as a plain ibv_cq.
/// @throws std::system_error on failure
ibv_cq_ex_t::ibv_cq_ex_t(const rdma_cm_id_t &cm_id, ibv_cq_init_attr_ex *cq_attr)
{
    errno = 0;
    ibv_cq_ex *new_cq = ibv_create_cq_ex(cm_id->verbs, cq_attr);
    if (new_cq == nullptr)
        throw_errno("ibv_create_cq_ex failed");
    reset(ibv_cq_ex_to_cq(new_cq));
}
336

337
/// Allocate a protection domain on the device bound to @a cm_id.
/// @throws std::system_error on failure
ibv_pd_t::ibv_pd_t(const rdma_cm_id_t &cm_id)
{
    errno = 0;
    ibv_pd *domain = ibv_alloc_pd(cm_id->verbs);
    if (domain == nullptr)
        throw_errno("ibv_alloc_pd failed");
    reset(domain);
}
345

346
/// Create a queue pair in protection domain @a pd.
/// @throws std::system_error on failure; raw-packet QPs that fail with
/// EINVAL get a hint about CAP_NET_RAW, the usual cause.
ibv_qp_t::ibv_qp_t(const ibv_pd_t &pd, ibv_qp_init_attr *init_attr)
{
    errno = 0;
    ibv_qp *qp = ibv_create_qp(pd.get(), init_attr);
    if (qp != nullptr)
    {
        reset(qp);
        return;
    }
    if (errno == EINVAL && init_attr->qp_type == IBV_QPT_RAW_PACKET)
        throw_errno(
            "ibv_create_qp failed (could be a permission problem - do you have CAP_NET_RAW?)");
    throw_errno("ibv_create_qp failed");
}
360

361
/// Create a queue pair using the extended attribute interface.
/// @throws std::system_error on failure; raw-packet QPs that fail with
/// EINVAL get a hint about CAP_NET_RAW, the usual cause.
ibv_qp_t::ibv_qp_t(const rdma_cm_id_t &cm_id, ibv_qp_init_attr_ex *init_attr)
{
    errno = 0;
    ibv_qp *qp = ibv_create_qp_ex(cm_id->verbs, init_attr);
    if (qp != nullptr)
    {
        reset(qp);
        return;
    }
    if (errno == EINVAL && init_attr->qp_type == IBV_QPT_RAW_PACKET)
        throw_errno(
            "ibv_create_qp_ex failed (could be a permission problem - do you have CAP_NET_RAW?)");
    throw_errno("ibv_create_qp_ex failed");
}
375

376
/* Register a memory region [addr, addr+length) in protection domain @a pd.
 *
 * @param access                 bitwise OR of IBV_ACCESS_* flags
 * @param allow_relaxed_ordering request IBV_ACCESS_RELAXED_ORDERING (an
 *                               optional flag that the kernel may ignore)
 * @throws std::system_error if registration fails
 */
ibv_mr_t::ibv_mr_t(const ibv_pd_t &pd, void *addr, std::size_t length, int access,
                   bool allow_relaxed_ordering)
{
#ifndef IBV_ACCESS_RELAXED_ORDERING
    // Headers are too old to define the optional-access flags; supply the
    // well-known values locally so the logic below still works.
    const int IBV_ACCESS_OPTIONAL_RANGE = 0x3ff00000;
    const int IBV_ACCESS_RELAXED_ORDERING = 1 << 20;
#endif
    if (allow_relaxed_ordering)
        access |= IBV_ACCESS_RELAXED_ORDERING;
    /* Emulate the ibv_reg_mr macro in verbs.h. If access contains optional
     * flags, we have to call ibv_reg_mr_iova2 rather than the ibv_reg_mr
     * symbol. If the function is not available, just mask out the bits,
     * which is what ibv_reg_mr_iova2 does if the kernel doesn't support
     * them.
     */
    errno = 0;
    ibv_mr *mr;
    if (!(access & IBV_ACCESS_OPTIONAL_RANGE))
        mr = ibv_reg_mr(pd.get(), addr, length, access);
    else if (!has_ibv_reg_mr_iova2())
        // Optional flags requested but runtime library lacks iova2: drop them.
        mr = ibv_reg_mr(pd.get(), addr, length, access & ~IBV_ACCESS_OPTIONAL_RANGE);
    else
        mr = ibv_reg_mr_iova2(pd.get(), addr, length, std::uintptr_t(addr), access);
    if (!mr)
        throw_errno("ibv_reg_mr failed");
    reset(mr);
}
403

404
/// Apply an arbitrary queue pair attribute modification.
/// @throws std::system_error if ibv_modify_qp fails
void ibv_qp_t::modify(ibv_qp_attr *attr, int attr_mask)
{
    assert(get());
    int status = ibv_modify_qp(get(), attr, attr_mask);
    if (status != 0)
        throw_errno("ibv_modify_qp failed", status);
}
411

412
/// Transition the queue pair to @a qp_state.
void ibv_qp_t::modify(ibv_qp_state qp_state)
{
    ibv_qp_attr attr = {};   // zero all fields; only qp_state is used
    attr.qp_state = qp_state;
    modify(&attr, IBV_QP_STATE);
}
419

420
/// Transition the queue pair to @a qp_state and bind it to @a port_num.
void ibv_qp_t::modify(ibv_qp_state qp_state, int port_num)
{
    ibv_qp_attr attr = {};   // zero all fields; only qp_state/port_num are used
    attr.qp_state = qp_state;
    attr.port_num = port_num;
    modify(&attr, IBV_QP_STATE | IBV_QP_PORT);
}
428

429
/// Post a receive work request (or chain of requests) to the queue pair.
/// @throws std::system_error if posting fails
void ibv_qp_t::post_recv(ibv_recv_wr *wr)
{
    assert(get());
    ibv_recv_wr *failed_wr;   // set by verbs to the first rejected WR; unused
    int status = ibv_post_recv(get(), wr, &failed_wr);
    if (status != 0)
        throw_errno("ibv_post_recv failed", status);
}
437

438
/// Post a send work request (or chain of requests) to the queue pair.
/// @throws std::system_error if posting fails
void ibv_qp_t::post_send(ibv_send_wr *wr)
{
    assert(get());
    ibv_send_wr *failed_wr;   // set by verbs to the first rejected WR; unused
    int status = ibv_post_send(get(), wr, &failed_wr);
    if (status != 0)
        throw_errno("ibv_post_send failed", status);
}
446

447
/// Attach a steering flow rule to @a qp. Uses the ABI-compatibility wrapper
/// so that old rdma-core runtimes are handled.
/// @throws std::system_error on failure
ibv_flow_t::ibv_flow_t(const ibv_qp_t &qp, ibv_flow_attr *flow_attr)
{
    ibv_flow *new_flow = detail::wrap_ibv_create_flow(qp.get(), flow_attr);
    if (new_flow == nullptr)
        throw_errno("ibv_create_flow failed");
    reset(new_flow);
}
454

455
/* Build and attach a flow rule that steers UDP packets for @a endpoint
 * (destination address/port) arriving on @a port_num to @a qp.
 *
 * The rule is an eth + ipv4 + udp match; the spec structures must be laid
 * out contiguously after the ibv_flow_attr header, hence the packed
 * anonymous struct.
 */
ibv_flow_t create_flow(
    const ibv_qp_t &qp, const boost::asio::ip::udp::endpoint &endpoint,
    int port_num)
{
    struct
    {
        ibv_flow_attr attr;
        ibv_flow_spec_eth eth __attribute__((packed));
        ibv_flow_spec_ipv4 ip __attribute__((packed));
        ibv_flow_spec_tcp_udp udp __attribute__((packed));
    } flow_rule;
    std::memset(&flow_rule, 0, sizeof(flow_rule));

    flow_rule.attr.type = IBV_FLOW_ATTR_NORMAL;
    flow_rule.attr.priority = 0;
    flow_rule.attr.size = sizeof(flow_rule);    // total size: header + all specs
    flow_rule.attr.num_of_specs = 3;            // eth, ipv4, udp
    flow_rule.attr.port = port_num;

    flow_rule.eth.type = IBV_FLOW_SPEC_ETH;
    flow_rule.eth.size = sizeof(flow_rule.eth);
    flow_rule.ip.type = IBV_FLOW_SPEC_IPV4;
    flow_rule.ip.size = sizeof(flow_rule.ip);

    if (!endpoint.address().is_unspecified())
    {
        /* At least the ConnectX-3 cards seem to require an Ethernet match. We
         * thus have to construct either the MAC address corresponding to the
         * IP multicast address from RFC 7042, or the interface MAC address
         * for unicast.
         */
        mac_address dst_mac;
        if (endpoint.address().is_multicast())
            dst_mac = multicast_mac(endpoint.address());
        else
            dst_mac = interface_mac(endpoint.address());
        std::memcpy(&flow_rule.eth.val.dst_mac, &dst_mac, sizeof(dst_mac));
        std::memset(&flow_rule.eth.mask.dst_mac, 0xFF, sizeof(flow_rule.eth.mask.dst_mac));

        auto bytes = endpoint.address().to_v4().to_bytes(); // big-endian address
        std::memcpy(&flow_rule.ip.val.dst_ip, &bytes, sizeof(bytes));
        std::memset(&flow_rule.ip.mask.dst_ip, 0xFF, sizeof(flow_rule.ip.mask.dst_ip));
    }

    // Always match the UDP destination port exactly.
    flow_rule.udp.type = IBV_FLOW_SPEC_UDP;
    flow_rule.udp.size = sizeof(flow_rule.udp);
    flow_rule.udp.val.dst_port = htobe16(endpoint.port());
    flow_rule.udp.mask.dst_port = 0xFFFF;

    return ibv_flow_t(qp, &flow_rule.attr);
}
506

507
std::vector<ibv_flow_t> create_flows(
×
508
    const ibv_qp_t &qp,
509
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
510
    int port_num)
511
{
512
    /* Note: some NICs support flow rules with non-trivial masks. However,
513
     * using such rules can lead to subtle problems when there are multiple
514
     * receivers on the same NIC subscribing to common groups. See #66 for
515
     * more details.
516
     */
517
    std::vector<ibv_flow_t> flows;
×
518
    for (const auto &ep : endpoints)
×
519
        flows.push_back(create_flow(qp, ep, port_num));
×
520
    return flows;
×
521
}
×
522

523
/// Create a work queue on the device bound to @a cm_id.
/// @throws std::system_error on failure
ibv_wq_t::ibv_wq_t(const rdma_cm_id_t &cm_id, ibv_wq_init_attr *attr)
{
    ibv_wq *new_wq = ibv_create_wq(cm_id->verbs, attr);
    if (new_wq == nullptr)
        throw_errno("ibv_create_wq failed");
    reset(new_wq);
}
530

531
/// Transition the work queue to @a state.
/// @throws std::system_error if ibv_modify_wq fails
void ibv_wq_t::modify(ibv_wq_state state)
{
    ibv_wq_attr wq_attr = {};   // zero all fields
    wq_attr.attr_mask = IBV_WQ_ATTR_STATE;
    wq_attr.wq_state = state;
    int status = ibv_modify_wq(get(), &wq_attr);
    if (status != 0)
        throw_errno("ibv_modify_wq failed", status);
}
541

542
#if SPEAD2_USE_MLX5DV
543
/* Create a multi-packet (striding) receive work queue via mlx5 direct verbs
 * and capture its internals (buffer, doorbell, stride geometry) for direct
 * manipulation in post_recv/read_wc.
 *
 * @pre mlx5_attr requests a striding RQ (asserted below)
 * @throws std::system_error if creation or introspection fails, or if the
 *         WQE stride is not the layout this class expects
 */
ibv_wq_mprq_t::ibv_wq_mprq_t(const rdma_cm_id_t &cm_id, ibv_wq_init_attr *attr, mlx5dv_wq_init_attr *mlx5_attr)
    // Stride geometry is encoded as log2 values in the init attributes.
    : stride_size(1U << mlx5_attr->striding_rq_attrs.single_stride_log_num_of_bytes),
    n_strides(1U << mlx5_attr->striding_rq_attrs.single_wqe_log_num_of_strides),
    data_offset(mlx5_attr->striding_rq_attrs.two_byte_shift_en ? 2 : 0)
{
    assert(mlx5_attr->comp_mask & MLX5DV_WQ_INIT_ATTR_MASK_STRIDING_RQ);
    ibv_wq *wq = mlx5dv_create_wq(cm_id->verbs, attr, mlx5_attr);
    if (!wq)
        throw_errno("mlx5dv_create_wq failed");
    // Extract the raw WQ internals (ring buffer, doorbell record, stride).
    mlx5dv_obj obj;
    obj.rwq.in = wq;
    obj.rwq.out = &rwq;
    int ret = mlx5dv_init_obj(&obj, MLX5DV_OBJ_RWQ);
    if (ret != 0)
    {
        ibv_destroy_wq(wq);   // not yet owned by reset(); clean up manually
        throw_errno("mlx5dv_init_obj failed", ret);
    }
    // post_recv writes mlx5_mprq_wqe entries directly; bail out if the
    // hardware/driver uses a different WQE layout.
    if (rwq.stride != sizeof(mlx5_mprq_wqe))
    {
        ibv_destroy_wq(wq);
        throw_errno("multi-packet receive queue has unexpected stride", EOPNOTSUPP);
    }
    reset(wq);
}
568

569
/* Post a receive buffer (described by @a sge) directly into the striding RQ
 * ring, bypassing ibv_post_wq_recv, and ring the doorbell.
 *
 * @throws std::system_error (ENOMEM) if the ring is full
 */
void ibv_wq_mprq_t::post_recv(ibv_sge *sge)
{
    if (head - tail >= rwq.wqe_cnt)
        throw_errno("Multi-packet receive queue is full", ENOMEM);

    // wqe_cnt is a power of two, so masking gives the ring slot.
    int ind = head & (rwq.wqe_cnt - 1);
    mlx5_mprq_wqe *wqe = (mlx5_mprq_wqe *) rwq.buf + ind;
    memset(&wqe->nseg, 0, sizeof(wqe->nseg));
    // Hardware expects big-endian fields in the data segment.
    wqe->dseg.byte_count = htobe32(sge->length);
    wqe->dseg.lkey = htobe32(sge->lkey);
    wqe->dseg.addr = htobe64(sge->addr);
    head++;
    /* Update the doorbell to tell the HW about the new entry. This must
     * be ordered after the writes to the buffer, so we hope that any
     * sensible platform will make std::atomic_uint32_t just a wrapper
     * around a uint32_t.
     */
    static_assert(sizeof(std::atomic_uint32_t) == sizeof(std::uint32_t),
                  "std::atomic_uint32_t has the wrong size");
    std::atomic<std::uint32_t> *dbrec = reinterpret_cast<std::atomic<std::uint32_t> *>(rwq.dbrec);
    dbrec->store(htobe32(head & 0xffff), std::memory_order_release);
}
591

592
/* Decode the current work completion of @a cq for a striding RQ.
 *
 * @param[out] byte_len payload length of this completion
 * @param[out] offset   byte offset of the payload within the posted buffer
 * @param[out] flags    combination of FLAG_FILLER / FLAG_LAST
 */
void ibv_wq_mprq_t::read_wc(const ibv_cq_ex_t &cq, std::uint32_t &byte_len,
                            std::uint32_t &offset, int &flags) noexcept
{
    /* This is actually a packed field: lower 16 bits are the byte count,
     * top bit is the "filler" flag, remaining bits are the number of
     * strides consumed.
     */
    std::uint32_t byte_cnt = ibv_wc_read_byte_len(cq.get());
    std::uint32_t strides = byte_cnt >> 16;
    byte_len = (byte_cnt & 0xffff) - data_offset;
    // Payload starts at the first stride consumed by this completion.
    offset = tail_strides * stride_size + data_offset;
    flags = 0;
    if (strides & 0x8000)
    {
        // Filler completion: consumes strides but carries no packet data.
        strides -= 0x8000;
        flags |= FLAG_FILLER;
    }
    tail_strides += strides;
    if (tail_strides >= n_strides)
    {
        assert(tail_strides <= n_strides);
        // The WQE is fully consumed; advance to the next ring entry.
        flags |= FLAG_LAST;
        tail++;
        tail_strides = 0;
    }
}
618

619
#endif // SPEAD2_USE_MLX5DV
620

621
/// Create a receive work queue indirection table.
/// @throws std::system_error on failure
ibv_rwq_ind_table_t::ibv_rwq_ind_table_t(const rdma_cm_id_t &cm_id, ibv_rwq_ind_table_init_attr *attr)
{
    ibv_rwq_ind_table *new_table = ibv_create_rwq_ind_table(cm_id->verbs, attr);
    if (new_table == nullptr)
        throw_errno("ibv_create_rwq_ind_table failed");
    reset(new_table);
}
628

629
/// Convenience helper: build a single-entry indirection table pointing at @a wq.
ibv_rwq_ind_table_t create_rwq_ind_table(const rdma_cm_id_t &cm_id, const ibv_wq_t &wq)
{
    ibv_wq *single[1] = {wq.get()};
    ibv_rwq_ind_table_init_attr attr = {};
    attr.log_ind_tbl_size = 0;   // 2^0 = one entry
    attr.ind_tbl = single;
    return ibv_rwq_ind_table_t(cm_id, &attr);
}
638

639
} // namespace spead2
640

641
#endif // SPEAD2_USE_IBV
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc