• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 6084906674

05 Sep 2023 12:55PM UTC coverage: 78.235% (-0.2%) from 78.424%
6084906674

Pull #266

github

bmerry
Simplify wrapping of pointers to members

I did some microbenching and found no evidence that the SPEAD2_PTMF
wrapper was improving performance at all, compared to just passing the
pointer-to-member-function to pybind11, even though the latter involves
making a call via a pointer (bad for branch prediction). So I've
eliminated the SPEAD2_PTMF macro entirely, and simplified
SPEAD2_PTMF_VOID to use a more obvious approach. The latter also isn't
needed for any const member functions so I eliminated that support too.
Pull Request #266: Simplify wrapping of pointers to members

107 of 107 new or added lines in 4 files covered. (100.0%)

5291 of 6763 relevant lines covered (78.23%)

56505.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.92
/src/py_send.cpp
1
/* Copyright 2015, 2017, 2019-2021, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <pybind11/pybind11.h>
22
#include <pybind11/stl.h>
23
#include <boost/system/system_error.hpp>
24
#include <stdexcept>
25
#include <mutex>
26
#include <vector>
27
#include <utility>
28
#include <memory>
29
#include <unistd.h>
30
#include <spead2/send_heap.h>
31
#include <spead2/send_stream.h>
32
#include <spead2/send_udp.h>
33
#include <spead2/send_udp_ibv.h>
34
#include <spead2/send_tcp.h>
35
#include <spead2/send_streambuf.h>
36
#include <spead2/send_inproc.h>
37
#include <spead2/common_thread_pool.h>
38
#include <spead2/common_semaphore.h>
39
#include <spead2/py_common.h>
40
#include "common_unique.h"
41

42
namespace py = pybind11;
43

44
namespace spead2
45
{
46
namespace send
47
{
48

49
using spead2::detail::discard_result;
50

51
/**
 * Python-facing subclass of @ref heap.
 *
 * It inherits the constructors of @ref heap and adds methods that accept
 * Python objects for items and descriptors.
 */
class heap_wrapper : public heap
52
{
53
private:
54
    /// Buffer infos obtained in add_item; one is appended per item added
    std::vector<py::buffer_info> item_buffers;
55

56
public:
57
    using heap::heap;
58
    /// Add an item described by a Python object (see the definition for the attributes used)
    void add_item(py::object item);
59
    /// Add a descriptor obtained from the Python object's @c to_raw method
    void add_descriptor(py::object descriptor);
60
    /// Return the flavour reported by the base class
    flavour get_flavour() const;
61
};
62

63
/**
 * Add an item to the heap from a Python item object.
 *
 * The object must expose an @c id attribute plus callable @c to_buffer and
 * @c allow_immediate attributes. A C-contiguous buffer is requested from
 * the result of @c to_buffer and its info is appended to @c item_buffers
 * (presumably so the memory handed to the heap stays valid).
 */
void heap_wrapper::add_item(py::object item)
{
    std::int64_t item_id = item.attr("id").cast<std::int64_t>();
    py::buffer raw = item.attr("to_buffer")().cast<py::buffer>();
    bool immediate_ok = item.attr("allow_immediate")().cast<bool>();
    item_buffers.emplace_back(request_buffer_info(raw, PyBUF_C_CONTIGUOUS));
    // The heap receives the raw pointer and the total size in bytes
    py::buffer_info &info = item_buffers.back();
    heap::add_item(item_id, info.ptr, info.itemsize * info.size, immediate_ok);
}
10,823✔
73

74
/**
 * Add a descriptor to the heap.
 *
 * The Python object's @c to_raw method is called with this heap's flavour
 * and the result is converted to a C++ @c descriptor.
 */
void heap_wrapper::add_descriptor(py::object object)
{
    py::object raw = object.attr("to_raw")(heap::get_flavour());
    heap::add_descriptor(raw.cast<descriptor>());
}
1,218✔
78

79
/// Return the flavour of the heap by forwarding to heap::get_flavour.
flavour heap_wrapper::get_flavour() const
×
80
{
81
    return heap::get_flavour();
×
82
}
83

84
/**
 * Produce the next packet from @a gen as a Python bytes object.
 *
 * A scratch buffer of the generator's maximum packet size is allocated for
 * each call. If the generator returns no buffers, @c py::stop_iteration is
 * thrown.
 */
py::bytes packet_generator_next(packet_generator &gen)
{
    auto scratch = spead2::detail::make_unique_for_overwrite<std::uint8_t[]>(gen.get_max_packet_size());
    auto packet = gen.next_packet(scratch.get());
    if (packet.empty())
        throw py::stop_iteration();
    // Flatten the buffer sequence into one contiguous string
    std::string payload(boost::asio::buffers_begin(packet),
                        boost::asio::buffers_end(packet));
    return py::bytes(payload);
}
37✔
93

94
/**
 * Convert an error code to a Python object.
 *
 * @return @c None if @a ec indicates success, otherwise an instance of
 * Python's @c IOError constructed from the error value and message.
 */
static py::object make_io_error(const boost::system::error_code &ec)
{
    if (!ec)
        return py::none();
    py::object exc_class = py::reinterpret_borrow<py::object>(PyExc_IOError);
    return exc_class(ec.value(), ec.message());
}
104

105
/**
 * A list of heap references bundled with Python references to the
 * corresponding heap objects.
 */
class heap_reference_list
106
{
107
private:
    /// The heap references themselves
    std::vector<heap_reference> heaps;
109
    // Python references to the heaps, to keep them alive
110
    std::vector<py::object> objects;
111

112
public:
113
    /// Take ownership of @a heaps and collect a Python reference for each one
    heap_reference_list(std::vector<heap_reference> heaps);
114
    /// Read-only access to the stored heap references
    const std::vector<heap_reference> &get_heaps() const { return heaps; }
72✔
115
};
116

117
/**
 * Store @a heaps and, for each one, a Python object referring to the heap
 * (treated as a @c heap_wrapper) in @c objects.
 */
heap_reference_list::heap_reference_list(std::vector<heap_reference> heaps)
    : heaps(std::move(heaps))
{
    // From here on the parameter has been moved from; use the member
    objects.reserve(this->heaps.size());
    for (const heap_reference &ref : this->heaps)
        objects.push_back(py::cast(static_cast<const heap_wrapper *>(&ref.heap)));
}
54✔
124

125
template<typename Base>
126
class stream_wrapper : public Base
127
{
128
private:
129
    struct callback_state
130
    {
131
        /**
132
         * Semaphore triggered by the callback. It uses a semaphore rather
133
         * than a promise because a semaphore can be interrupted.
134
         */
135
        semaphore sem;
136
        /**
137
         * Error code from the callback.
138
         */
139
        boost::system::error_code ec;
140
        /**
141
         * Bytes transferred (encoded heap size).
142
         */
143
        item_pointer_t bytes_transferred = 0;
144
    };
145

146
public:
147
    using Base::Base;
148

149
    /// Sends heap synchronously
150
    item_pointer_t send_heap(const heap_wrapper &h, s_item_pointer_t cnt = -1, std::size_t substream_index = 0)
4,949✔
151
    {
152
        /* The semaphore state needs to be in shared_ptr because if we are
153
         * interrupted and throw an exception, it still needs to exist until
154
         * the heap is sent.
155
         */
156
        auto state = std::make_shared<callback_state>();
4,949✔
157
        Base::async_send_heap(h, [state] (const boost::system::error_code &ec, item_pointer_t bytes_transferred)
14,847✔
158
        {
159
            state->ec = ec;
4,949✔
160
            state->bytes_transferred = bytes_transferred;
4,949✔
161
            state->sem.put();
4,949✔
162
        }, cnt, substream_index);
163
        semaphore_get(state->sem, gil_release_tag());
4,949✔
164
        if (state->ec)
4,949✔
165
            throw boost_io_error(state->ec);
4✔
166
        else
167
            return state->bytes_transferred;
9,890✔
168
    }
4,949✔
169

170
    /// Sends multiple heaps synchronously
171
    item_pointer_t send_heaps(const std::vector<heap_reference> &heaps, group_mode mode)
76✔
172
    {
173
        // See comments in send_heap
174
        auto state = std::make_shared<callback_state>();
76✔
175
        Base::async_send_heaps(
76✔
176
            heaps.begin(), heaps.end(),
177
            [state] (const boost::system::error_code &ec, item_pointer_t bytes_transferred)
152✔
178
            {
179
                state->ec = ec;
76✔
180
                state->bytes_transferred = bytes_transferred;
76✔
181
                state->sem.put();
76✔
182
            }, mode);
183
        semaphore_get(state->sem, gil_release_tag());
76✔
184
        if (state->ec)
76✔
185
            throw boost_io_error(state->ec);
2✔
186
        else
187
            return state->bytes_transferred;
148✔
188
    }
76✔
189

190
    /// Sends multiple heaps synchronously, from a pre-built heap_reference_list
191
    item_pointer_t send_heaps_hrl(const heap_reference_list &heaps, group_mode mode)
36✔
192
    {
193
        return send_heaps(heaps.get_heaps(), mode);
36✔
194
    }
195
};
196

197
/**
 * A completed send waiting to be reported to Python.
 *
 * The handles are raw (not reference-counting) handles; the code that
 * consumes or discards an item is responsible for calling dec_ref on them.
 */
struct callback_item
198
{
199
    /// Python callable to invoke with the result
    py::handle callback;
200
    std::vector<py::handle> heaps;  // kept here because they can only be freed with the GIL
201
    /// Error code reported by the send
    boost::system::error_code ec;
202
    /// Byte count reported by the send
    item_pointer_t bytes_transferred;
203
};
204

205
static void free_callback_items(const std::vector<callback_item> &callbacks)
×
206
{
207
    for (const callback_item &item : callbacks)
×
208
    {
209
        for (py::handle h : item.heaps)
×
210
            h.dec_ref();
×
211
        if (item.callback)
×
212
            item.callback.dec_ref();
×
213
    }
214
}
×
215

216
template<typename Base>
217
class asyncio_stream_wrapper : public Base
218
{
219
private:
220
    semaphore_fd sem;
221
    std::vector<callback_item> callbacks;
222
    std::mutex callbacks_mutex;
223

224
    // Prevent copying: the callbacks vector cannot sanely be copied
225
    asyncio_stream_wrapper(const asyncio_stream_wrapper &) = delete;
226
    asyncio_stream_wrapper &operator=(const asyncio_stream_wrapper &) = delete;
227

228
    void handler(py::handle callback_ptr, std::vector<py::handle> h_ptr,
193✔
229
                 const boost::system::error_code &ec, item_pointer_t bytes_transferred)
230
    {
231
        bool was_empty;
232
        {
233
            std::unique_lock<std::mutex> lock(callbacks_mutex);
193✔
234
            was_empty = callbacks.empty();
193✔
235
            callbacks.push_back(callback_item{callback_ptr, std::move(h_ptr), ec, bytes_transferred});
193✔
236
        }
193✔
237
        if (was_empty)
193✔
238
            sem.put();
193✔
239
    }
193✔
240

241
public:
242
    using Base::Base;
243

244
    int get_fd() const { return sem.get_fd(); }
372✔
245

246
    bool async_send_heap_obj(py::object h, py::object callback,
156✔
247
                             s_item_pointer_t cnt = -1, std::size_t substream_index = 0)
248
    {
249
        /* Normally the callback should not refer to this, since it could have
250
         * been reaped by the time the callback occurs. We rely on Python to
251
         * hang on to a reference to self.
252
         *
253
         * The callback and heap are passed around by raw reference, because
254
         * it is not safe to use incref/decref operations without the GIL, and
255
         * attempting to use py::object instead of py::handle tends to cause
256
         * these operations to occur without it being obvious.
257
         */
258
        py::handle h_ptr = h.ptr();
156✔
259
        py::handle callback_ptr = callback.ptr();
156✔
260
        h_ptr.inc_ref();
156✔
261
        callback_ptr.inc_ref();
156✔
262
        return Base::async_send_heap(
312✔
263
            h.cast<const heap_wrapper &>(),
156✔
264
            [this, callback_ptr, h_ptr] (const boost::system::error_code &ec, item_pointer_t bytes_transferred)
312✔
265
            {
266
                handler(callback_ptr, {h_ptr}, ec, bytes_transferred);
156✔
267
            },
268
            cnt, substream_index);
312✔
269
    }
270

271
    bool async_send_heaps_obj(const std::vector<heap_reference> &heaps,
19✔
272
                              py::object callback, group_mode mode)
273
    {
274
        // See comments in async_send_heap_obj
275
        std::vector<py::handle> h_ptrs;
19✔
276
        h_ptrs.reserve(heaps.size());
19✔
277
        for (const auto &h : heaps)
92✔
278
            h_ptrs.push_back(py::cast(static_cast<const heap_wrapper *>(&h.heap)).release());
73✔
279
        py::handle callback_ptr = callback.ptr();
19✔
280
        callback_ptr.inc_ref();
19✔
281
        return Base::async_send_heaps(
38✔
282
            heaps.begin(), heaps.end(),
283
            [this, callback_ptr, h_ptrs = std::move(h_ptrs)] (const boost::system::error_code &ec, item_pointer_t bytes_transferred)
57✔
284
            {
285
                handler(callback_ptr, std::move(h_ptrs), ec, bytes_transferred);
19✔
286
            },
287
            mode);
38✔
288
    }
19✔
289

290
    // Overload that takes a HeapReferenceList
291
    bool async_send_heaps_hrl(const heap_reference_list &heaps,
18✔
292
                              py::object callback, group_mode mode)
293
    {
294
        /* In this overload, we just keep the heap_reference_list alive (in Python),
295
         * and it in turn keeps the individual heaps alive - this requires less
296
         * reference counting.
297
         */
298
        py::handle h_ptr = py::cast(&heaps).release();
18✔
299
        py::handle callback_ptr = callback.ptr();
18✔
300
        callback_ptr.inc_ref();
18✔
301
        return Base::async_send_heaps(
72✔
302
            heaps.get_heaps().begin(), heaps.get_heaps().end(),
36✔
303
            [this, callback_ptr, h_ptr] (const boost::system::error_code &ec, item_pointer_t bytes_transferred)
36✔
304
            {
305
                handler(callback_ptr, {h_ptr}, ec, bytes_transferred);
18✔
306
            },
307
            mode);
36✔
308
    }
309

310
    void process_callbacks()
193✔
311
    {
312
        semaphore_get(sem, gil_release_tag());
193✔
313
        std::vector<callback_item> current_callbacks;
193✔
314
        {
315
            std::unique_lock<std::mutex> lock(callbacks_mutex);
193✔
316
            current_callbacks.swap(callbacks);
193✔
317
        }
193✔
318
        try
319
        {
320
            for (callback_item &item : current_callbacks)
386✔
321
            {
322
                while (!item.heaps.empty())
440✔
323
                {
324
                    item.heaps.back().dec_ref();
247✔
325
                    item.heaps.pop_back();
247✔
326
                }
327
                item.heaps.shrink_to_fit();
193✔
328
                py::object callback = py::reinterpret_steal<py::object>(item.callback);
193✔
329
                item.callback = py::handle();
193✔
330
                callback(make_io_error(item.ec), item.bytes_transferred);
193✔
331
            }
332
        }
333
        catch (py::error_already_set &e)
×
334
        {
335
            log_warning("send callback raised Python exception; expect deadlocks!");
×
336
            free_callback_items(current_callbacks);
×
337
            throw;
×
338
        }
339
        catch (std::bad_alloc &e)
×
340
        {
341
            /* If we're out of memory we might not be able to construct a log
342
             * message. Just rely on Python to report an error.
343
             */
344
            free_callback_items(current_callbacks);
×
345
            throw;
×
346
        }
347
        catch (std::exception &e)
×
348
        {
349
            log_warning("unexpected error in process_callbacks: %1%", e.what());
×
350
            free_callback_items(current_callbacks);
×
351
            throw;
×
352
        }
353
    }
193✔
354

355
    ~asyncio_stream_wrapper()
108✔
356
    {
357
        for (const callback_item &item : callbacks)
89✔
358
        {
359
            for (py::handle h : item.heaps)
×
360
                h.dec_ref();
×
361
            item.callback.dec_ref();
×
362
        }
363
    }
197✔
364
};
365

366
/**
 * Turn @a hostname into an address using @c make_address_no_release with
 * no resolver flags. The GIL is released for the duration of the call.
 */
static boost::asio::ip::address make_address(
    boost::asio::io_service &io_service, const std::string &hostname)
{
    py::gil_scoped_release gil;
    const auto no_flags = boost::asio::ip::resolver_query_base::flags(0);
    return make_address_no_release(io_service, hostname, no_flags);
}
467✔
373

374
template<typename Protocol>
375
static typename Protocol::endpoint make_endpoint(
336✔
376
    boost::asio::io_service &io_service, const std::string &hostname, std::uint16_t port)
377
{
378
    return typename Protocol::endpoint(make_address(io_service, hostname), port);
336✔
379
}
380

381
template<typename Protocol>
382
static std::vector<typename Protocol::endpoint> make_endpoints(
190✔
383
    boost::asio::io_service &io_service, const std::vector<std::pair<std::string, std::uint16_t>> &endpoints)
384
{
385
    std::vector<typename Protocol::endpoint> out;
190✔
386
    out.reserve(endpoints.size());
190✔
387
    for (const auto &[host, port] : endpoints)
526✔
388
        out.push_back(make_endpoint<Protocol>(io_service, host, port));
336✔
389
    return out;
190✔
390
}
×
391

392
template<typename Base>
393
class udp_stream_wrapper : public Base
394
{
395
public:
396
    udp_stream_wrapper(
68✔
397
        io_service_ref io_service,
398
        const std::vector<std::pair<std::string, std::uint16_t>> &endpoints,
399
        const stream_config &config,
400
        std::size_t buffer_size,
401
        const std::string &interface_address)
402
        : Base(
403
            io_service,
404
            make_endpoints<boost::asio::ip::udp>(*io_service, endpoints),
405
            config, buffer_size,
406
            make_address(*io_service, interface_address))
72✔
407
    {
408
    }
66✔
409

410
    udp_stream_wrapper(
×
411
        io_service_ref io_service,
412
        const std::vector<std::pair<std::string, std::uint16_t>> &endpoints,
413
        const stream_config &config,
414
        std::size_t buffer_size,
415
        int ttl)
416
        : Base(
417
            io_service,
418
            make_endpoints<boost::asio::ip::udp>(*io_service, endpoints),
419
            config, buffer_size, ttl)
×
420
    {
421
    }
×
422

423
    udp_stream_wrapper(
19✔
424
        io_service_ref io_service,
425
        const std::vector<std::pair<std::string, std::uint16_t>> &endpoints,
426
        const stream_config &config,
427
        std::size_t buffer_size,
428
        int ttl,
429
        const std::string &interface_address)
430
        : Base(
431
            io_service,
432
            make_endpoints<boost::asio::ip::udp>(*io_service, endpoints),
433
            config, buffer_size, ttl,
434
            interface_address.empty() ?
19✔
435
                boost::asio::ip::address() :
436
                make_address(*io_service, interface_address))
38✔
437
    {
438
    }
19✔
439

440
    udp_stream_wrapper(
19✔
441
        io_service_ref io_service,
442
        const std::vector<std::pair<std::string, std::uint16_t>> &endpoints,
443
        const stream_config &config,
444
        std::size_t buffer_size,
445
        int ttl,
446
        unsigned int interface_index)
447
        : Base(
448
            io_service,
449
            make_endpoints<boost::asio::ip::udp>(*io_service, endpoints),
450
            config, buffer_size, ttl, interface_index)
19✔
451
    {
452
    }
19✔
453

454
    udp_stream_wrapper(
38✔
455
        io_service_ref io_service,
456
        const socket_wrapper<boost::asio::ip::udp::socket> &socket,
457
        const std::vector<std::pair<std::string, std::uint16_t>> &endpoints,
458
        const stream_config &config)
459
        : Base(
460
            io_service,
461
            socket.copy(*io_service),
462
            make_endpoints<boost::asio::ip::udp>(*io_service, endpoints),
463
            config)
38✔
464
    {
465
    }
38✔
466
};
467

468
#if SPEAD2_USE_IBV
469

470
/* Managing the endpoint and memory region lists requires some sleight of
471
 * hand. We store a separate copy in the wrapper in a Python-centric format.
472
 * When constructing the stream, we make a copy with the C++ view.
473
 */
474
/**
 * A @c udp_ibv_config with extra public fields holding the endpoints,
 * memory regions and interface address in the form supplied from Python.
 */
class udp_ibv_config_wrapper : public udp_ibv_config
475
{
476
public:
477
    /// Endpoints as (host, port) pairs
    std::vector<std::pair<std::string, std::uint16_t>> py_endpoints;
478
    /// Memory regions as Python buffer objects
    std::vector<py::buffer> py_memory_regions;
479
    /// Interface address as a string
    std::string py_interface_address;
480
};
481

482
/**
 * Wraps an ibverbs UDP stream class and additionally owns the
 * @c py::buffer_info objects passed to the constructor.
 */
template<typename Base>
483
class udp_ibv_stream_wrapper : public Base
484
{
485
private:
486
    // Keeps the buffer requests alive
487
    std::vector<py::buffer_info> buffer_infos;
488

489
public:
490
    /**
     * Construct @c Base from @a pool, @a config and @a ibv_config, then
     * move @a buffer_infos into this object.
     */
    udp_ibv_stream_wrapper(
3✔
491
        std::shared_ptr<thread_pool> pool,
492
        const stream_config &config,
493
        const udp_ibv_config &ibv_config,
494
        std::vector<py::buffer_info> &&buffer_infos)
495
        : Base(pool,
496
               config,
497
               ibv_config),
498
        buffer_infos(std::move(buffer_infos))
6✔
499
    {
500
    }
×
501
};
502
#endif
503

504
/**
 * A synchronous stream whose stream buffer is a @c std::stringbuf that is
 * a private base class of this object.
 *
 * @c std::stringbuf is listed before the stream base class, so it is
 * constructed before @c *this is passed to the stream constructor.
 */
class bytes_stream : private std::stringbuf, public stream_wrapper<streambuf_stream>
505
{
506
public:
507
    /// Construct the stream, passing this object (as a stream buffer) to the base constructor
    bytes_stream(std::shared_ptr<thread_pool> pool, const stream_config &config = stream_config())
25✔
508
        : stream_wrapper<streambuf_stream>(std::move(pool), *this, config)
25✔
509
    {
510
    }
25✔
511

512
    /// Return the current contents of the string buffer as Python bytes
    py::bytes getvalue() const
18✔
513
    {
514
        return str();
36✔
515
    }
516
};
517

518
template<typename T>
519
static py::class_<T, stream> udp_stream_register(py::module &m, const char *name)
10✔
520
{
521
    using namespace pybind11::literals;
522

523
    return py::class_<T, stream>(m, name)
20✔
524
        .def(py::init<std::shared_ptr<thread_pool_wrapper>, const std::vector<std::pair<std::string, std::uint16_t>> &, const stream_config &, std::size_t, std::string>(),
40✔
525
             "thread_pool"_a.none(false), "endpoints"_a,
10✔
526
             "config"_a = stream_config(),
20✔
527
             "buffer_size"_a = T::default_buffer_size,
20✔
528
             "interface_address"_a = std::string())
20✔
529
        .def(py::init<std::shared_ptr<thread_pool_wrapper>, const std::vector<std::pair<std::string, std::uint16_t>> &, const stream_config &, std::size_t, int>(),
30✔
530
             "thread_pool"_a.none(false), "endpoints"_a,
10✔
531
             "config"_a = stream_config(),
20✔
532
             "buffer_size"_a = T::default_buffer_size,
10✔
533
             "ttl"_a)
20✔
534
        .def(py::init<std::shared_ptr<thread_pool_wrapper>, const std::vector<std::pair<std::string, std::uint16_t>> &, const stream_config &, std::size_t, int, std::string>(),
30✔
535
             "thread_pool"_a.none(false), "endpoints"_a,
10✔
536
             "config"_a = stream_config(),
20✔
537
             "buffer_size"_a = T::default_buffer_size,
10✔
538
             "ttl"_a,
10✔
539
             "interface_address"_a)
20✔
540
        .def(py::init<std::shared_ptr<thread_pool_wrapper>, const std::vector<std::pair<std::string, std::uint16_t>> &, const stream_config &, std::size_t, int, unsigned int>(),
30✔
541
             "thread_pool"_a.none(false), "endpoints"_a,
10✔
542
             "config"_a = stream_config(),
20✔
543
             "buffer_size"_a = T::default_buffer_size,
10✔
544
             "ttl"_a,
10✔
545
             "interface_index"_a)
20✔
546
        .def(py::init<std::shared_ptr<thread_pool_wrapper>, const socket_wrapper<boost::asio::ip::udp::socket> &, const std::vector<std::pair<std::string, std::uint16_t>> &, const stream_config &>(),
20✔
547
             "thread_pool"_a.none(false), "socket"_a, "endpoints"_a,
20✔
548
             "config"_a = stream_config())
20✔
549

550
        .def_readonly_static("DEFAULT_BUFFER_SIZE", &T::default_buffer_size);
20✔
551
}
552

553
#if SPEAD2_USE_IBV
554
template<typename T>
555
static py::class_<T, stream> udp_ibv_stream_register(py::module &m, const char *name)
10✔
556
{
557
    using namespace pybind11::literals;
558

559
    return py::class_<T, stream>(m, name)
20✔
560
        .def(py::init([](std::shared_ptr<thread_pool_wrapper> thread_pool,
28✔
561
                         const stream_config &config,
562
                         const udp_ibv_config_wrapper &ibv_config_wrapper)
563
            {
564
                udp_ibv_config ibv_config = ibv_config_wrapper;
8✔
565
                ibv_config.set_endpoints(
10✔
566
                    make_endpoints<boost::asio::ip::udp>(
567
                        thread_pool->get_io_service(),
8✔
568
                        ibv_config_wrapper.py_endpoints));
8✔
569
                ibv_config.set_interface_address(
6✔
570
                    make_address(thread_pool->get_io_service(),
6✔
571
                                 ibv_config_wrapper.py_interface_address));
6✔
572
                std::vector<std::pair<const void *, std::size_t>> regions;
4✔
573
                std::vector<py::buffer_info> buffer_infos;
4✔
574
                regions.reserve(ibv_config_wrapper.py_memory_regions.size());
4✔
575
                buffer_infos.reserve(regions.size());
4✔
576
                for (auto &buffer : ibv_config_wrapper.py_memory_regions)
7✔
577
                {
578
                    buffer_infos.push_back(request_buffer_info(buffer, PyBUF_C_CONTIGUOUS));
3✔
579
                    regions.emplace_back(
3✔
580
                        buffer_infos.back().ptr,
3✔
581
                        buffer_infos.back().itemsize * buffer_infos.back().size);
6✔
582
                }
583
                ibv_config.set_memory_regions(regions);
4✔
584

585
                return new T(std::move(thread_pool), config, ibv_config, std::move(buffer_infos));
9✔
586
            }),
16✔
587
            "thread_pool"_a.none(false),
20✔
588
            "config"_a = stream_config(),
10✔
589
            "udp_ibv_config"_a);
40✔
590
}
591
#endif
592

593
/**
 * Wraps a TCP stream class so that it can be constructed from (host, port)
 * string pairs and a string interface address, or from an existing socket
 * wrapper.
 */
template<typename Base>
594
class tcp_stream_wrapper : public Base
595
{
596
public:
597
    /* All wrapping constructors that use a connect_handler take it as the
598
     * first argument, to facilitate the meta-programming used by registration
599
     * code.
600
     */
601
    /**
     * Construct a stream that connects to @a endpoints. Note that @c Base
     * receives the io_service first and the connect handler second, while
     * this constructor takes them in the opposite order.
     */
    template<typename ConnectHandler>
602
    tcp_stream_wrapper(
38✔
603
        ConnectHandler&& connect_handler,
604
        io_service_ref io_service,
605
        const std::vector<std::pair<std::string, std::uint16_t>> &endpoints,
606
        const stream_config &config,
607
        std::size_t buffer_size,
608
        const std::string &interface_address)
609
        : Base(io_service, std::forward<ConnectHandler>(connect_handler),
610
               make_endpoints<boost::asio::ip::tcp>(*io_service, endpoints),
611
               config, buffer_size, make_address(*io_service, interface_address))
38✔
612
    {
613
    }
38✔
614

615
    /// Construct a stream from a copy of the socket held by @a socket
    tcp_stream_wrapper(
24✔
616
        io_service_ref io_service,
617
        const socket_wrapper<boost::asio::ip::tcp::socket> &socket,
618
        const stream_config &config)
619
        : Base(io_service, socket.copy(*io_service), config)
24✔
620
    {
621
    }
24✔
622
};
623

624
/* This is a different design than the other registration functions, because
625
 * the TCP sync and async classes are constructed very differently (because of
626
 * the handling around connecting). The callback is called (several times) with
627
 * a function object that generates the unique_ptr<T> plus additional arguments
628
 * to pass to py::class_::def.
629
 */
630
template<typename Registrar>
631
static py::class_<typename Registrar::stream_type, stream> tcp_stream_register(py::module &m, const char *name)
10✔
632
{
633
    using namespace pybind11::literals;
634

635
    typedef typename Registrar::stream_type T;
636
    py::class_<T, stream> class_(m, name);
10✔
637
    class_
638
        .def(py::init<std::shared_ptr<thread_pool_wrapper>,
20✔
639
                      const socket_wrapper<boost::asio::ip::tcp::socket> &,
640
                      const stream_config &>(),
10✔
641
             "thread_pool"_a.none(false), "socket"_a, "config"_a = stream_config())
30✔
642
        .def_readonly_static("DEFAULT_BUFFER_SIZE", &T::default_buffer_size);
10✔
643
    Registrar::template apply<
644
            std::shared_ptr<thread_pool_wrapper>,
645
            const std::vector<std::pair<std::string, std::uint16_t>> &,
646
            const stream_config &, std::size_t, const std::string &>(
40✔
647
        class_,
648
        "thread_pool"_a.none(false), "endpoints"_a,
20✔
649
        "config"_a = stream_config(),
20✔
650
        "buffer_size"_a = T::default_buffer_size,
20✔
651
        "interface_address"_a = "");
10✔
652
    return class_;
10✔
653
}
×
654

655
// Function object passed to tcp_stream_register to register the synchronous class
656
class tcp_stream_register_sync
657
{
658
private:
659
    struct connect_state
660
    {
661
        semaphore sem;
662
        boost::system::error_code ec;
663
    };
664

665
public:
666
    typedef tcp_stream_wrapper<stream_wrapper<tcp_stream>> stream_type;
667

668
private:
669
    /* Template args are explicit, hence no Args&&... */
670
    template<typename... Args>
671
    static std::unique_ptr<stream_type> construct(Args... args)
25✔
672
    {
673
        std::shared_ptr<connect_state> state = std::make_shared<connect_state>();
25✔
674
        auto connect_handler = [state](const boost::system::error_code &ec)
25✔
675
        {
676
            state->ec = ec;
25✔
677
            state->sem.put();
25✔
678
        };
679
        auto stream = std::make_unique<stream_type>(connect_handler, std::forward<Args>(args)...);
25✔
680
        semaphore_get(state->sem, gil_release_tag());
25✔
681
        if (state->ec)
25✔
682
            throw boost_io_error(state->ec);
1✔
683
        return stream;
48✔
684
    }
27✔
685

686
public:
687
    template<typename... Args, typename... Extra>
688
    static void apply(py::class_<stream_type, stream> &class_, Extra&&... extra)
5✔
689
    {
690
        class_.def(py::init(&tcp_stream_register_sync::construct<Args...>),
5✔
691
                   std::forward<Extra>(extra)...);
5✔
692
    }
5✔
693
};
694

695
// Function object passed to tcp_stream_register to register the asynchronous class
696
class tcp_stream_register_async
697
{
698
private:
699
    struct connect_state
700
    {
701
        py::handle callback;
702
    };
703

704
public:
705
    typedef tcp_stream_wrapper<asyncio_stream_wrapper<tcp_stream>> stream_type;
706

707
private:
708
    /* Template args are explicit, hence no Args&&... */
709
    template<typename... Args>
710
    static std::unique_ptr<stream_type> construct(py::object callback, Args... args)
13✔
711
    {
712
        std::shared_ptr<connect_state> state = std::make_shared<connect_state>();
13✔
713
        auto connect_handler = [state](boost::system::error_code ec)
26✔
714
        {
715
            py::gil_scoped_acquire gil;
13✔
716
            py::object callback = py::reinterpret_steal<py::object>(state->callback);
13✔
717
            callback(make_io_error(ec));
13✔
718
        };
13✔
719
        auto stream = std::make_unique<stream_type>(connect_handler, std::forward<Args>(args)...);
13✔
720
        /* The state takes over the references. These are dealt with using
721
         * py::handle rather than py::object to avoid manipulating refcounts
722
         * without the GIL. Note that while the connect_handler could occur
723
         * immediately, the GIL serialises access to state.
724
         */
725
        state->callback = callback.release();
13✔
726
        return stream;
26✔
727
    }
13✔
728

729
public:
730
    template<typename... Args, typename... Extra>
731
    static void apply(py::class_<stream_type, stream> &class_, Extra&&... extra)
5✔
732
    {
733
        using namespace pybind11::literals;
734
        class_.def(py::init(&tcp_stream_register_async::construct<Args...>),
5✔
735
                   "callback"_a, std::forward<Extra>(extra)...);
5✔
736
    }
5✔
737
};
738

739
template<typename T>
740
static py::class_<T, stream> inproc_stream_register(py::module &m, const char *name)
10✔
741
{
742
    using namespace pybind11::literals;
743
    return py::class_<T, stream>(m, name)
20✔
744
        .def(py::init<std::shared_ptr<thread_pool_wrapper>, const std::vector<std::shared_ptr<inproc_queue>> &, const stream_config &>(),
20✔
745
             "thread_pool"_a.none(false), "queues"_a, "config"_a = stream_config())
30✔
746
        .def_property_readonly("queues", &T::get_queues);
30✔
747
}
748

749
template<typename T>
750
static void sync_stream_register(py::class_<T, stream> &stream_class)
25✔
751
{
752
    using namespace pybind11::literals;
753
    stream_class.def("send_heap", &T::send_heap,
75✔
754
                     "heap"_a, "cnt"_a = s_item_pointer_t(-1),
75✔
755
                     "substream_index"_a = std::size_t(0));
25✔
756
    stream_class.def("send_heaps", &T::send_heaps,
25✔
757
                     "heaps"_a, "mode"_a);
25✔
758
    stream_class.def("send_heaps", &T::send_heaps_hrl,
25✔
759
                     "heaps"_a, "mode"_a);
25✔
760
}
25✔
761

762
/**
 * Attach the asynchronous interface to an already-created stream class.
 *
 * Adds a read-only @c fd property (T::get_fd), @c async_send_heap, two
 * bindings sharing the Python name @c async_send_heaps (which pybind11 treats
 * as overloads), @c flush and @c process_callbacks.
 *
 * @param stream_class  pybind11 class object for the wrapper type @a T.
 */
template<typename T>
763
static void async_stream_register(py::class_<T, stream> &stream_class)
20✔
764
{
765
    using namespace pybind11::literals;
766
    stream_class
767
        .def_property_readonly("fd", &T::get_fd)
20✔
768
        // cnt defaults to -1 and substream_index to 0 when omitted from Python.
        .def("async_send_heap", &T::async_send_heap_obj,
60✔
769
             "heap"_a, "callback"_a, "cnt"_a = s_item_pointer_t(-1),
60✔
770
             "substream_index"_a = std::size_t(0))
20✔
771
        .def("async_send_heaps", &T::async_send_heaps_obj,
20✔
772
             "heaps"_a, "callback"_a, "mode"_a)
20✔
773
        // NOTE(review): the _hrl variant presumably accepts a
        // HeapReferenceList - confirm against the wrapper class.
        .def("async_send_heaps", &T::async_send_heaps_hrl,
20✔
774
             "heaps"_a, "callback"_a, "mode"_a)
20✔
775
        .def("flush", &T::flush)
20✔
776
        .def("process_callbacks", &T::process_callbacks);
20✔
777
}
20✔
778

779
/// Register the send module with pybind11
780
// Creates the "send" submodule of `parent`, registers every send-side class
// and enum on it (Heap, PacketGenerator, RateMethod, GroupMode, HeapReference,
// HeapReferenceList, StreamConfig, Stream and the concrete stream classes)
// and returns the new submodule. Registration order matters: base classes
// and types used in default argument values are registered before their users.
py::module register_module(py::module &parent)
5✔
781
{
782
    using namespace pybind11::literals;
783

784
    py::module m = parent.def_submodule("send");
5✔
785

786
    py::class_<heap_wrapper>(m, "Heap")
5✔
787
        .def(py::init<flavour>(), "flavour"_a = flavour())
10✔
788
        .def_property_readonly("flavour", &heap_wrapper::get_flavour)
5✔
789
        .def("add_item", &heap_wrapper::add_item, "item"_a)
5✔
790
        .def("add_descriptor", &heap_wrapper::add_descriptor, "descriptor"_a)
5✔
791
        .def("add_start", &heap_wrapper::add_start)
5✔
792
        .def("add_end", &heap_wrapper::add_end)
5✔
793
        .def_property("repeat_pointers",
5✔
794
                      &heap_wrapper::get_repeat_pointers,
×
795
                      &heap_wrapper::set_repeat_pointers);
5✔
796

797
    // keep_alive is safe to use here in spite of pybind/pybind11#856, because
798
    // the destructor of packet_generator doesn't reference the heap.
799
    py::class_<packet_generator>(m, "PacketGenerator")
5✔
800
        .def(py::init<heap_wrapper &, item_pointer_t, std::size_t>(),
5✔
801
             "heap"_a, "cnt"_a, "max_packet_size"_a,
5✔
802
             py::keep_alive<1, 2>())
×
803
        // The generator is its own iterator: __iter__ returns self.
        .def("__iter__", [](py::object self) { return self; })
17✔
804
        .def("__next__", &packet_generator_next);
5✔
805

806
    py::enum_<rate_method>(m, "RateMethod")
10✔
807
        .value("SW", rate_method::SW)
5✔
808
        .value("HW", rate_method::HW)
5✔
809
        .value("AUTO", rate_method::AUTO);
5✔
810

811
    py::enum_<group_mode>(m, "GroupMode")
10✔
812
        .value("ROUND_ROBIN", group_mode::ROUND_ROBIN)
5✔
813
        .value("SERIAL", group_mode::SERIAL);
5✔
814

815
    py::class_<heap_reference>(m, "HeapReference")
5✔
816
        // Arguments after py::kw_only() (cnt, substream_index) are keyword-only.
        // keep_alive<1, 2> keeps the heap argument alive for as long as the
        // new HeapReference object exists.
        .def(py::init<const heap_wrapper &, s_item_pointer_t, std::size_t>(),
5✔
817
             "heap"_a, py::kw_only(), "cnt"_a = -1, "substream_index"_a = 0,
10✔
818
             py::keep_alive<1, 2>())
5✔
819
        .def_property_readonly(
×
820
            "heap",
821
            // Returns a non-owning pointer to the referenced heap; the
            // `reference` policy tells pybind11 not to take ownership of it.
            [](const heap_reference &h) { return static_cast<const heap_wrapper *>(&h.heap); },
×
822
            py::return_value_policy::reference)
10✔
823
        .def_readwrite("cnt", &heap_reference::cnt)
5✔
824
        .def_readwrite("substream_index", &heap_reference::substream_index);
5✔
825

826
    py::class_<heap_reference_list>(m, "HeapReferenceList")
5✔
827
        .def(py::init<std::vector<heap_reference>>(), "heaps"_a);
5✔
828

829
    // NOTE(review): data_class_constructor and SPEAD2_PTMF_VOID are defined
    // outside this chunk; presumably the former builds the object from keyword
    // arguments and the latter adapts a setter so that it returns void - confirm.
    py::class_<stream_config>(m, "StreamConfig")
5✔
830
        .def(py::init(&data_class_constructor<stream_config>))
5✔
831
        .def_property("max_packet_size",
10✔
832
                      &stream_config::get_max_packet_size,
5✔
833
                      SPEAD2_PTMF_VOID(stream_config, set_max_packet_size))
5✔
834
        .def_property("rate",
10✔
835
                      &stream_config::get_rate,
5✔
836
                      SPEAD2_PTMF_VOID(stream_config, set_rate))
5✔
837
        .def_property("burst_size",
10✔
838
                      &stream_config::get_burst_size,
5✔
839
                      SPEAD2_PTMF_VOID(stream_config, set_burst_size))
5✔
840
        .def_property("max_heaps",
10✔
841
                      &stream_config::get_max_heaps,
5✔
842
                      SPEAD2_PTMF_VOID(stream_config, set_max_heaps))
5✔
843
        .def_property("burst_rate_ratio",
10✔
844
                      &stream_config::get_burst_rate_ratio,
5✔
845
                      SPEAD2_PTMF_VOID(stream_config, set_burst_rate_ratio))
5✔
846
        .def_property("rate_method",
10✔
847
                      &stream_config::get_rate_method,
5✔
848
                      SPEAD2_PTMF_VOID(stream_config, set_rate_method))
5✔
849
        // burst_rate has a getter only, so it is read-only from Python.
        .def_property_readonly("burst_rate",
10✔
850
                               &stream_config::get_burst_rate)
10✔
851
        .def_readonly_static("DEFAULT_MAX_PACKET_SIZE", &stream_config::default_max_packet_size)
5✔
852
        .def_readonly_static("DEFAULT_MAX_HEAPS", &stream_config::default_max_heaps)
5✔
853
        .def_readonly_static("DEFAULT_BURST_SIZE", &stream_config::default_burst_size)
5✔
854
        .def_readonly_static("DEFAULT_BURST_RATE_RATIO", &stream_config::default_burst_rate_ratio)
5✔
855
        .def_readonly_static("DEFAULT_RATE_METHOD", &stream_config::default_rate_method);
5✔
856

857
    // Common base class; every concrete stream class below names it as its base.
    py::class_<stream>(m, "Stream")
5✔
858
        .def("set_cnt_sequence", &stream::set_cnt_sequence,
5✔
859
             "next"_a, "step"_a)
5✔
860
        .def_property_readonly("num_substreams", &stream::get_num_substreams);
5✔
861

862
    // Each registration below sits in its own scope so that the local name
    // stream_class can be reused. Blocking variants get the sync methods,
    // the *Asyncio variants get the async ones.
    {
863
        auto stream_class = udp_stream_register<udp_stream_wrapper<stream_wrapper<udp_stream>>>(m, "UdpStream");
5✔
864
        sync_stream_register(stream_class);
5✔
865
    }
5✔
866
    {
867
        auto stream_class = udp_stream_register<udp_stream_wrapper<asyncio_stream_wrapper<udp_stream>>>(m, "UdpStreamAsyncio");
5✔
868
        async_stream_register(stream_class);
5✔
869
    }
5✔
870

871
// The ibverbs classes are only registered when SPEAD2_USE_IBV is non-zero.
#if SPEAD2_USE_IBV
872
    py::class_<udp_ibv_config_wrapper>(m, "UdpIbvConfig")
5✔
873
        .def(py::init(&data_class_constructor<udp_ibv_config_wrapper>))
5✔
874
        .def_readwrite("endpoints", &udp_ibv_config_wrapper::py_endpoints)
5✔
875
        .def_readwrite("memory_regions", &udp_ibv_config_wrapper::py_memory_regions)
5✔
876
        .def_readwrite("interface_address", &udp_ibv_config_wrapper::py_interface_address)
5✔
877
        .def_property("buffer_size",
10✔
878
                      &udp_ibv_config_wrapper::get_buffer_size,
5✔
879
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_buffer_size))
5✔
880
        .def_property("ttl",
10✔
881
                      &udp_ibv_config_wrapper::get_ttl,
5✔
882
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_ttl))
5✔
883
        .def_property("comp_vector",
10✔
884
                      &udp_ibv_config_wrapper::get_comp_vector,
5✔
885
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_comp_vector))
5✔
886
        .def_property("max_poll",
10✔
887
                      &udp_ibv_config_wrapper::get_max_poll,
5✔
888
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_max_poll))
10✔
889
        .def_readonly_static("DEFAULT_BUFFER_SIZE", &udp_ibv_config_wrapper::default_buffer_size)
5✔
890
        .def_readonly_static("DEFAULT_MAX_POLL", &udp_ibv_config_wrapper::default_max_poll);
5✔
891

892
    {
893
        auto stream_class = udp_ibv_stream_register<udp_ibv_stream_wrapper<stream_wrapper<udp_ibv_stream>>>(m, "UdpIbvStream");
5✔
894
        sync_stream_register(stream_class);
5✔
895
    }
5✔
896
    {
897
        auto stream_class = udp_ibv_stream_register<udp_ibv_stream_wrapper<asyncio_stream_wrapper<udp_ibv_stream>>>(m, "UdpIbvStreamAsyncio");
5✔
898
        async_stream_register(stream_class);
5✔
899
    }
5✔
900
#endif
901

902
    {
903
        auto stream_class = tcp_stream_register<tcp_stream_register_sync>(m, "TcpStream");
5✔
904
        sync_stream_register(stream_class);
5✔
905
    }
5✔
906
    {
907
        auto stream_class = tcp_stream_register<tcp_stream_register_async>(m, "TcpStreamAsyncio");
5✔
908
        async_stream_register(stream_class);
5✔
909
    }
5✔
910

911
    {
912
        // NOTE(review): py::multiple_inheritance() is presumably needed because
        // bytes_stream has more than one C++ base, only one of which (stream)
        // is registered with pybind11 - confirm against the class definition.
        py::class_<bytes_stream, stream> stream_class(m, "BytesStream", py::multiple_inheritance());
5✔
913
        stream_class
914
            .def(py::init<std::shared_ptr<thread_pool_wrapper>, const stream_config &>(),
5✔
915
                 "thread_pool"_a.none(false), "config"_a = stream_config())
5✔
916
            .def("getvalue", &bytes_stream::getvalue);
5✔
917
        sync_stream_register(stream_class);
5✔
918
    }
5✔
919

920
    {
921
        auto stream_class = inproc_stream_register<stream_wrapper<inproc_stream>>(m, "InprocStream");
5✔
922
        sync_stream_register(stream_class);
5✔
923
    }
5✔
924
    {
925
        auto stream_class = inproc_stream_register<asyncio_stream_wrapper<inproc_stream>>(m, "InprocStreamAsyncio");
5✔
926
        async_stream_register(stream_class);
5✔
927
    }
5✔
928

929
    return m;
5✔
930
}
×
931

932
} // namespace send
933
} // namespace spead2
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc