• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 25304462134

04 May 2026 06:27AM UTC coverage: 78.463% (+0.004%) from 78.459%
25304462134

push

github

bmerry
Update type annotations for MemoryRegion class

5585 of 7118 relevant lines covered (78.46%)

119181.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.57
/src/send_udp_ibv.cpp
1
/* Copyright 2016, 2019-2020, 2023, 2025 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#ifndef _GNU_SOURCE
22
# define _GNU_SOURCE
23
#endif
24
#include <spead2/common_features.h>
25
#if SPEAD2_USE_IBV
26

27
#include <cassert>
28
#include <utility>
29
#include <memory>
30
#include <algorithm>
31
#include <set>
32
#include <boost/noncopyable.hpp>
33
#include <spead2/common_ibv.h>
34
#include <spead2/common_memory_allocator.h>
35
#include <spead2/common_raw_packet.h>
36
#include <spead2/common_logging.h>
37
#include <spead2/send_udp_ibv.h>
38
#include <spead2/send_stream.h>
39
#include <spead2/send_writer.h>
40

41
namespace spead2
42
{
43
namespace send
44
{
45

46
namespace
47
{
48

49
static constexpr int max_sge = 4;
50

51
/**
 * Stream using Infiniband Verbs for acceleration. Only IPv4 multicast
 * with an explicit source address is supported.
 */
class udp_ibv_writer : public writer
{
private:
    /**
     * A user-registered block of memory together with its verbs memory
     * region handle, so heap payloads inside it can be sent zero-copy.
     */
    struct memory_region
    {
        const void *ptr;    ///< Start of the registered range
        std::size_t size;   ///< Length of the registered range in bytes
        ibv_mr_t mr;        ///< Verbs registration handle

        /// Register an ordinary memory range with the protection domain.
        memory_region(const ibv_pd_t &pd, const void *ptr, std::size_t size);

        /* Construct from a dmabuf. The ptr is used to match pointers used in
         * heaps, so needs to use some address space that will not conflict
         * with any other data used in heaps.
         */
        memory_region(const ibv_pd_t &pd, const void *ptr, std::size_t size,
                      int fd, std::uint64_t offset);

        /* Used purely to construct a memory region for comparison e.g. with
         * std::set<memory_region>::lower_bound. Remove once c++14 is the
         * minimum version, since it allows types other than the key type for
         * lower_bound.
         */
        memory_region(const void *ptr, std::size_t size);

        bool operator<(const memory_region &other) const
        {
            // We order by decreasing address so that lower_bound will give us the containing region
            return ptr > other.ptr;
        }

        /// Whether @a other lies entirely within this region
        bool contains(const memory_region &other) const
        {
            return ptr <= other.ptr
                && static_cast<const std::uint8_t *>(ptr) + size
                >= static_cast<const std::uint8_t *>(other.ptr) + other.size;
        }
    };

    /// Per-packet state: a work request plus the frame buffer it describes
    struct slot : public boost::noncopyable
    {
        ibv_send_wr wr{};              ///< Work request posted for this packet
        ibv_sge sge[max_sge]{};        ///< Scatter-gather list referenced by @ref wr
        ethernet_frame frame;          ///< Raw Ethernet frame backing this slot
        std::uint8_t *payload;         ///< points to UDP payload within frame
        detail::queue_item *item = nullptr;  ///< Queue item updated when the send completes
        bool last;   ///< Last packet in the heap
    };

    const std::size_t n_slots;           ///< Ring size (number of packets that can be in flight)
    const std::size_t target_batch;      ///< Packets to post/reap per batch
    boost::asio::ip::udp::socket socket; // used only to assign a source UDP port
    boost::asio::ip::udp::endpoint source;
    const std::vector<boost::asio::ip::udp::endpoint> endpoints;
    std::vector<mac_address> mac_addresses; ///< MAC addresses corresponding to endpoints
    memory_allocator::pointer buffer;       ///< Backing storage for all slot frames
    rdma_event_channel_t event_channel;
    rdma_cm_id_t cm_id;
    ibv_pd_t pd;
    ibv_comp_channel_t comp_channel;
    boost::asio::posix::stream_descriptor comp_channel_wrapper;  ///< comp_channel fd wrapped for asio
    ibv_cq_t send_cq, recv_cq;
    ibv_qp_t qp;
    ibv_mr_t mr;     ///< Memory region for internal buffer
    std::set<memory_region> memory_regions;   ///< User-registered memory regions
    std::unique_ptr<slot[]> slots;            ///< Circular buffer of slots
    std::size_t head = 0, tail = 0;           ///< Oldest in-flight slot / next slot to fill
    std::size_t available;                    ///< Slots currently free (not in flight)
    const int max_poll;                       ///< Consecutive empty polls allowed in @ref reap
    unsigned int send_flags;                  ///< Flags for every WR (e.g. IBV_SEND_IP_CSUM if offloaded)

    static ibv_qp_t
    create_qp(const ibv_pd_t &pd, const ibv_cq_t &send_cq, const ibv_cq_t &recv_cq,
              std::size_t n_slots);

    /// Modify the QP with a rate limit, returning true on success
    static bool setup_hw_rate(const ibv_qp_t &qp, const stream_config &config);

    /**
     * Clear out the completion queue and return slots to the queue.
     * It will stop after freeing up @ref target_batch slots or
     * find no completions @ref max_poll times.
     *
     * Returns @c true if there are possibly more completions still in the
     * queue.
     */
    bool reap();

    /**
     * Schedule a call to wakeup when it should check for space in the buffer again.
     */
    void wait_for_space();

    virtual void wakeup() override final;

public:
    udp_ibv_writer(
        io_context_ref io_context,
        const stream_config &config,
        const udp_ibv_config &ibv_config);

    virtual std::size_t get_num_substreams() const override final { return endpoints.size(); }
};
158

159
// Register an ordinary (malloc'd/mmap'd) memory range with the protection
// domain. LOCAL_WRITE access is requested, matching the internal buffer's
// registration.
udp_ibv_writer::memory_region::memory_region(
    const ibv_pd_t &pd, const void *ptr, std::size_t size)
    : ptr(ptr), size(size), mr(pd, const_cast<void *>(ptr), size, IBV_ACCESS_LOCAL_WRITE)
{
}
×
164

165
// Register a dmabuf-backed range. The caller-supplied ptr is not
// dereferenced by the registration itself: it is passed as the iova so that
// heap pointers in that address range resolve to this region.
udp_ibv_writer::memory_region::memory_region(
    const ibv_pd_t &pd, const void *ptr, std::size_t size, int fd, std::uint64_t offset)
    : ptr(ptr), size(size),
    mr(ibv_mr_t::from_dmabuf(pd, offset, size, reinterpret_cast<std::uint64_t>(ptr), fd, IBV_ACCESS_LOCAL_WRITE))
{
}
×
171

172
// Key-only constructor: builds a region with a default (empty) mr, used
// solely as a lookup key for std::set<memory_region> searches.
udp_ibv_writer::memory_region::memory_region(
    const void *ptr, std::size_t size)
    : ptr(ptr), size(size)
{
}
×
177

178
static constexpr int header_length =
179
    ethernet_frame::min_size + ipv4_packet::min_size + udp_packet::min_size;
180

181
ibv_qp_t udp_ibv_writer::create_qp(
×
182
    const ibv_pd_t &pd, const ibv_cq_t &send_cq, const ibv_cq_t &recv_cq, std::size_t n_slots)
183
{
184
    ibv_qp_init_attr attr;
185
    memset(&attr, 0, sizeof(attr));
×
186
    attr.send_cq = send_cq.get();
×
187
    attr.recv_cq = recv_cq.get();
×
188
    attr.qp_type = IBV_QPT_RAW_PACKET;
×
189
    attr.cap.max_send_wr = n_slots;
×
190
    attr.cap.max_recv_wr = 0;
×
191
    attr.cap.max_send_sge = max_sge;
×
192
    attr.cap.max_recv_sge = 0;
×
193
    attr.sq_sig_all = 0;
×
194
    return ibv_qp_t(pd, &attr);
×
195
}
196

197
bool udp_ibv_writer::setup_hw_rate([[maybe_unused]] const ibv_qp_t &qp, [[maybe_unused]] const stream_config &config)
×
198
{
199
#if SPEAD2_USE_IBV_HW_RATE_LIMIT
200
    ibv_device_attr_ex attr;
201
    if (ibv_query_device_ex(qp->context, nullptr, &attr) != 0)
×
202
    {
203
        log_debug("Not using HW rate limiting because ibv_query_device_ex failed");
×
204
        return false;
×
205
    }
206
    if (!ibv_is_qpt_supported(attr.packet_pacing_caps.supported_qpts, IBV_QPT_RAW_PACKET)
×
207
        || attr.packet_pacing_caps.qp_rate_limit_max == 0)
×
208
    {
209
        log_debug("Not using HW rate limiting because it is not supported by the device");
×
210
        return false;
×
211
    }
212
    // User rate is Bps, for UDP payload. Convert to rate for Ethernet frames, in kbps.
213
    std::size_t frame_size = config.get_max_packet_size() + 42;
×
214
    double overhead = double(frame_size) / config.get_max_packet_size();
×
215
    double rate_kbps = config.get_rate() * 8e-3 * overhead;
×
216
    if (rate_kbps < attr.packet_pacing_caps.qp_rate_limit_min
×
217
        || rate_kbps > attr.packet_pacing_caps.qp_rate_limit_max)
×
218
    {
219
        log_debug("Not using HW rate limiting because the HW does not support the rate");
×
220
        return false;
×
221
    }
222

223
    ibv_qp_rate_limit_attr limit_attr = {};
×
224
    limit_attr.rate_limit = rate_kbps;
×
225
    limit_attr.typical_pkt_sz = frame_size;
×
226
    limit_attr.max_burst_sz = std::max(frame_size, config.get_burst_size());
×
227
    if (ibv_modify_qp_rate_limit(qp.get(), &limit_attr) != 0)
×
228
    {
229
        log_debug("Not using HW rate limiting because ibv_modify_qp_rate_limit failed");
×
230
        return false;
×
231
    }
232

233
    return true;
×
234
#else
235
    log_debug("Not using HW rate limiting because support was not found at compile time");
236
    return false;
237
#endif
238
}
239

240
/* Clear out the completion queue and return slots to the ring. Stops once
 * target_batch slots have been freed (on top of what was already available)
 * or after max_poll consecutive empty polls. Returns true if more
 * completions may still be pending.
 */
bool udp_ibv_writer::reap()
{
    ibv_wc wc;
    int retries = max_poll;
    int groups = 0;     // completed heap groups, reported in one call at the end
    std::size_t min_available = std::min(n_slots, available + target_batch);
    // TODO: this could probably be faster with the cq_ex polling API
    // because it could avoid locking.
    while (available < min_available)
    {
        int done = send_cq.poll(1, &wc);
        if (done == 0)
        {
            retries--;
            if (retries <= 0)
                break;
            else
                continue;
        }

        boost::system::error_code ec;
        if (wc.status != IBV_WC_SUCCESS)
        {
            log_warning("Work Request failed with code %1%", wc.status);
            // TODO: create some mapping from ibv_wc_status to system error codes
            ec = boost::system::error_code(EIO, boost::asio::error::get_system_category());
        }
        /* Each signalled completion covers a whole batch of work requests;
         * wr_id on the last WR was set to the batch size in wakeup().
         */
        int batch = wc.wr_id;
        for (int i = 0; i < batch; i++)
        {
            const slot *s = &slots[head];
            auto *item = s->item;
            if (ec)
            {
                // Record only the first error seen for this item
                if (!item->result)
                    item->result = ec;
            }
            else
            {
                // Sum SGE lengths, then subtract the Ethernet/IP/UDP headers
                // so only UDP payload bytes are counted.
                for (int j = 0; j < s->wr.num_sge; j++)
                    item->bytes_sent += s->sge[j].length;
                item->bytes_sent -= header_length;
            }
            groups += s->last;
            if (++head == n_slots)
                head = 0;
        }
        available += batch;
    }
    if (groups > 0)
        groups_completed(groups);
    return available < n_slots && retries > 0;
}
293

294
/* Arrange for wakeup() to run again once it is worth re-checking for free
 * slots: via the completion channel when one exists, otherwise by simply
 * re-posting ourselves (busy-polling mode).
 */
void udp_ibv_writer::wait_for_space()
{
    if (comp_channel)
    {
        // Ask for an event on the next completion, then sleep on the
        // completion channel's file descriptor via asio.
        send_cq.req_notify(false);
        auto handler = [this](const boost::system::error_code &)
        {
            ibv_cq *event_cq;
            void *event_cq_context;
            // This should be non-blocking, since we were woken up, but
            // spurious wakeups have been observed.
            while (comp_channel.get_event(&event_cq, &event_cq_context))
                send_cq.ack_events(1);
            wakeup();
        };
        comp_channel_wrapper.async_wait(comp_channel_wrapper.wait_read, handler);
    }
    else
        // Polling mode: reschedule immediately.
        post_wakeup();
}
×
314

315
/* Main driver: reap finished sends, then assemble and post up to
 * target_batch new packets as a chained list of work requests, and finally
 * decide how to get scheduled again (more work, sleep for rate limiting,
 * wait for completions, or wait for new heaps).
 */
void udp_ibv_writer::wakeup()
{
    bool more_cqe = reap();
    if (available < target_batch)
    {
        // Not enough free slots to make a batch worthwhile; wait for sends
        // to complete first.
        wait_for_space();
    }
    else
    {
        std::size_t i;
        packet_result result;
        slot *prev = nullptr;           // previous slot in the WR chain
        slot *first = &slots[tail];     // head of the WR chain to post
        for (i = 0; i < target_batch; i++)
        {
            slot *s = &slots[tail];
            transmit_packet data;
            result = get_packet(data, s->payload);
            if (result != packet_result::SUCCESS)
                break;

            // Patch the pre-built frame headers with this packet's lengths
            std::size_t payload_size = data.size;
            ipv4_packet ipv4 = s->frame.payload_ipv4();
            ipv4.total_length(payload_size + udp_packet::min_size + ipv4.header_length());
            udp_packet udp = ipv4.payload_udp();
            udp.length(payload_size + udp_packet::min_size);
            if (get_num_substreams() > 1)
            {
                // Destination differs per substream; rewrite MAC/IP/port
                const std::size_t substream_index = data.substream_index;
                const auto &endpoint = endpoints[substream_index];
                s->frame.destination_mac(mac_addresses[substream_index]);
                ipv4.destination_address(endpoint.address().to_v4());
                udp.destination_port(endpoint.port());
            }
            // Only compute the IP checksum in software if the NIC won't
            if (!(send_flags & IBV_SEND_IP_CSUM))
                ipv4.update_checksum();
            s->wr.num_sge = 1;
            // TODO: addr and lkey can be fixed by constructor
            s->sge[0].addr = (uintptr_t) s->frame.data();
            s->sge[0].lkey = mr->lkey;
            // The packet_generator writes the SPEAD header and item pointers
            // directly into the payload.
            assert(data.buffers[0].data() == s->payload);
            std::uint8_t *copy_target = s->payload + boost::asio::buffer_size(data.buffers[0]);
            s->sge[0].length = copy_target - s->frame.data();
            /* The first SGE is used for both the IP/UDP header and the
             * SPEAD header and item pointers.
             *
             * This is a conservative estimate, because merges are
             * possible (particularly if not all items fall into registered
             * ranges), but the cost of doing two passes to check for this
             * case would be expensive.
             */
            bool can_skip_copy = data.buffers.size() <= max_sge;
            for (std::size_t j = 1; j < data.buffers.size(); j++)
            {
                const auto &buffer = data.buffers[j];
                ibv_sge cur;
                const std::uint8_t *ptr = static_cast<const std::uint8_t *>(buffer.data());
                cur.length = boost::asio::buffer_size(buffer);
                // Check if it belongs to a user-registered region
                memory_region cmp(ptr, cur.length);
                std::set<memory_region>::const_iterator it;
                if (can_skip_copy
                    && (it = memory_regions.lower_bound(cmp)) != memory_regions.end()
                    && it->contains(cmp))
                {
                    // Zero-copy: reference the user's registered memory
                    cur.addr = (uintptr_t) ptr;
                    cur.lkey = it->mr->lkey;  // TODO: cache the lkey to avoid pointer lookup?
                }
                else
                {
                    // We have to copy it
                    cur.addr = (uintptr_t) copy_target;
                    cur.lkey = mr->lkey;
                    std::memcpy(copy_target, ptr, cur.length);
                    copy_target += cur.length;
                }
                ibv_sge &prev = s->sge[s->wr.num_sge - 1];
                if (prev.lkey == cur.lkey && prev.addr + prev.length == cur.addr)
                {
                    // Can merge with the previous one
                    prev.length += cur.length;
                }
                else
                {
                    // Have to create a new one.
                    s->sge[s->wr.num_sge++] = cur;
                }
            }
            s->wr.next = nullptr;
            s->wr.send_flags = send_flags;
            s->item = data.item;
            s->last = data.last;
            // Chain this WR onto the previous one so the whole batch can be
            // posted with a single post_send call.
            if (prev != nullptr)
                prev->wr.next = &s->wr;
            prev = s;

            if (++tail == n_slots)
                tail = 0;
            available--;
        }

        if (i > 0)
        {
            // Signal only the last WR; its wr_id carries the batch size so
            // reap() knows how many slots one completion frees.
            prev->wr.wr_id = i;
            prev->wr.send_flags |= IBV_SEND_SIGNALED;
            qp.post_send(&first->wr);
        }

        if (i > 0 || more_cqe)
        {
            /* There may be more completions immediately available (either existing
             * ones, or for the packets we've just posted).
             */
            post_wakeup();
        }
        else if (result == packet_result::SLEEP)
        {
            /* Experimentally it seems that if this condition and the next one
             * both hold, it is better to sleep than to wait for completions
             * ("better" meaning more likely to hit the target rate),
             * presumably because it favours getting more packets into the
             * send queue as soon as possible.
             */
            sleep();
        }
        else if (available < n_slots)
        {
            /* We ran out of packets and completions, but we need to monitor
             * the CQ for future completions.
             */
            wait_for_space();
        }
        else
        {
            assert(result == packet_result::EMPTY);
            request_wakeup();
        }
    }
}
×
456

457
static std::size_t calc_n_slots(const stream_config &config, std::size_t buffer_size)
3✔
458
{
459
    return std::max(std::size_t(1), buffer_size / (config.get_max_packet_size() + header_length));
3✔
460
}
461

462
static std::size_t calc_target_batch(const stream_config &config, std::size_t n_slots)
3✔
463
{
464
    std::size_t packet_size = config.get_max_packet_size() + header_length;
3✔
465
    return std::clamp(n_slots / 4, std::size_t(1), 262144 / packet_size);
3✔
466
}
467

468
/* Construct the writer: validates the configuration, sets up the RDMA CM /
 * verbs objects (PD, CQs, QP), optionally enables HW rate limiting,
 * registers memory, and pre-builds the Ethernet/IP/UDP headers in every
 * slot so the hot path only patches lengths and (for multiple substreams)
 * destinations.
 *
 * @throws std::invalid_argument if no endpoints were given, no interface
 *         address was set, or user memory regions overlap.
 */
udp_ibv_writer::udp_ibv_writer(
    io_context_ref io_context,
    const stream_config &config,
    const udp_ibv_config &ibv_config)
    : writer(std::move(io_context), config),
    n_slots(calc_n_slots(config, ibv_config.get_buffer_size())),
    target_batch(calc_target_batch(config, n_slots)),
    socket(get_io_context(), boost::asio::ip::udp::v4()),
    endpoints(ibv_config.get_endpoints()),
    event_channel(nullptr),
    comp_channel_wrapper(get_io_context()),
    available(n_slots),
    max_poll(ibv_config.get_max_poll())
{
    if (endpoints.empty())
        throw std::invalid_argument("endpoints is empty");
    mac_addresses.reserve(endpoints.size());
    for (const auto &endpoint : endpoints)
        mac_addresses.push_back(multicast_mac(endpoint.address()));
    const boost::asio::ip::address &interface_address = ibv_config.get_interface_address();
    if (interface_address.is_unspecified())
        throw std::invalid_argument("interface address has not been specified");
    // Check that registered memory regions don't overlap
    auto config_regions = ibv_config.get_memory_regions();
    std::sort(config_regions.begin(), config_regions.end(),
              [](const udp_ibv_config::memory_region &a,
                 const udp_ibv_config::memory_region &b) { return a.get_pointer() < b.get_pointer(); });
    for (std::size_t i = 1; i < config_regions.size(); i++)
        if (static_cast<const std::uint8_t *>(config_regions[i - 1].get_pointer())
            + config_regions[i - 1].get_size() > config_regions[i].get_pointer())
            throw std::invalid_argument("memory regions overlap");

    // Binding to port 0 lets the kernel assign our source UDP port
    socket.bind(boost::asio::ip::udp::endpoint(interface_address, 0));
    // Re-compute buffer_size as a whole number of slots
    const std::size_t max_raw_size = config.get_max_packet_size() + header_length;
    std::size_t buffer_size = n_slots * max_raw_size;

    event_channel = rdma_event_channel_t();
    cm_id = rdma_cm_id_t(event_channel, nullptr, RDMA_PS_UDP);
    cm_id.bind_addr(interface_address);
    pd = ibv_pd_t(cm_id);
    int comp_vector = ibv_config.get_comp_vector();
    if (comp_vector >= 0)
    {
        // Event-driven completions: create a channel and wrap its fd for asio
        comp_channel = ibv_comp_channel_t(cm_id);
        comp_channel_wrapper = comp_channel.wrap(get_io_context());
        send_cq = ibv_cq_t(cm_id, n_slots, nullptr,
                           comp_channel, comp_vector % cm_id->verbs->num_comp_vectors);
    }
    else
        // Negative comp_vector means pure polling (no completion channel)
        send_cq = ibv_cq_t(cm_id, n_slots, nullptr);
    recv_cq = ibv_cq_t(cm_id, 1, nullptr);
    qp = create_qp(pd, send_cq, recv_cq, n_slots);
    // Walk the QP through INIT -> RTR -> RTS so it can transmit
    qp.modify(IBV_QPS_INIT, cm_id->port_num);
    qp.modify(IBV_QPS_RTR);
    qp.modify(IBV_QPS_RTS);

    /* For now AUTO is treated the same as SW; further investigation is
     * needed to determine the conditions under which HW rate limiting
     * behaves well.
     */
    if (config.get_rate_method() == rate_method::HW && config.get_rate() > 0.0)
    {
        if (setup_hw_rate(qp, config))
            enable_hw_rate();
    }

    std::shared_ptr<mmap_allocator> allocator = std::make_shared<mmap_allocator>(0, true);
    buffer = allocator->allocate(max_raw_size * n_slots, nullptr);
    mr = ibv_mr_t(pd, buffer.get(), buffer_size, IBV_ACCESS_LOCAL_WRITE);
    for (const auto &region : ibv_config.get_memory_regions())
    {
        // dmabuf regions (fd >= 0) use a different registration path
        if (region.get_fd() >= 0)
            memory_regions.emplace(
                pd, region.get_pointer(), region.get_size(), region.get_fd(), region.get_offset());
        else
            memory_regions.emplace(
                pd, region.get_pointer(), region.get_size());
    }
    slots.reset(new slot[n_slots]);
    // We fill in the destination details for the first endpoint. If there are
    // multiple endpoints, they'll get updated for each packet.
    mac_address source_mac = interface_mac(interface_address);
    for (std::size_t i = 0; i < n_slots; i++)
    {
        slots[i].frame = ethernet_frame(buffer.get() + i * max_raw_size, max_raw_size);
        memset(&slots[i].sge, 0, sizeof(slots[i].sge));
        slots[i].wr.sg_list = slots[i].sge;
        slots[i].wr.opcode = IBV_WR_SEND;
        slots[i].frame.destination_mac(mac_addresses[0]);
        slots[i].frame.source_mac(source_mac);
        slots[i].frame.ethertype(ipv4_packet::ethertype);
        ipv4_packet ipv4 = slots[i].frame.payload_ipv4();
        ipv4.version_ihl(0x45);  // IPv4, 20 byte header
        // total_length will change later to the actual packet size
        ipv4.total_length(config.get_max_packet_size() + ipv4_packet::min_size + udp_packet::min_size);
        ipv4.flags_frag_off(ipv4_packet::flag_do_not_fragment);
        ipv4.ttl(ibv_config.get_ttl());
        ipv4.protocol(udp_packet::protocol);
        ipv4.source_address(interface_address.to_v4());
        ipv4.destination_address(endpoints[0].address().to_v4());
        udp_packet udp = ipv4.payload_udp();
        udp.source_port(socket.local_endpoint().port());
        udp.destination_port(endpoints[0].port());
        udp.length(config.get_max_packet_size() + udp_packet::min_size);
        udp.checksum(0);
        slots[i].payload = static_cast<std::uint8_t *>(udp.payload().data());
    }

    // Offload the IP header checksum to the NIC if it supports it
    if (cm_id.query_device_ex().raw_packet_caps & IBV_RAW_PACKET_CAP_IP_CSUM)
        send_flags = IBV_SEND_IP_CSUM;
    else
        send_flags = 0;
}
49✔
582

583
} // anonymous namespace
584

585
/// Reject any endpoint that is not an IPv4 multicast address, since that is
/// all this transport supports.
void udp_ibv_config::validate_endpoint(const boost::asio::ip::udp::endpoint &endpoint)
{
    const auto address = endpoint.address();
    const bool acceptable = address.is_v4() && address.is_multicast();
    if (!acceptable)
        throw std::invalid_argument("endpoint is not an IPv4 multicast address");
}
5✔
590

591
/// Reject empty memory regions: verbs cannot register a zero-length range.
void udp_ibv_config::validate_memory_region(const udp_ibv_config::memory_region &region)
{
    const std::size_t bytes = region.get_size();
    if (bytes == 0)
        throw std::invalid_argument("memory region must have non-zero size");
}
2✔
596

597
/// Set the IP TTL used for outgoing packets; returns *this for chaining.
udp_ibv_config &udp_ibv_config::set_ttl(std::uint8_t ttl)
{
    this->ttl = ttl;
    return *this;
}
602

603
/// Replace the list of memory regions to pre-register. Every entry is
/// validated before the stored list is touched, so a failure leaves the
/// previous configuration intact. Returns *this for chaining.
udp_ibv_config &udp_ibv_config::set_memory_regions(const std::vector<memory_region> &memory_regions)
{
    for (std::size_t idx = 0; idx < memory_regions.size(); idx++)
        validate_memory_region(memory_regions[idx]);
    this->memory_regions = memory_regions;
    return *this;
}
610

611
/// Convenience overload taking (pointer, size) pairs: converts each pair to
/// a memory_region and delegates to the main overload (which validates).
udp_ibv_config &udp_ibv_config::set_memory_regions(const std::vector<std::pair<const void *, std::size_t>> &memory_regions)
{
    std::vector<memory_region> converted(memory_regions.begin(), memory_regions.end());
    return set_memory_regions(converted);
}
615

616
/// Append one plain memory region, validating it first (throws
/// std::invalid_argument for a zero-size range). Returns *this for chaining.
udp_ibv_config &udp_ibv_config::add_memory_region(const void *ptr, std::size_t size)
{
    const memory_region region(ptr, size);
    validate_memory_region(region);
    memory_regions.push_back(region);
    return *this;
}
623

624
/// Append one dmabuf-backed memory region, validating it first (throws
/// std::invalid_argument for a zero-size range). Returns *this for chaining.
udp_ibv_config &udp_ibv_config::add_memory_region(const void *ptr, std::size_t size, int fd, std::uint64_t offset)
{
    const memory_region region(ptr, size, fd, offset);
    validate_memory_region(region);
    memory_regions.push_back(region);
    return *this;
}
631

632
/* Public stream type: simply wraps the (anonymous-namespace) udp_ibv_writer
 * in the generic send::stream machinery. Configuration validation happens
 * inside the writer's constructor.
 */
udp_ibv_stream::udp_ibv_stream(
    io_context_ref io_context,
    const stream_config &config,
    const udp_ibv_config &ibv_config)
    : stream(std::make_unique<udp_ibv_writer>(std::move(io_context), config, ibv_config))
{
}
×
639

640
} // namespace send
641

642
template class detail::udp_ibv_config_base<send::udp_ibv_config>;
643

644
} // namespace spead2
645

646
#endif // SPEAD2_USE_IBV
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc