• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 12652972619

07 Jan 2025 02:00PM UTC coverage: 78.871% (+0.02%) from 78.852%
12652972619

push

github

web-flow
Merge pull request #368 from ska-sa/bump-boost-1.87

Support Boost 1.87

115 of 137 new or added lines in 32 files covered. (83.94%)

2 existing lines in 2 files now uncovered.

5577 of 7071 relevant lines covered (78.87%)

91360.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.1
/src/py_send.cpp
1
/* Copyright 2015, 2017, 2019-2021, 2023-2025 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <pybind11/pybind11.h>
22
#include <pybind11/stl.h>
23
#include <boost/system/system_error.hpp>
24
#include <stdexcept>
25
#include <mutex>
26
#include <vector>
27
#include <utility>
28
#include <memory>
29
#include <unistd.h>
30
#include <spead2/send_heap.h>
31
#include <spead2/send_stream.h>
32
#include <spead2/send_udp.h>
33
#include <spead2/send_udp_ibv.h>
34
#include <spead2/send_tcp.h>
35
#include <spead2/send_streambuf.h>
36
#include <spead2/send_inproc.h>
37
#include <spead2/common_thread_pool.h>
38
#include <spead2/common_semaphore.h>
39
#include <spead2/py_common.h>
40
#include "common_unique.h"
41

42
namespace py = pybind11;
43

44
namespace spead2
45
{
46
namespace send
47
{
48

49
using spead2::detail::discard_result;
50

51
class heap_wrapper : public heap
52
{
53
private:
54
    std::vector<py::buffer_info> item_buffers;
55

56
public:
57
    using heap::heap;
58
    void add_item(py::object item);
59
    void add_descriptor(py::object descriptor);
60
    flavour get_flavour() const;
61
};
62

63
/**
 * Add an item to the heap from its Python representation.
 *
 * The Python object must provide an @c id attribute and callable
 * @c to_buffer and @c allow_immediate attributes. The buffer request is
 * stored in @c item_buffers before its pointer is passed to heap::add_item.
 */
void heap_wrapper::add_item(py::object item)
{
    const std::int64_t item_id = item.attr("id").cast<std::int64_t>();
    py::buffer payload = item.attr("to_buffer")().cast<py::buffer>();
    const bool immediate_ok = item.attr("allow_immediate")().cast<bool>();

    item_buffers.emplace_back(request_buffer_info(payload, PyBUF_C_CONTIGUOUS));
    const py::buffer_info &info = item_buffers.back();
    // Length in bytes: element size times element count
    heap::add_item(item_id, info.ptr, info.itemsize * info.size, immediate_ok);
}
10,839✔
73

74
/**
 * Add a descriptor to the heap. The Python object's @c to_raw method is
 * called with this heap's flavour and the result is converted to a
 * C++ descriptor.
 */
void heap_wrapper::add_descriptor(py::object object)
{
    py::object raw = object.attr("to_raw")(heap::get_flavour());
    // Cast from an rvalue, exactly as the chained temporary would be
    heap::add_descriptor(std::move(raw).cast<descriptor>());
}
1,234✔
78

79
/// Return the flavour reported by the base heap.
flavour heap_wrapper::get_flavour() const
{
    return this->heap::get_flavour();
}
83

84
/**
 * Produce the next packet from a packet generator as a Python bytes object.
 *
 * @throw py::stop_iteration if the generator yields no buffers
 */
py::bytes packet_generator_next(packet_generator &gen)
{
    std::vector<boost::asio::const_buffer> pieces;
    // Scratch space handed to the generator; sized for the largest packet
    auto scratch = spead2::detail::make_unique_for_overwrite<std::uint8_t[]>(gen.get_max_packet_size());
    gen.next_packet(scratch.get(), pieces);
    if (pieces.empty())
        throw py::stop_iteration();

    // Flatten the scatter-gather list into one contiguous string
    std::string packet(boost::asio::buffers_begin(pieces),
                       boost::asio::buffers_end(pieces));
    return py::bytes(packet);
}
37✔
94

95
/**
 * Convert a Boost error code to a Python object.
 *
 * @return @c None if @a ec does not indicate an error, otherwise an
 * instance of @c IOError constructed from the error value and message.
 */
static py::object make_io_error(const boost::system::error_code &ec)
{
    if (!ec)
        return py::none();

    py::object io_error_type = py::reinterpret_borrow<py::object>(PyExc_IOError);
    return io_error_type(ec.value(), ec.message());
}
105

106
class heap_reference_list
107
{
108
private:
109
    std::vector<heap_reference> heaps;
110
    // Python references to the heaps, to keep them alive
111
    std::vector<py::object> objects;
112

113
    heap_reference_list(std::vector<heap_reference> heaps, std::vector<py::object> objects)
108✔
114
        : heaps(std::move(heaps)), objects(std::move(objects)) {}
108✔
115
public:
116
    heap_reference_list(std::vector<heap_reference> heaps);
117
    const std::vector<heap_reference> &get_heaps() const { return heaps; }
144✔
118
    std::size_t size() const { return heaps.size(); }
×
119
    heap_reference_list get_slice(const py::slice &slice) const;
120
};
121

122
/**
 * Construct from a vector of heap references. A Python object is obtained
 * for every referenced heap (treated as a heap_wrapper) before the
 * references themselves are stored.
 */
heap_reference_list::heap_reference_list(std::vector<heap_reference> heaps)
{
    objects.reserve(heaps.size());
    for (const heap_reference &ref : heaps)
    {
        const heap_wrapper *wrapped = static_cast<const heap_wrapper *>(&ref.heap);
        objects.push_back(py::cast(wrapped));
    }
    // The parameter shadows the member, hence the explicit this->
    this->heaps = std::move(heaps);
}
55✔
129

130
/**
 * Return a new list containing the elements selected by @a slice.
 *
 * @throw py::error_already_set if the slice cannot be resolved against the
 * current length
 */
heap_reference_list heap_reference_list::get_slice(const py::slice &slice) const
{
    std::size_t start, stop, step, count;
    if (!slice.compute(heaps.size(), &start, &stop, &step, &count))
        throw py::error_already_set();

    std::vector<heap_reference> picked_heaps;
    std::vector<py::object> picked_objects;
    picked_heaps.reserve(count);
    picked_objects.reserve(count);

    std::size_t pos = start;
    for (std::size_t remaining = count; remaining > 0; --remaining, pos += step)
    {
        picked_heaps.push_back(heaps[pos]);
        picked_objects.push_back(objects[pos]);
    }
    return heap_reference_list(std::move(picked_heaps), std::move(picked_objects));
}
108✔
147

148
template<typename Base>
149
class stream_wrapper : public Base
150
{
151
private:
152
    struct callback_state
153
    {
154
        /**
155
         * Semaphore triggered by the callback. It uses a semaphore rather
156
         * than a promise because a semaphore can be interrupted.
157
         */
158
        semaphore sem;
159
        /**
160
         * Error code from the callback.
161
         */
162
        boost::system::error_code ec;
163
        /**
164
         * Bytes transferred (encoded heap size).
165
         */
166
        item_pointer_t bytes_transferred = 0;
167
    };
168

169
public:
170
    using Base::Base;
171

172
    /// Sends heap synchronously
173
    item_pointer_t send_heap(
4,969✔
174
        const heap_wrapper &h,
175
        s_item_pointer_t cnt = -1,
176
        std::size_t substream_index = 0,
177
        double rate = -1.0)
178
    {
179
        /* The semaphore state needs to be in shared_ptr because if we are
180
         * interrupted and throw an exception, it still needs to exist until
181
         * the heap is sent.
182
         */
183
        auto state = std::make_shared<callback_state>();
4,969✔
184
        Base::async_send_heap(h, [state] (const boost::system::error_code &ec, item_pointer_t bytes_transferred)
14,907✔
185
        {
186
            state->ec = ec;
4,969✔
187
            state->bytes_transferred = bytes_transferred;
4,969✔
188
            state->sem.put();
4,969✔
189
        }, cnt, substream_index, rate);
190
        semaphore_get(state->sem, gil_release_tag());
4,969✔
191
        if (state->ec)
4,969✔
192
            throw boost_io_error(state->ec);
4✔
193
        else
194
            return state->bytes_transferred;
9,930✔
195
    }
4,969✔
196

197
    /// Sends multiple heaps synchronously
198
    item_pointer_t send_heaps(const std::vector<heap_reference> &heaps, group_mode mode)
112✔
199
    {
200
        // See comments in send_heap
201
        auto state = std::make_shared<callback_state>();
112✔
202
        Base::async_send_heaps(
112✔
203
            heaps.begin(), heaps.end(),
204
            [state] (const boost::system::error_code &ec, item_pointer_t bytes_transferred)
224✔
205
            {
206
                state->ec = ec;
112✔
207
                state->bytes_transferred = bytes_transferred;
112✔
208
                state->sem.put();
112✔
209
            }, mode);
210
        semaphore_get(state->sem, gil_release_tag());
112✔
211
        if (state->ec)
112✔
212
            throw boost_io_error(state->ec);
2✔
213
        else
214
            return state->bytes_transferred;
220✔
215
    }
112✔
216

217
    /// Sends multiple heaps synchronously, from a pre-built heap_reference_list
218
    item_pointer_t send_heaps_hrl(const heap_reference_list &heaps, group_mode mode)
72✔
219
    {
220
        return send_heaps(heaps.get_heaps(), mode);
72✔
221
    }
222
};
223

224
struct callback_item
225
{
226
    py::handle callback;
227
    std::vector<py::handle> heaps;  // kept here because they can only be freed with the GIL
228
    boost::system::error_code ec;
229
    item_pointer_t bytes_transferred;
230
};
231

232
static void free_callback_items(const std::vector<callback_item> &callbacks)
×
233
{
234
    for (const callback_item &item : callbacks)
×
235
    {
236
        for (py::handle h : item.heaps)
×
237
            h.dec_ref();
×
238
        if (item.callback)
×
239
            item.callback.dec_ref();
×
240
    }
241
}
×
242

243
template<typename Base>
244
class asyncio_stream_wrapper : public Base
245
{
246
private:
247
    semaphore_fd sem;
248
    std::vector<callback_item> callbacks;
249
    std::mutex callbacks_mutex;
250

251
    // Prevent copying: the callbacks vector cannot sanely be copied
252
    asyncio_stream_wrapper(const asyncio_stream_wrapper &) = delete;
253
    asyncio_stream_wrapper &operator=(const asyncio_stream_wrapper &) = delete;
254

255
    void handler(py::handle callback_ptr, std::vector<py::handle> h_ptr,
221✔
256
                 const boost::system::error_code &ec, item_pointer_t bytes_transferred)
257
    {
258
        bool was_empty;
259
        {
260
            std::unique_lock<std::mutex> lock(callbacks_mutex);
221✔
261
            was_empty = callbacks.empty();
221✔
262
            callbacks.push_back(callback_item{callback_ptr, std::move(h_ptr), ec, bytes_transferred});
221✔
263
        }
221✔
264
        if (was_empty)
221✔
265
            sem.put();
221✔
266
    }
221✔
267

268
public:
269
    using Base::Base;
270

271
    int get_fd() const { return sem.get_fd(); }
428✔
272

273
    bool async_send_heap_obj(
166✔
274
        py::object h,
275
        py::object callback,
276
        s_item_pointer_t cnt = -1,
277
        std::size_t substream_index = 0,
278
        double rate = -1.0)
279
    {
280
        /* Normally the callback should not refer to this, since it could have
281
         * been reaped by the time the callback occurs. We rely on Python to
282
         * hang on to a reference to self.
283
         *
284
         * The callback and heap are passed around by raw reference, because
285
         * it is not safe to use incref/decref operations without the GIL, and
286
         * attempting to use py::object instead of py::handle tends to cause
287
         * these operations to occur without it being obvious.
288
         */
289
        py::handle h_ptr = h.ptr();
166✔
290
        py::handle callback_ptr = callback.ptr();
166✔
291
        h_ptr.inc_ref();
166✔
292
        callback_ptr.inc_ref();
166✔
293
        return Base::async_send_heap(
332✔
294
            h.cast<const heap_wrapper &>(),
166✔
295
            [this, callback_ptr, h_ptr] (const boost::system::error_code &ec, item_pointer_t bytes_transferred)
332✔
296
            {
297
                handler(callback_ptr, {h_ptr}, ec, bytes_transferred);
166✔
298
            },
299
            cnt, substream_index, rate);
332✔
300
    }
301

302
    bool async_send_heaps_obj(const std::vector<heap_reference> &heaps,
19✔
303
                              py::object callback, group_mode mode)
304
    {
305
        // See comments in async_send_heap_obj
306
        std::vector<py::handle> h_ptrs;
19✔
307
        h_ptrs.reserve(heaps.size());
19✔
308
        for (const auto &h : heaps)
92✔
309
            h_ptrs.push_back(py::cast(static_cast<const heap_wrapper *>(&h.heap)).release());
73✔
310
        py::handle callback_ptr = callback.ptr();
19✔
311
        callback_ptr.inc_ref();
19✔
312
        return Base::async_send_heaps(
38✔
313
            heaps.begin(), heaps.end(),
314
            [this, callback_ptr, h_ptrs = std::move(h_ptrs)] (const boost::system::error_code &ec, item_pointer_t bytes_transferred)
57✔
315
            {
316
                handler(callback_ptr, std::move(h_ptrs), ec, bytes_transferred);
19✔
317
            },
318
            mode);
38✔
319
    }
19✔
320

321
    // Overload that takes a HeapReferenceList
322
    bool async_send_heaps_hrl(const heap_reference_list &heaps,
36✔
323
                              py::object callback, group_mode mode)
324
    {
325
        /* In this overload, we just keep the heap_reference_list alive (in Python),
326
         * and it in turn keeps the individual heaps alive - this requires less
327
         * reference counting.
328
         */
329
        py::handle h_ptr = py::cast(&heaps).release();
36✔
330
        py::handle callback_ptr = callback.ptr();
36✔
331
        callback_ptr.inc_ref();
36✔
332
        return Base::async_send_heaps(
144✔
333
            heaps.get_heaps().begin(), heaps.get_heaps().end(),
72✔
334
            [this, callback_ptr, h_ptr] (const boost::system::error_code &ec, item_pointer_t bytes_transferred)
72✔
335
            {
336
                handler(callback_ptr, {h_ptr}, ec, bytes_transferred);
36✔
337
            },
338
            mode);
72✔
339
    }
340

341
    void process_callbacks()
221✔
342
    {
343
        semaphore_get(sem, gil_release_tag());
221✔
344
        std::vector<callback_item> current_callbacks;
221✔
345
        {
346
            std::unique_lock<std::mutex> lock(callbacks_mutex);
221✔
347
            current_callbacks.swap(callbacks);
221✔
348
        }
221✔
349
        try
350
        {
351
            for (callback_item &item : current_callbacks)
442✔
352
            {
353
                while (!item.heaps.empty())
496✔
354
                {
355
                    item.heaps.back().dec_ref();
275✔
356
                    item.heaps.pop_back();
275✔
357
                }
358
                item.heaps.shrink_to_fit();
221✔
359
                py::object callback = py::reinterpret_steal<py::object>(item.callback);
221✔
360
                item.callback = py::handle();
221✔
361
                callback(make_io_error(item.ec), item.bytes_transferred);
221✔
362
            }
363
        }
364
        catch (py::error_already_set &e)
×
365
        {
366
            log_warning("send callback raised Python exception; expect deadlocks!");
×
367
            free_callback_items(current_callbacks);
×
368
            throw;
×
369
        }
370
        catch (std::bad_alloc &e)
×
371
        {
372
            /* If we're out of memory we might not be able to construct a log
373
             * message. Just rely on Python to report an error.
374
             */
375
            free_callback_items(current_callbacks);
×
376
            throw;
×
377
        }
378
        catch (std::exception &e)
×
379
        {
380
            log_warning("unexpected error in process_callbacks: %1%", e.what());
×
381
            free_callback_items(current_callbacks);
×
382
            throw;
×
383
        }
384
    }
221✔
385

386
    ~asyncio_stream_wrapper()
113✔
387
    {
388
        for (const callback_item &item : callbacks)
93✔
389
        {
390
            for (py::handle h : item.heaps)
×
391
                h.dec_ref();
×
392
            item.callback.dec_ref();
×
393
        }
394
    }
206✔
395
};
396

397
/**
 * Convert a host name to an address with make_address_no_release, using no
 * resolver flags. The GIL is released for the duration of the call.
 */
static boost::asio::ip::address make_address(
    boost::asio::io_context &io_context, const std::string &hostname)
{
    const auto no_flags = boost::asio::ip::resolver_query_base::flags(0);
    py::gil_scoped_release gil;
    return make_address_no_release(io_context, hostname, no_flags);
}
482✔
404

405
template<typename Protocol>
406
static typename Protocol::endpoint make_endpoint(
345✔
407
    boost::asio::io_context &io_context, const std::string &hostname, std::uint16_t port)
408
{
409
    return typename Protocol::endpoint(make_address(io_context, hostname), port);
345✔
410
}
411

412
template<typename Protocol>
413
static std::vector<typename Protocol::endpoint> make_endpoints(
199✔
414
    boost::asio::io_context &io_context, const std::vector<std::pair<std::string, std::uint16_t>> &endpoints)
415
{
416
    std::vector<typename Protocol::endpoint> out;
199✔
417
    out.reserve(endpoints.size());
199✔
418
    for (const auto &[host, port] : endpoints)
544✔
419
        out.push_back(make_endpoint<Protocol>(io_context, host, port));
345✔
420
    return out;
199✔
421
}
×
422

423
template<typename Base>
424
class udp_stream_wrapper : public Base
425
{
426
public:
427
    udp_stream_wrapper(
70✔
428
        io_context_ref io_context,
429
        const std::vector<std::pair<std::string, std::uint16_t>> &endpoints,
430
        const stream_config &config,
431
        std::size_t buffer_size,
432
        const std::string &interface_address)
433
        : Base(
434
            io_context,
435
            make_endpoints<boost::asio::ip::udp>(*io_context, endpoints),
436
            config, buffer_size,
437
            make_address(*io_context, interface_address))
74✔
438
    {
439
    }
68✔
440

441
    udp_stream_wrapper(
×
442
        io_context_ref io_context,
443
        const std::vector<std::pair<std::string, std::uint16_t>> &endpoints,
444
        const stream_config &config,
445
        std::size_t buffer_size,
446
        int ttl)
447
        : Base(
448
            io_context,
449
            make_endpoints<boost::asio::ip::udp>(*io_context, endpoints),
UNCOV
450
            config, buffer_size, ttl)
×
451
    {
452
    }
×
453

454
    udp_stream_wrapper(
20✔
455
        io_context_ref io_context,
456
        const std::vector<std::pair<std::string, std::uint16_t>> &endpoints,
457
        const stream_config &config,
458
        std::size_t buffer_size,
459
        int ttl,
460
        const std::string &interface_address)
461
        : Base(
462
            io_context,
463
            make_endpoints<boost::asio::ip::udp>(*io_context, endpoints),
464
            config, buffer_size, ttl,
465
            interface_address.empty() ?
20✔
466
                boost::asio::ip::address() :
467
                make_address(*io_context, interface_address))
40✔
468
    {
469
    }
20✔
470

471
    udp_stream_wrapper(
20✔
472
        io_context_ref io_context,
473
        const std::vector<std::pair<std::string, std::uint16_t>> &endpoints,
474
        const stream_config &config,
475
        std::size_t buffer_size,
476
        int ttl,
477
        unsigned int interface_index)
478
        : Base(
479
            io_context,
480
            make_endpoints<boost::asio::ip::udp>(*io_context, endpoints),
481
            config, buffer_size, ttl, interface_index)
20✔
482
    {
483
    }
20✔
484

485
    udp_stream_wrapper(
40✔
486
        io_context_ref io_context,
487
        const socket_wrapper<boost::asio::ip::udp::socket> &socket,
488
        const std::vector<std::pair<std::string, std::uint16_t>> &endpoints,
489
        const stream_config &config)
490
        : Base(
491
            io_context,
492
            socket.copy(*io_context),
493
            make_endpoints<boost::asio::ip::udp>(*io_context, endpoints),
494
            config)
40✔
495
    {
496
    }
40✔
497
};
498

499
#if SPEAD2_USE_IBV
500

501
/* Managing the endpoint and memory region lists requires some sleight of
502
 * hand. We store a separate copy in the wrapper in a Python-centric format.
503
 * When constructing the stream, we make a copy with the C++ view.
504
 */
505
class udp_ibv_config_wrapper : public udp_ibv_config
506
{
507
public:
508
    std::vector<std::pair<std::string, std::uint16_t>> py_endpoints;
509
    std::vector<py::buffer> py_memory_regions;
510
    std::string py_interface_address;
511
};
512

513
template<typename Base>
514
class udp_ibv_stream_wrapper : public Base
515
{
516
private:
517
    // Keeps the buffer requests alive
518
    std::vector<py::buffer_info> buffer_infos;
519

520
public:
521
    udp_ibv_stream_wrapper(
3✔
522
        std::shared_ptr<thread_pool> pool,
523
        const stream_config &config,
524
        const udp_ibv_config &ibv_config,
525
        std::vector<py::buffer_info> &&buffer_infos)
526
        : Base(pool,
527
               config,
528
               ibv_config),
529
        buffer_infos(std::move(buffer_infos))
6✔
530
    {
531
    }
×
532
};
533
#endif
534

535
/**
 * Synchronous stream that writes into an in-memory string buffer. The
 * std::stringbuf is a base class (listed first) and is the stream buffer
 * passed to the streambuf_stream base.
 */
class bytes_stream : private std::stringbuf, public stream_wrapper<streambuf_stream>
{
private:
    using stream_base = stream_wrapper<streambuf_stream>;

public:
    bytes_stream(std::shared_ptr<thread_pool> pool, const stream_config &config = stream_config())
        : stream_base(std::move(pool), *this, config)
    {
    }

    /// Contents of the string buffer, as Python bytes
    py::bytes getvalue() const
    {
        return py::bytes(str());
    }
};
548

549
template<typename T>
550
static py::class_<T, stream> udp_stream_register(py::module &m, const char *name)
10✔
551
{
552
    using namespace pybind11::literals;
553

554
    return py::class_<T, stream>(m, name)
20✔
555
        .def(py::init<std::shared_ptr<thread_pool_wrapper>, const std::vector<std::pair<std::string, std::uint16_t>> &, const stream_config &, std::size_t, std::string>(),
40✔
556
             "thread_pool"_a.none(false), "endpoints"_a,
10✔
557
             "config"_a = stream_config(),
20✔
558
             "buffer_size"_a = T::default_buffer_size,
20✔
559
             "interface_address"_a = std::string())
20✔
560
        .def(py::init<std::shared_ptr<thread_pool_wrapper>, const std::vector<std::pair<std::string, std::uint16_t>> &, const stream_config &, std::size_t, int>(),
30✔
561
             "thread_pool"_a.none(false), "endpoints"_a,
10✔
562
             "config"_a = stream_config(),
20✔
563
             "buffer_size"_a = T::default_buffer_size,
10✔
564
             "ttl"_a)
20✔
565
        .def(py::init<std::shared_ptr<thread_pool_wrapper>, const std::vector<std::pair<std::string, std::uint16_t>> &, const stream_config &, std::size_t, int, std::string>(),
30✔
566
             "thread_pool"_a.none(false), "endpoints"_a,
10✔
567
             "config"_a = stream_config(),
20✔
568
             "buffer_size"_a = T::default_buffer_size,
10✔
569
             "ttl"_a,
10✔
570
             "interface_address"_a)
20✔
571
        .def(py::init<std::shared_ptr<thread_pool_wrapper>, const std::vector<std::pair<std::string, std::uint16_t>> &, const stream_config &, std::size_t, int, unsigned int>(),
30✔
572
             "thread_pool"_a.none(false), "endpoints"_a,
10✔
573
             "config"_a = stream_config(),
20✔
574
             "buffer_size"_a = T::default_buffer_size,
10✔
575
             "ttl"_a,
10✔
576
             "interface_index"_a)
20✔
577
        .def(py::init<std::shared_ptr<thread_pool_wrapper>, const socket_wrapper<boost::asio::ip::udp::socket> &, const std::vector<std::pair<std::string, std::uint16_t>> &, const stream_config &>(),
20✔
578
             "thread_pool"_a.none(false), "socket"_a, "endpoints"_a,
20✔
579
             "config"_a = stream_config())
20✔
580

581
        .def_readonly_static("DEFAULT_BUFFER_SIZE", &T::default_buffer_size);
20✔
582
}
583

584
#if SPEAD2_USE_IBV
585
template<typename T>
586
static py::class_<T, stream> udp_ibv_stream_register(py::module &m, const char *name)
10✔
587
{
588
    using namespace pybind11::literals;
589

590
    return py::class_<T, stream>(m, name)
20✔
591
        .def(py::init([](std::shared_ptr<thread_pool_wrapper> thread_pool,
28✔
592
                         const stream_config &config,
593
                         const udp_ibv_config_wrapper &ibv_config_wrapper)
594
            {
595
                udp_ibv_config ibv_config = ibv_config_wrapper;
8✔
596
                ibv_config.set_endpoints(
10✔
597
                    make_endpoints<boost::asio::ip::udp>(
598
                        thread_pool->get_io_context(),
8✔
599
                        ibv_config_wrapper.py_endpoints));
8✔
600
                ibv_config.set_interface_address(
6✔
601
                    make_address(thread_pool->get_io_context(),
6✔
602
                                 ibv_config_wrapper.py_interface_address));
6✔
603
                std::vector<std::pair<const void *, std::size_t>> regions;
4✔
604
                std::vector<py::buffer_info> buffer_infos;
4✔
605
                regions.reserve(ibv_config_wrapper.py_memory_regions.size());
4✔
606
                buffer_infos.reserve(regions.size());
4✔
607
                for (auto &buffer : ibv_config_wrapper.py_memory_regions)
7✔
608
                {
609
                    buffer_infos.push_back(request_buffer_info(buffer, PyBUF_C_CONTIGUOUS));
3✔
610
                    regions.emplace_back(
3✔
611
                        buffer_infos.back().ptr,
3✔
612
                        buffer_infos.back().itemsize * buffer_infos.back().size);
6✔
613
                }
614
                ibv_config.set_memory_regions(regions);
4✔
615

616
                return new T(std::move(thread_pool), config, ibv_config, std::move(buffer_infos));
9✔
617
            }),
16✔
618
            "thread_pool"_a.none(false),
20✔
619
            "config"_a = stream_config(),
10✔
620
            "udp_ibv_config"_a);
40✔
621
}
622
#endif
623

624
template<typename Base>
625
class tcp_stream_wrapper : public Base
626
{
627
public:
628
    /* All wrapping constructors that use a connect_handler take it as the
629
     * first argument, to faciliate the meta-programming used by registration
630
     * code.
631
     */
632
    template<typename ConnectHandler>
633
    tcp_stream_wrapper(
41✔
634
        ConnectHandler&& connect_handler,
635
        io_context_ref io_context,
636
        const std::vector<std::pair<std::string, std::uint16_t>> &endpoints,
637
        const stream_config &config,
638
        std::size_t buffer_size,
639
        const std::string &interface_address)
640
        : Base(io_context, std::forward<ConnectHandler>(connect_handler),
641
               make_endpoints<boost::asio::ip::tcp>(*io_context, endpoints),
642
               config, buffer_size, make_address(*io_context, interface_address))
41✔
643
    {
644
    }
41✔
645

646
    tcp_stream_wrapper(
26✔
647
        io_context_ref io_context,
648
        const socket_wrapper<boost::asio::ip::tcp::socket> &socket,
649
        const stream_config &config)
650
        : Base(io_context, socket.copy(*io_context), config)
26✔
651
    {
652
    }
26✔
653
};
654

655
/* This is a different design than the other registration functions, because
656
 * the TCP sync and async classes are constructed very differently (because of
657
 * the handling around connecting). The callback is called (several times) with
658
 * a function object that generates the unique_ptr<T> plus additional arguments
659
 * to pass to py::class_::def.
660
 */
661
template<typename Registrar>
662
static py::class_<typename Registrar::stream_type, stream> tcp_stream_register(py::module &m, const char *name)
10✔
663
{
664
    using namespace pybind11::literals;
665

666
    typedef typename Registrar::stream_type T;
667
    py::class_<T, stream> class_(m, name);
10✔
668
    class_
669
        .def(py::init<std::shared_ptr<thread_pool_wrapper>,
20✔
670
                      const socket_wrapper<boost::asio::ip::tcp::socket> &,
671
                      const stream_config &>(),
10✔
672
             "thread_pool"_a.none(false), "socket"_a, "config"_a = stream_config())
30✔
673
        .def_readonly_static("DEFAULT_BUFFER_SIZE", &T::default_buffer_size);
10✔
674
    Registrar::template apply<
675
            std::shared_ptr<thread_pool_wrapper>,
676
            const std::vector<std::pair<std::string, std::uint16_t>> &,
677
            const stream_config &, std::size_t, const std::string &>(
40✔
678
        class_,
679
        "thread_pool"_a.none(false), "endpoints"_a,
20✔
680
        "config"_a = stream_config(),
20✔
681
        "buffer_size"_a = T::default_buffer_size,
20✔
682
        "interface_address"_a = "");
10✔
683
    return class_;
10✔
684
}
×
685

686
// Function object passed to tcp_stream_register to register the synchronous class
687
class tcp_stream_register_sync
688
{
689
private:
690
    struct connect_state
691
    {
692
        semaphore sem;
693
        boost::system::error_code ec;
694
    };
695

696
public:
697
    typedef tcp_stream_wrapper<stream_wrapper<tcp_stream>> stream_type;
698

699
private:
700
    /* Template args are explicit, hence no Args&&... */
701
    template<typename... Args>
702
    static std::unique_ptr<stream_type> construct(Args... args)
27✔
703
    {
704
        std::shared_ptr<connect_state> state = std::make_shared<connect_state>();
27✔
705
        auto connect_handler = [state](const boost::system::error_code &ec)
27✔
706
        {
707
            state->ec = ec;
27✔
708
            state->sem.put();
27✔
709
        };
710
        auto stream = std::make_unique<stream_type>(connect_handler, std::forward<Args>(args)...);
27✔
711
        semaphore_get(state->sem, gil_release_tag());
27✔
712
        if (state->ec)
27✔
713
            throw boost_io_error(state->ec);
1✔
714
        return stream;
52✔
715
    }
29✔
716

717
public:
718
    template<typename... Args, typename... Extra>
719
    static void apply(py::class_<stream_type, stream> &class_, Extra&&... extra)
5✔
720
    {
721
        class_.def(py::init(&tcp_stream_register_sync::construct<Args...>),
5✔
722
                   std::forward<Extra>(extra)...);
5✔
723
    }
5✔
724
};
725

726
// Function object passed to tcp_stream_register to register the asynchronous class
727
class tcp_stream_register_async
728
{
729
private:
730
    struct connect_state
731
    {
732
        py::handle callback;
733
    };
734

735
public:
736
    typedef tcp_stream_wrapper<asyncio_stream_wrapper<tcp_stream>> stream_type;
737

738
private:
739
    /* Template args are explicit, hence no Args&&... */
740
    template<typename... Args>
741
    static std::unique_ptr<stream_type> construct(py::object callback, Args... args)
14✔
742
    {
743
        std::shared_ptr<connect_state> state = std::make_shared<connect_state>();
14✔
744
        auto connect_handler = [state](boost::system::error_code ec)
28✔
745
        {
746
            py::gil_scoped_acquire gil;
14✔
747
            py::object callback = py::reinterpret_steal<py::object>(state->callback);
14✔
748
            callback(make_io_error(ec));
14✔
749
        };
14✔
750
        auto stream = std::make_unique<stream_type>(connect_handler, std::forward<Args>(args)...);
14✔
751
        /* The state takes over the references. These are dealt with using
752
         * py::handle rather than py::object to avoid manipulating refcounts
753
         * without the GIL. Note that while the connect_handler could occur
754
         * immediately, the GIL serialises access to state.
755
         */
756
        state->callback = callback.release();
14✔
757
        return stream;
28✔
758
    }
14✔
759

760
public:
761
    template<typename... Args, typename... Extra>
762
    static void apply(py::class_<stream_type, stream> &class_, Extra&&... extra)
5✔
763
    {
764
        using namespace pybind11::literals;
765
        class_.def(py::init(&tcp_stream_register_async::construct<Args...>),
5✔
766
                   "callback"_a, std::forward<Extra>(extra)...);
5✔
767
    }
5✔
768
};
769

770
template<typename T>
771
static py::class_<T, stream> inproc_stream_register(py::module &m, const char *name)
10✔
772
{
773
    using namespace pybind11::literals;
774
    return py::class_<T, stream>(m, name)
20✔
775
        .def(py::init<std::shared_ptr<thread_pool_wrapper>, const std::vector<std::shared_ptr<inproc_queue>> &, const stream_config &>(),
20✔
776
             "thread_pool"_a.none(false), "queues"_a, "config"_a = stream_config())
30✔
777
        .def_property_readonly("queues", &T::get_queues);
30✔
778
}
779

780
template<typename T>
781
static void sync_stream_register(py::class_<T, stream> &stream_class)
25✔
782
{
783
    using namespace pybind11::literals;
784
    stream_class.def("send_heap", &T::send_heap,
100✔
785
                     "heap"_a, "cnt"_a = s_item_pointer_t(-1),
75✔
786
                     "substream_index"_a = std::size_t(0),
50✔
787
                     "rate"_a = -1.0);
25✔
788
    stream_class.def("send_heaps", &T::send_heaps_hrl,
25✔
789
                     "heaps"_a, "mode"_a);
25✔
790
    stream_class.def("send_heaps", &T::send_heaps,
25✔
791
                     "heaps"_a, "mode"_a);
25✔
792
}
25✔
793

794
template<typename T>
795
static void async_stream_register(py::class_<T, stream> &stream_class)
20✔
796
{
797
    using namespace pybind11::literals;
798
    stream_class
799
        .def_property_readonly("fd", &T::get_fd)
20✔
800
        .def("async_send_heap", &T::async_send_heap_obj,
80✔
801
             "heap"_a, "callback"_a, "cnt"_a = s_item_pointer_t(-1),
60✔
802
             "substream_index"_a = std::size_t(0),
40✔
803
             "rate"_a = -1.0)
20✔
804
        .def("async_send_heaps", &T::async_send_heaps_hrl,
20✔
805
             "heaps"_a, "callback"_a, "mode"_a)
20✔
806
        .def("async_send_heaps", &T::async_send_heaps_obj,
20✔
807
             "heaps"_a, "callback"_a, "mode"_a)
20✔
808
        .def("flush", &T::flush)
20✔
809
        .def("process_callbacks", &T::process_callbacks);
20✔
810
}
20✔
811

812
/// Register the send module with Boost.Python
813
py::module register_module(py::module &parent)
5✔
814
{
815
    using namespace pybind11::literals;
816

817
    py::module m = parent.def_submodule("send");
5✔
818

819
    py::class_<heap_wrapper>(m, "Heap")
5✔
820
        .def(py::init<flavour>(), "flavour"_a = flavour())
10✔
821
        .def_property_readonly("flavour", &heap_wrapper::get_flavour)
5✔
822
        .def("add_item", &heap_wrapper::add_item, "item"_a)
5✔
823
        .def("add_descriptor", &heap_wrapper::add_descriptor, "descriptor"_a)
5✔
824
        .def("add_start", &heap_wrapper::add_start)
5✔
825
        .def("add_end", &heap_wrapper::add_end)
5✔
826
        .def_property("repeat_pointers",
5✔
827
                      &heap_wrapper::get_repeat_pointers,
×
828
                      &heap_wrapper::set_repeat_pointers);
5✔
829

830
    // keep_alive is safe to use here in spite of pybind/pybind11#856, because
831
    // the destructor of packet_generator doesn't reference the heap.
832
    py::class_<packet_generator>(m, "PacketGenerator")
5✔
833
        .def(py::init<heap_wrapper &, item_pointer_t, std::size_t>(),
5✔
834
             "heap"_a, "cnt"_a, "max_packet_size"_a,
5✔
835
             py::keep_alive<1, 2>())
×
836
        .def("__iter__", [](py::object self) { return self; })
17✔
837
        .def("__next__", &packet_generator_next);
5✔
838

839
    py::enum_<rate_method>(m, "RateMethod")
10✔
840
        .value("SW", rate_method::SW)
5✔
841
        .value("HW", rate_method::HW)
5✔
842
        .value("AUTO", rate_method::AUTO);
5✔
843

844
    py::enum_<group_mode>(m, "GroupMode")
10✔
845
        .value("ROUND_ROBIN", group_mode::ROUND_ROBIN)
5✔
846
        .value("SERIAL", group_mode::SERIAL);
5✔
847

848
    py::class_<heap_reference>(m, "HeapReference")
5✔
849
        .def(py::init<const heap_wrapper &, s_item_pointer_t, std::size_t, double>(),
5✔
850
             "heap"_a, py::kw_only(), "cnt"_a = -1, "substream_index"_a = 0, "rate"_a = -1.0,
10✔
851
             py::keep_alive<1, 2>())
5✔
852
        .def_property_readonly(
×
853
            "heap",
854
            [](const heap_reference &h) { return static_cast<const heap_wrapper *>(&h.heap); },
×
855
            py::return_value_policy::reference)
10✔
856
        .def_readwrite("cnt", &heap_reference::cnt)
5✔
857
        .def_readwrite("substream_index", &heap_reference::substream_index)
5✔
858
        .def_readwrite("rate", &heap_reference::rate);
5✔
859

860
    py::class_<heap_reference_list>(m, "HeapReferenceList")
5✔
861
        .def(py::init<std::vector<heap_reference>>(), "heaps"_a)
5✔
862
        .def("__len__", &heap_reference_list::size)
5✔
863
        .def("__getitem__", &heap_reference_list::get_slice);
5✔
864

865
    py::class_<stream_config>(m, "StreamConfig")
5✔
866
        .def(py::init(&data_class_constructor<stream_config>))
5✔
867
        .def_property("max_packet_size",
10✔
868
                      &stream_config::get_max_packet_size,
5✔
869
                      SPEAD2_PTMF_VOID(stream_config, set_max_packet_size))
5✔
870
        .def_property("rate",
10✔
871
                      &stream_config::get_rate,
5✔
872
                      SPEAD2_PTMF_VOID(stream_config, set_rate))
5✔
873
        .def_property("burst_size",
10✔
874
                      &stream_config::get_burst_size,
5✔
875
                      SPEAD2_PTMF_VOID(stream_config, set_burst_size))
5✔
876
        .def_property("max_heaps",
10✔
877
                      &stream_config::get_max_heaps,
5✔
878
                      SPEAD2_PTMF_VOID(stream_config, set_max_heaps))
5✔
879
        .def_property("burst_rate_ratio",
10✔
880
                      &stream_config::get_burst_rate_ratio,
5✔
881
                      SPEAD2_PTMF_VOID(stream_config, set_burst_rate_ratio))
5✔
882
        .def_property("rate_method",
10✔
883
                      &stream_config::get_rate_method,
5✔
884
                      SPEAD2_PTMF_VOID(stream_config, set_rate_method))
5✔
885
        .def_property_readonly("burst_rate",
10✔
886
                               &stream_config::get_burst_rate)
10✔
887
        .def_readonly_static("DEFAULT_MAX_PACKET_SIZE", &stream_config::default_max_packet_size)
5✔
888
        .def_readonly_static("DEFAULT_MAX_HEAPS", &stream_config::default_max_heaps)
5✔
889
        .def_readonly_static("DEFAULT_BURST_SIZE", &stream_config::default_burst_size)
5✔
890
        .def_readonly_static("DEFAULT_BURST_RATE_RATIO", &stream_config::default_burst_rate_ratio)
5✔
891
        .def_readonly_static("DEFAULT_RATE_METHOD", &stream_config::default_rate_method);
5✔
892

893
    py::class_<stream>(m, "Stream")
5✔
894
        .def("set_cnt_sequence", &stream::set_cnt_sequence,
5✔
895
             "next"_a, "step"_a)
5✔
896
        .def_property_readonly("num_substreams", &stream::get_num_substreams);
5✔
897

898
    {
899
        auto stream_class = udp_stream_register<udp_stream_wrapper<stream_wrapper<udp_stream>>>(m, "UdpStream");
5✔
900
        sync_stream_register(stream_class);
5✔
901
    }
5✔
902
    {
903
        auto stream_class = udp_stream_register<udp_stream_wrapper<asyncio_stream_wrapper<udp_stream>>>(m, "UdpStreamAsyncio");
5✔
904
        async_stream_register(stream_class);
5✔
905
    }
5✔
906

907
#if SPEAD2_USE_IBV
908
    py::class_<udp_ibv_config_wrapper>(m, "UdpIbvConfig")
5✔
909
        .def(py::init(&data_class_constructor<udp_ibv_config_wrapper>))
5✔
910
        .def_readwrite("endpoints", &udp_ibv_config_wrapper::py_endpoints)
5✔
911
        .def_readwrite("memory_regions", &udp_ibv_config_wrapper::py_memory_regions)
5✔
912
        .def_readwrite("interface_address", &udp_ibv_config_wrapper::py_interface_address)
5✔
913
        .def_property("buffer_size",
10✔
914
                      &udp_ibv_config_wrapper::get_buffer_size,
5✔
915
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_buffer_size))
5✔
916
        .def_property("ttl",
10✔
917
                      &udp_ibv_config_wrapper::get_ttl,
5✔
918
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_ttl))
5✔
919
        .def_property("comp_vector",
10✔
920
                      &udp_ibv_config_wrapper::get_comp_vector,
5✔
921
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_comp_vector))
5✔
922
        .def_property("max_poll",
10✔
923
                      &udp_ibv_config_wrapper::get_max_poll,
5✔
924
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_max_poll))
10✔
925
        .def_readonly_static("DEFAULT_BUFFER_SIZE", &udp_ibv_config_wrapper::default_buffer_size)
5✔
926
        .def_readonly_static("DEFAULT_MAX_POLL", &udp_ibv_config_wrapper::default_max_poll);
5✔
927

928
    {
929
        auto stream_class = udp_ibv_stream_register<udp_ibv_stream_wrapper<stream_wrapper<udp_ibv_stream>>>(m, "UdpIbvStream");
5✔
930
        sync_stream_register(stream_class);
5✔
931
    }
5✔
932
    {
933
        auto stream_class = udp_ibv_stream_register<udp_ibv_stream_wrapper<asyncio_stream_wrapper<udp_ibv_stream>>>(m, "UdpIbvStreamAsyncio");
5✔
934
        async_stream_register(stream_class);
5✔
935
    }
5✔
936
#endif
937

938
    {
939
        auto stream_class = tcp_stream_register<tcp_stream_register_sync>(m, "TcpStream");
5✔
940
        sync_stream_register(stream_class);
5✔
941
    }
5✔
942
    {
943
        auto stream_class = tcp_stream_register<tcp_stream_register_async>(m, "TcpStreamAsyncio");
5✔
944
        async_stream_register(stream_class);
5✔
945
    }
5✔
946

947
    {
948
        py::class_<bytes_stream, stream> stream_class(m, "BytesStream", py::multiple_inheritance());
5✔
949
        stream_class
950
            .def(py::init<std::shared_ptr<thread_pool_wrapper>, const stream_config &>(),
5✔
951
                 "thread_pool"_a.none(false), "config"_a = stream_config())
5✔
952
            .def("getvalue", &bytes_stream::getvalue);
5✔
953
        sync_stream_register(stream_class);
5✔
954
    }
5✔
955

956
    {
957
        auto stream_class = inproc_stream_register<stream_wrapper<inproc_stream>>(m, "InprocStream");
5✔
958
        sync_stream_register(stream_class);
5✔
959
    }
5✔
960
    {
961
        auto stream_class = inproc_stream_register<asyncio_stream_wrapper<inproc_stream>>(m, "InprocStreamAsyncio");
5✔
962
        async_stream_register(stream_class);
5✔
963
    }
5✔
964

965
    return m;
5✔
966
}
×
967

968
} // namespace send
969
} // namespace spead2
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc