• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 6098178779

06 Sep 2023 02:03PM UTC coverage: 78.235% (-0.2%) from 78.424%
6098178779

push

github

web-flow
Merge pull request #266 from ska-sa/simpler-ptmf

Simplify wrapping of pointers to members

107 of 107 new or added lines in 4 files covered. (100.0%)

5291 of 6763 relevant lines covered (78.23%)

56444.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.5
/src/py_recv.cpp
1
/* Copyright 2015, 2017, 2020-2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <pybind11/pybind11.h>
22
#include <pybind11/stl.h>
23
#include <pybind11/operators.h>
24
#include <algorithm>
25
#include <stdexcept>
26
#include <type_traits>
27
#include <cstdint>
28
#include <cctype>
29
#include <unistd.h>
30
#include <sys/socket.h>
31
#include <spead2/recv_udp.h>
32
#include <spead2/recv_udp_ibv.h>
33
#include <spead2/recv_udp_pcap.h>
34
#include <spead2/recv_tcp.h>
35
#include <spead2/recv_mem.h>
36
#include <spead2/recv_inproc.h>
37
#include <spead2/recv_stream.h>
38
#include <spead2/recv_ring_stream.h>
39
#include <spead2/recv_chunk_stream.h>
40
#include <spead2/recv_chunk_stream_group.h>
41
#include <spead2/recv_live_heap.h>
42
#include <spead2/recv_heap.h>
43
#include <spead2/common_ringbuffer.h>
44
#include <spead2/py_common.h>
45

46
namespace py = pybind11;
47

48
namespace spead2
49
{
50
namespace recv
51
{
52

53
/* True if both arguments are plain function pointers and point to the same function */
54
template<typename R, typename... Args>
55
static bool equal_functions(const std::function<R(Args...)> &a,
249✔
56
                            const std::function<R(Args...)> &b)
57
{
58
    using ptr = R (*)(Args...);
59
    const ptr *x = a.template target<ptr>();
249✔
60
    const ptr *y = b.template target<ptr>();
249✔
61
    return x && y && *x == *y;
249✔
62
}
63

64
/**
65
 * Wraps @ref item to provide safe memory management. The item references
66
 * memory inside the heap, so it needs to hold a reference to that
67
 * heap, as do any memoryviews created on the value.
68
 */
69
class item_wrapper : public item
70
{
71
private:
72
    /// Python object containing a @ref heap
73
    py::object owning_heap;
74

75
public:
76
    item_wrapper() = default;
77
    item_wrapper(const item &it, py::object owning_heap)
1,195✔
78
        : item(it), owning_heap(std::move(owning_heap)) {}
1,195✔
79

80
    /**
81
     * Obtain the raw value using Python buffer protocol.
82
     */
83
    py::buffer_info get_value() const
1,189✔
84
    {
85
        return py::buffer_info(
86
            reinterpret_cast<void *>(ptr),
1,189✔
87
            1,      // size of individual elements
88
            py::format_descriptor<std::uint8_t>::format(),
1,189✔
89
            length);
2,378✔
90
    }
91
};
92

93
/**
94
 * Extends mem_reader to obtain data using the Python buffer protocol.
95
 * It steals the provided buffer view; it is not passed by rvalue reference
96
 * because it cannot be perfectly forwarded.
97
 */
98
class buffer_reader : public mem_reader
99
{
100
private:
101
    py::buffer_info view;
102
public:
103
    explicit buffer_reader(stream &s, py::buffer_info &view)
54✔
104
        : mem_reader(s, reinterpret_cast<const std::uint8_t *>(view.ptr), view.itemsize * view.size),
54✔
105
        view(std::move(view))
54✔
106
    {
107
    }
54✔
108
};
109

110
#if SPEAD2_USE_IBV
/* Managing the endpoints and interface address requires some sleight of
 * hand. We store a separate copy in the wrapper in a Python-centric format.
 * When constructing the reader, we make a copy with the C++ view.
 */
class udp_ibv_config_wrapper : public udp_ibv_config
{
public:
    /// Endpoints as (host, port) pairs; converted when the reader is added
    std::vector<std::pair<std::string, std::uint16_t>> py_endpoints;
    /// Interface address as a string; converted when the reader is added
    std::string py_interface_address;
};
#endif // SPEAD2_USE_IBV
122

123
/**
 * Convert a host name to an address using the stream's I/O service.
 *
 * This forwards to @c make_address_no_release with the UDP resolver's
 * @c passive query flag.
 *
 * @param s         Stream whose I/O service is used
 * @param hostname  Host name or address string to convert
 * @return The resulting address
 */
static boost::asio::ip::address make_address(stream &s, const std::string &hostname)
{
    return make_address_no_release(s.get_io_service(), hostname,
                                   boost::asio::ip::udp::resolver::query::passive);
}
128

129
template<typename Protocol>
130
static typename Protocol::endpoint make_endpoint(
242✔
131
    stream &s, const std::string &hostname, std::uint16_t port)
132
{
133
    return typename Protocol::endpoint(make_address(s, hostname), port);
242✔
134
}
135

136
/**
 * Add a reader that takes its data from a Python buffer object.
 *
 * @param s       Stream to which the reader is added
 * @param buffer  Python object supporting the buffer protocol; a
 *                C-contiguous view is requested
 */
static void add_buffer_reader(stream &s, py::buffer buffer)
{
    // The buffer view is requested before the GIL is dropped
    py::buffer_info info = request_buffer_info(buffer, PyBUF_C_CONTIGUOUS);
    py::gil_scoped_release gil;
    // buffer_reader takes the view by non-const lvalue reference and moves
    // from it, so it is passed through a reference wrapper.
    s.emplace_reader<buffer_reader>(std::ref(info));
}
142

143
static void add_udp_reader(
122✔
144
    stream &s,
145
    std::uint16_t port,
146
    std::size_t max_size,
147
    std::size_t buffer_size,
148
    const std::string &bind_hostname)
149
{
150
    py::gil_scoped_release gil;
122✔
151
    auto endpoint = make_endpoint<boost::asio::ip::udp>(s, bind_hostname, port);
122✔
152
    s.emplace_reader<udp_reader>(endpoint, max_size, buffer_size);
122✔
153
}
122✔
154

155
static void add_udp_reader_socket(
80✔
156
    stream &s,
157
    const socket_wrapper<boost::asio::ip::udp::socket> &socket,
158
    std::size_t max_size = udp_reader::default_max_size)
159
{
160
    auto asio_socket = socket.copy(s.get_io_service());
80✔
161
    py::gil_scoped_release gil;
80✔
162
    s.emplace_reader<udp_reader>(std::move(asio_socket), max_size);
80✔
163
}
80✔
164

165
static void add_udp_reader_bind_v4(
40✔
166
    stream &s,
167
    const std::string &address,
168
    std::uint16_t port,
169
    std::size_t max_size,
170
    std::size_t buffer_size,
171
    const std::string &interface_address)
172
{
173
    py::gil_scoped_release gil;
40✔
174
    auto endpoint = make_endpoint<boost::asio::ip::udp>(s, address, port);
40✔
175
    s.emplace_reader<udp_reader>(endpoint, max_size, buffer_size, make_address(s, interface_address));
40✔
176
}
40✔
177

178
static void add_udp_reader_bind_v6(
40✔
179
    stream &s,
180
    const std::string &address,
181
    std::uint16_t port,
182
    std::size_t max_size,
183
    std::size_t buffer_size,
184
    unsigned int interface_index)
185
{
186
    py::gil_scoped_release gil;
40✔
187
    auto endpoint = make_endpoint<boost::asio::ip::udp>(s, address, port);
40✔
188
    s.emplace_reader<udp_reader>(endpoint, max_size, buffer_size, interface_index);
40✔
189
}
40✔
190

191
static void add_tcp_reader(
36✔
192
    stream &s,
193
    std::uint16_t port,
194
    std::size_t max_size,
195
    std::size_t buffer_size,
196
    const std::string &bind_hostname)
197
{
198
    py::gil_scoped_release gil;
36✔
199
    auto endpoint = make_endpoint<boost::asio::ip::tcp>(s, bind_hostname, port);
36✔
200
    s.emplace_reader<tcp_reader>(endpoint, max_size, buffer_size);
36✔
201
}
36✔
202

203
static void add_tcp_reader_socket(
29✔
204
    stream &s,
205
    const socket_wrapper<boost::asio::ip::tcp::acceptor> &acceptor,
206
    std::size_t max_size)
207
{
208
    auto asio_socket = acceptor.copy(s.get_io_service());
29✔
209
    py::gil_scoped_release gil;
29✔
210
    s.emplace_reader<tcp_reader>(std::move(asio_socket), max_size);
29✔
211
}
29✔
212

213
#if SPEAD2_USE_IBV
/**
 * Add an ibverbs UDP reader described by a Python-side configuration.
 *
 * The base-class part of @a config_wrapper is copied, and the endpoints and
 * interface address stored in Python-centric form are then converted and
 * applied to that copy. The GIL is released for the whole body.
 *
 * @param s               Stream to which the reader is added
 * @param config_wrapper  Configuration including the string-form endpoints
 *                        and interface address
 */
static void add_udp_ibv_reader(stream &s, const udp_ibv_config_wrapper &config_wrapper)
{
    py::gil_scoped_release gil;
    // Deliberately slices off the wrapper's Python-centric fields
    udp_ibv_config config = config_wrapper;
    for (const auto &[host, port] : config_wrapper.py_endpoints)
        config.add_endpoint(make_endpoint<boost::asio::ip::udp>(s, host, port));
    config.set_interface_address(
        make_address(s, config_wrapper.py_interface_address));
    s.emplace_reader<udp_ibv_reader>(config);
}
#endif  // SPEAD2_USE_IBV
225

226
#if SPEAD2_USE_PCAP
/**
 * Add a reader that takes UDP packets from a pcap file.
 *
 * The GIL is released while the reader is constructed.
 *
 * @param s         Stream to which the reader is added
 * @param filename  Forwarded to the @c udp_pcap_file_reader constructor
 * @param filter    Forwarded to the @c udp_pcap_file_reader constructor
 */
static void add_udp_pcap_file_reader(stream &s, const std::string &filename, const std::string &filter)
{
    py::gil_scoped_release gil;
    s.emplace_reader<udp_pcap_file_reader>(filename, filter);
}
#endif
233

234
/**
 * Add a reader that takes packets from an in-process queue.
 *
 * The GIL is released while the reader is constructed.
 *
 * @param s      Stream to which the reader is added
 * @param queue  Queue handed to the @c inproc_reader constructor
 */
static void add_inproc_reader(stream &s, std::shared_ptr<inproc_queue> queue)
{
    py::gil_scoped_release gil;
    // `queue` is our own by-value copy, so hand it on rather than copying again
    s.emplace_reader<inproc_reader>(std::move(queue));
}
239

240
class ring_stream_config_wrapper : public ring_stream_config
241
{
242
private:
243
    bool incomplete_keep_payload_ranges = false;
244

245
public:
246
    ring_stream_config_wrapper() = default;
54✔
247

248
    ring_stream_config_wrapper(const ring_stream_config &base) :
×
249
        ring_stream_config(base)
×
250
    {
251
    }
×
252

253
    ring_stream_config_wrapper &set_incomplete_keep_payload_ranges(bool keep)
3✔
254
    {
255
        incomplete_keep_payload_ranges = keep;
3✔
256
        return *this;
3✔
257
    }
258

259
    bool get_incomplete_keep_payload_ranges() const
506✔
260
    {
261
        return incomplete_keep_payload_ranges;
506✔
262
    }
263
};
264

265
/**
266
 * Stream that handles the magic necessary to reflect heaps into
267
 * Python space and capture the reference to it.
268
 *
269
 * The GIL needs to be handled carefully. Any operation run by the thread pool
270
 * might need to take the GIL to do logging. Thus, any operation that blocks
271
 * on completion of code scheduled through the thread pool must drop the GIL
272
 * first.
273
 */
274
class ring_stream_wrapper : public ring_stream<ringbuffer<live_heap, semaphore_fd, semaphore> >
275
{
276
private:
277
    bool incomplete_keep_payload_ranges;
278
    exit_stopper stopper{[this] { stop(); }};
1✔
279

280
    py::object to_object(live_heap &&h)
485✔
281
    {
282
        if (h.is_contiguous())
485✔
283
            return py::cast(heap(std::move(h)), py::return_value_policy::move);
966✔
284
        else
285
            return py::cast(incomplete_heap(std::move(h), false, incomplete_keep_payload_ranges),
4✔
286
                            py::return_value_policy::move);
4✔
287
    }
288

289
public:
290
    ring_stream_wrapper(
504✔
291
        io_service_ref io_service,
292
        const stream_config &config = stream_config(),
293
        const ring_stream_config_wrapper &ring_config = ring_stream_config_wrapper())
294
        : ring_stream<ringbuffer<live_heap, semaphore_fd, semaphore>>(
504✔
295
            std::move(io_service), config, ring_config),
504✔
296
        incomplete_keep_payload_ranges(ring_config.get_incomplete_keep_payload_ranges())
504✔
297
    {}
504✔
298

299
    py::object next()
961✔
300
    {
301
        try
302
        {
303
            return get();
1,444✔
304
        }
305
        catch (ringbuffer_stopped &e)
478✔
306
        {
307
            throw py::stop_iteration();
478✔
308
        }
478✔
309
    }
310

311
    py::object get()
961✔
312
    {
313
        return to_object(ring_stream::pop_live(gil_release_tag()));
1,444✔
314
    }
315

316
    py::object get_nowait()
4✔
317
    {
318
        return to_object(try_pop_live());
6✔
319
    }
320

321
    int get_fd() const
145✔
322
    {
323
        return get_ringbuffer().get_data_sem().get_fd();
145✔
324
    }
325

326
    ring_stream_config_wrapper get_ring_config() const
×
327
    {
328
        ring_stream_config_wrapper ring_config(
329
            ring_stream<ringbuffer<live_heap, semaphore_fd, semaphore> >::get_ring_config());
×
330
        ring_config.set_incomplete_keep_payload_ranges(incomplete_keep_payload_ranges);
×
331
        return ring_config;
×
332
    }
333

334
    virtual void stop() override
506✔
335
    {
336
        stopper.reset();
506✔
337
        py::gil_scoped_release gil;
506✔
338
        ring_stream::stop();
506✔
339
    }
506✔
340

341
    ~ring_stream_wrapper()
1,008✔
342
    {
504✔
343
        stop();
504✔
344
    }
1,008✔
345
};
346

347
/**
348
 * Package a chunk with a reference to the original Python object.
349
 * This is used
350
 * only for chunks in the ringbuffer, not those owned by Python.
351
 */
352
class chunk_wrapper : public chunk
353
{
354
public:
355
    py::object obj;
356
};
357

358
/**
359
 * Get the original Python object from a wrapped chunk, and
360
 * restore its pointers.
361
 */
362
static py::object unwrap_chunk(std::unique_ptr<chunk> &&c)
321✔
363
{
364
    chunk_wrapper &cw = dynamic_cast<chunk_wrapper &>(*c);
321✔
365
    chunk &orig = cw.obj.cast<chunk &>();
321✔
366
    py::object ret = std::move(cw.obj);
321✔
367
    orig = std::move(*c);
321✔
368
    return ret;
321✔
369
}
370

371
/**
372
 * Wrap up a Python chunk into an object that can traverse the ringbuffer.
373
 * Python doesn't allow ownership to be given away, so we have to create a
374
 * new C++ object which refers back to the original Python object to keep
375
 * it alive.
376
 */
377
static std::unique_ptr<chunk_wrapper> wrap_chunk(chunk &c)
430✔
378
{
379
    if (!c.data)
430✔
380
        throw std::invalid_argument("data buffer is not set");
1✔
381
    if (!c.present)
429✔
382
        throw std::invalid_argument("present buffer is not set");
1✔
383
    auto cw = std::make_unique<chunk_wrapper>();
428✔
384
    static_cast<chunk &>(*cw) = std::move(c);
428✔
385
    cw->obj = py::cast(c);
428✔
386
    return cw;
428✔
387
}
×
388

389
/**
390
 * Push a chunk onto a ringbuffer. The specific operation is described by
391
 * @a func; this function takes care of wrapping in @ref chunk_wrapper.
392
 */
393
template<typename T>
394
static void push_chunk(T func, chunk &c)
430✔
395
{
396
    /* Note: the type of 'wrapper' must exactly match what the ringbuffer
397
     * expects, otherwise it constructs a new, temporary unique_ptr by
398
     * moving from 'wrapper', and we lose ownership in the failure path.
399
     */
400
    std::unique_ptr<chunk> wrapper = wrap_chunk(c);
430✔
401
    try
402
    {
403
        func(std::move(wrapper));
428✔
404
    }
405
    catch (std::exception &)
10✔
406
    {
407
        // Undo the move that happened as part of wrapping
408
        if (wrapper)
5✔
409
            c = std::move(*wrapper);
5✔
410
        throw;
5✔
411
    }
412
}
428✔
413

414
/// Ringbuffer of chunks, with file-descriptor semaphores on both sides
using chunk_ringbuffer = ringbuffer<std::unique_ptr<chunk>, semaphore_fd, semaphore_fd>;
415

416
/* Note: ring_stream_wrapper drops the GIL while stopping. We
417
 * can't do that here because stop() can free chunks that were
418
 * in flight, which involves interaction with the Python API.
419
 * I think the only reason ring_stream_wrapper drops the GIL is
420
 * that logging used to directly acquire the GIL, and so if stop()
421
 * did any logging it would deadlock. Now that logging is pushed
422
 * off to a separate thread that should no longer be an issue.
423
 */
424
#define EXIT_STOPPER_WRAPPER(cls, base)                   \
425
    class cls : public base                               \
426
    {                                                     \
427
    private:                                              \
428
        exit_stopper stopper{[this] { stop(); }};         \
429
    public:                                               \
430
        using base::base;                                 \
431
        virtual void stop() override                      \
432
        {                                                 \
433
            stopper.reset();                              \
434
            base::stop();                                 \
435
        }                                                 \
436
    }
437

438
// These aliases are needed because a type passed to a macro cannot contain a comma
439
using chunk_ring_stream_orig = chunk_ring_stream<chunk_ringbuffer, chunk_ringbuffer>;
440
using chunk_stream_ring_group_orig = chunk_stream_ring_group<chunk_ringbuffer, chunk_ringbuffer>;
441

442
EXIT_STOPPER_WRAPPER(chunk_ring_stream_wrapper, chunk_ring_stream_orig);
11✔
443
EXIT_STOPPER_WRAPPER(chunk_stream_ring_group_wrapper, chunk_stream_ring_group_orig);
17✔
444
EXIT_STOPPER_WRAPPER(chunk_stream_group_member_wrapper, chunk_stream_group_member);
4✔
445

446
#undef EXIT_STOPPER_WRAPPER
447

448
/// Register the receiver module with Python
449
py::module register_module(py::module &parent)
5✔
450
{
451
    using namespace pybind11::literals;
452

453
    // Create the module
454
    py::module m = parent.def_submodule("recv");
5✔
455

456
    py::class_<heap_base>(m, "HeapBase")
5✔
457
        .def_property_readonly("cnt", &heap_base::get_cnt)
5✔
458
        .def_property_readonly("flavour", &heap_base::get_flavour)
5✔
459
        .def("get_items", [](py::object &self) -> py::list
5✔
460
        {
461
            const heap_base &h = self.cast<const heap_base &>();
456✔
462
            std::vector<item> base = h.get_items();
456✔
463
            py::list out;
456✔
464
            for (const item &it : base)
2,837✔
465
            {
466
                // Filter out descriptors here. The base class can't do so, because
467
                // the descriptors are retrieved from the items.
468
                if (it.id != DESCRIPTOR_ID)
2,381✔
469
                    out.append(item_wrapper(it, self));
1,195✔
470
            }
471
            return out;
912✔
472
        })
456✔
473
        .def("is_start_of_stream", &heap_base::is_start_of_stream)
5✔
474
        .def("is_end_of_stream", &heap_base::is_end_of_stream);
5✔
475
    py::class_<heap, heap_base>(m, "Heap")
5✔
476
        .def("get_descriptors", &heap::get_descriptors);
5✔
477
    py::class_<incomplete_heap, heap_base>(m, "IncompleteHeap")
5✔
478
        .def_property_readonly("heap_length", &incomplete_heap::get_heap_length)
5✔
479
        .def_property_readonly("received_length", &incomplete_heap::get_received_length)
5✔
480
        .def_property_readonly("payload_ranges", &incomplete_heap::get_payload_ranges);
5✔
481
    py::class_<item_wrapper>(m, "RawItem", py::buffer_protocol())
5✔
482
        .def_readonly("id", &item_wrapper::id)
5✔
483
        .def_readonly("is_immediate", &item_wrapper::is_immediate)
5✔
484
        .def_readonly("immediate_value", &item_wrapper::immediate_value)
5✔
485
        .def_buffer([](item_wrapper &item) { return item.get_value(); });
1,194✔
486

487
    py::class_<stream_stat_config> stream_stat_config_cls(m, "StreamStatConfig");
5✔
488
    /* We have to register the embedded enum type before we can use it as a
489
     * default value for the stream_stat constructor.
490
     */
491
    py::enum_<stream_stat_config::mode>(stream_stat_config_cls, "Mode")
10✔
492
        .value("COUNTER", stream_stat_config::mode::COUNTER)
5✔
493
        .value("MAXIMUM", stream_stat_config::mode::MAXIMUM);
5✔
494
    stream_stat_config_cls
495
        .def(
5✔
496
            py::init<std::string, stream_stat_config::mode>(),
5✔
497
            "name"_a, "mode"_a = stream_stat_config::mode::COUNTER)
5✔
498
        .def_property_readonly("name", &stream_stat_config::get_name)
5✔
499
        .def_property_readonly("mode", &stream_stat_config::get_mode)
5✔
500
        .def("combine", &stream_stat_config::combine)
5✔
501
        .def(py::self == py::self)
5✔
502
        .def(py::self != py::self);
5✔
503
    py::class_<stream_stats> stream_stats_cls(m, "StreamStats");
5✔
504
    stream_stats_cls
505
        .def("__getitem__", [](const stream_stats &self, std::size_t index)
5✔
506
        {
507
            if (index < self.size())
5✔
508
                return self[index];
4✔
509
            else
510
                throw py::index_error();
1✔
511
        })
512
        .def("__getitem__", [](const stream_stats &self, const std::string &name)
5✔
513
        {
514
            auto pos = self.find(name);
57✔
515
            if (pos == self.end())
57✔
516
                throw py::key_error(name);
1✔
517
            return pos->second;
56✔
518
        })
519
        .def("__setitem__", [](stream_stats &self, std::size_t index, std::uint64_t value)
5✔
520
        {
521
            if (index < self.size())
×
522
                self[index] = value;
×
523
            else
524
                throw py::index_error();
×
525
        })
×
526
        .def("__setitem__", [](stream_stats &self, const std::string &name, std::uint64_t value)
5✔
527
        {
528
            auto pos = self.find(name);
28✔
529
            if (pos == self.end())
28✔
530
                throw py::key_error(name);
×
531
            pos->second = value;
28✔
532
        })
28✔
533
        .def("__contains__", [](const stream_stats &self, const std::string &name)
5✔
534
        {
535
            return self.find(name) != self.end();
3✔
536
        })
537
        .def("get", [](const stream_stats &self, const std::string &name, py::object &default_)
5✔
538
        {
539
            auto pos = self.find(name);
×
540
            return pos != self.end() ? py::int_(pos->second) : default_;
×
541
        }, py::arg(), py::arg() = py::none())
10✔
542
        /* TODO: keys, values and items should ideally return views that
543
         * simulate Python's dictionary views (py::bind_map does this, but it
544
         * can't be used because it expects the map to implement erase).
545
         */
546
        .def(
5✔
547
            "items",
548
            [](const stream_stats &self) { return py::make_iterator(self.begin(), self.end()); },
1✔
549
            py::keep_alive<0, 1>()  // keep the stats alive while it is iterated
×
550
        )
551
        .def(
5✔
552
            "__iter__",
553
            [](const stream_stats &self) { return py::make_key_iterator(self.begin(), self.end()); },
2✔
554
            py::keep_alive<0, 1>()  // keep the stats alive while it is iterated
×
555
        )
556
        .def(
5✔
557
            "keys",
558
            [](const stream_stats &self) { return py::make_key_iterator(self.begin(), self.end()); },
1✔
559
            py::keep_alive<0, 1>()  // keep the stats alive while it is iterated
×
560
        )
561
        .def(
5✔
562
            "values",
563
            [](const stream_stats &self) { return py::make_value_iterator(self.begin(), self.end()); },
1✔
564
            py::keep_alive<0, 1>()  // keep the stats alive while it is iterated
×
565
        )
566
        .def("__len__", &stream_stats::size)
5✔
567
        .def_property_readonly("config", &stream_stats::get_config)
5✔
568
        .def(py::self + py::self)
5✔
569
        .def(py::self += py::self);
5✔
570

571
    py::module stream_stat_indices_module = m.def_submodule("stream_stat_indices");
5✔
572
    /* The macro registers a property on stream_stats to access the built-in stats
573
     * by name, and at the same time populates the index constant in submodule
574
     * stream_stat_indices (upper-casing it).
575
     */
576
#define STREAM_STATS_PROPERTY(field) \
577
    do { \
578
        stream_stats_cls.def_property( \
579
            #field, \
580
            [](const stream_stats &self) { return self[stream_stat_indices::field]; }, \
581
            [](stream_stats &self, std::uint64_t value) { self[stream_stat_indices::field] = value; }); \
582
        std::string upper = #field; \
583
        std::transform(upper.begin(), upper.end(), upper.begin(), ::toupper); \
584
        stream_stat_indices_module.attr(upper.c_str()) = stream_stat_indices::field; \
585
    } while (false)
586

587
    STREAM_STATS_PROPERTY(heaps);
22✔
588
    STREAM_STATS_PROPERTY(incomplete_heaps_evicted);
22✔
589
    STREAM_STATS_PROPERTY(incomplete_heaps_flushed);
22✔
590
    STREAM_STATS_PROPERTY(packets);
22✔
591
    STREAM_STATS_PROPERTY(batches);
20✔
592
    STREAM_STATS_PROPERTY(worker_blocked);
24✔
593
    STREAM_STATS_PROPERTY(max_batch);
20✔
594
    STREAM_STATS_PROPERTY(single_packet_heaps);
20✔
595
    STREAM_STATS_PROPERTY(search_dist);
20✔
596
#undef STREAM_STATS_PROPERTY
597

598
    py::class_<stream_config>(m, "StreamConfig")
5✔
599
        .def(py::init(&data_class_constructor<stream_config>))
5✔
600
        .def_property("max_heaps",
10✔
601
                      &stream_config::get_max_heaps,
×
602
                      &stream_config::set_max_heaps)
5✔
603
        .def_property("substreams",
10✔
604
                      &stream_config::get_substreams,
×
605
                      &stream_config::set_substreams)
5✔
606
        .def_property("bug_compat",
10✔
607
                      &stream_config::get_bug_compat,
×
608
                      &stream_config::set_bug_compat)
5✔
609
        .def_property("memcpy",
5✔
610
             [](const stream_config &self) {
247✔
611
                 stream_config cmp;
247✔
612
                 memcpy_function_id ids[] = {MEMCPY_STD, MEMCPY_NONTEMPORAL};
247✔
613
                 for (memcpy_function_id id : ids)
249✔
614
                 {
615
                     cmp.set_memcpy(id);
249✔
616
                     if (equal_functions(self.get_memcpy(), cmp.get_memcpy()))
249✔
617
                         return int(id);
247✔
618
                 }
619
                 throw std::invalid_argument("memcpy function is not one of the standard ones");
×
620
             },
247✔
621
             [](stream_config &self, int id) { self.set_memcpy(memcpy_function_id(id)); })
245✔
622
        .def_property("memory_allocator",
10✔
623
                      &stream_config::get_memory_allocator,
5✔
624
                      SPEAD2_PTMF_VOID(stream_config, set_memory_allocator))
5✔
625
        .def_property("stop_on_stop_item",
10✔
626
                      &stream_config::get_stop_on_stop_item,
5✔
627
                      SPEAD2_PTMF_VOID(stream_config, set_stop_on_stop_item))
5✔
628
        .def_property("allow_unsized_heaps",
10✔
629
                      &stream_config::get_allow_unsized_heaps,
5✔
630
                      SPEAD2_PTMF_VOID(stream_config, set_allow_unsized_heaps))
5✔
631
        .def_property("allow_out_of_order",
10✔
632
                      &stream_config::get_allow_out_of_order,
5✔
633
                      SPEAD2_PTMF_VOID(stream_config, set_allow_out_of_order))
5✔
634
        .def_property("stream_id",
10✔
635
                      &stream_config::get_stream_id,
×
636
                      &stream_config::set_stream_id)
5✔
637
        .def("add_stat", &stream_config::add_stat,
5✔
638
             "name"_a,
5✔
639
             "mode"_a = stream_stat_config::mode::COUNTER)
10✔
640
        .def_property_readonly("stats", &stream_config::get_stats)
5✔
641
        .def("get_stat_index", &stream_config::get_stat_index,
5✔
642
             "name"_a)
5✔
643
        .def("next_stat_index", &stream_config::next_stat_index)
10✔
644
        .def_readonly_static("DEFAULT_MAX_HEAPS", &stream_config::default_max_heaps);
5✔
645
    py::class_<ring_stream_config_wrapper>(m, "RingStreamConfig")
5✔
646
        .def(py::init(&data_class_constructor<ring_stream_config_wrapper>))
5✔
647
        .def_property("heaps",
10✔
648
                      &ring_stream_config_wrapper::get_heaps,
5✔
649
                      SPEAD2_PTMF_VOID(ring_stream_config_wrapper, set_heaps))
5✔
650
        .def_property("contiguous_only",
10✔
651
                      &ring_stream_config_wrapper::get_contiguous_only,
5✔
652
                      SPEAD2_PTMF_VOID(ring_stream_config_wrapper, set_contiguous_only))
5✔
653
        .def_property("incomplete_keep_payload_ranges",
10✔
654
                      &ring_stream_config_wrapper::get_incomplete_keep_payload_ranges,
5✔
655
                      SPEAD2_PTMF_VOID(ring_stream_config_wrapper, set_incomplete_keep_payload_ranges))
10✔
656
        .def_readonly_static("DEFAULT_HEAPS", &ring_stream_config_wrapper::default_heaps);
5✔
657
#if SPEAD2_USE_IBV
658
    py::class_<udp_ibv_config_wrapper>(m, "UdpIbvConfig")
5✔
659
        .def(py::init(&data_class_constructor<udp_ibv_config_wrapper>))
5✔
660
        .def_readwrite("endpoints", &udp_ibv_config_wrapper::py_endpoints)
5✔
661
        .def_readwrite("interface_address", &udp_ibv_config_wrapper::py_interface_address)
5✔
662
        .def_property("buffer_size",
10✔
663
                      &udp_ibv_config_wrapper::get_buffer_size,
5✔
664
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_buffer_size))
5✔
665
        .def_property("max_size",
10✔
666
                      &udp_ibv_config_wrapper::get_max_size,
5✔
667
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_max_size))
5✔
668
        .def_property("comp_vector",
10✔
669
                      &udp_ibv_config_wrapper::get_comp_vector,
5✔
670
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_comp_vector))
5✔
671
        .def_property("max_poll",
10✔
672
                      &udp_ibv_config_wrapper::get_max_poll,
5✔
673
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_max_poll))
10✔
674
        .def_readonly_static("DEFAULT_BUFFER_SIZE", &udp_ibv_config_wrapper::default_buffer_size)
5✔
675
        .def_readonly_static("DEFAULT_MAX_SIZE", &udp_ibv_config_wrapper::default_max_size)
5✔
676
        .def_readonly_static("DEFAULT_MAX_POLL", &udp_ibv_config_wrapper::default_max_poll);
5✔
677
#endif // SPEAD2_USE_IBV
678
    py::class_<stream>(m, "_Stream")
5✔
679
        // SPEAD2_PTMF doesn't work for get_stats because it's defined in stream_base, which is a protected ancestor
680
        .def_property_readonly("stats", [](const stream &self) { return self.get_stats(); })
28✔
681
        .def_property_readonly("config",
5✔
682
                               [](const stream &self) { return self.get_config(); })
×
683
        .def("add_buffer_reader", add_buffer_reader, "buffer"_a)
5✔
684
        .def("add_udp_reader", add_udp_reader,
5✔
685
              "port"_a,
5✔
686
              "max_size"_a = udp_reader::default_max_size,
5✔
687
              "buffer_size"_a = udp_reader::default_buffer_size,
10✔
688
              "bind_hostname"_a = std::string())
10✔
689
        .def("add_udp_reader", add_udp_reader_socket,
5✔
690
              "socket"_a,
5✔
691
              "max_size"_a = udp_reader::default_max_size)
10✔
692
        .def("add_udp_reader", add_udp_reader_bind_v4,
5✔
693
              "multicast_group"_a,
5✔
694
              "port"_a,
5✔
695
              "max_size"_a = udp_reader::default_max_size,
5✔
696
              "buffer_size"_a = udp_reader::default_buffer_size,
10✔
697
              "interface_address"_a = "0.0.0.0")
10✔
698
        .def("add_udp_reader", add_udp_reader_bind_v6,
5✔
699
              "multicast_group"_a,
5✔
700
              "port"_a,
5✔
701
              "max_size"_a = udp_reader::default_max_size,
5✔
702
              "buffer_size"_a = udp_reader::default_buffer_size,
10✔
703
              "interface_index"_a = (unsigned int) 0)
10✔
704
        .def("add_tcp_reader", add_tcp_reader,
5✔
705
             "port"_a,
5✔
706
             "max_size"_a = tcp_reader::default_max_size,
5✔
707
             "buffer_size"_a = tcp_reader::default_buffer_size,
10✔
708
             "bind_hostname"_a = std::string())
10✔
709
        .def("add_tcp_reader", add_tcp_reader_socket,
5✔
710
             "acceptor"_a,
5✔
711
             "max_size"_a = tcp_reader::default_max_size)
10✔
712
#if SPEAD2_USE_IBV
713
        .def("add_udp_ibv_reader", add_udp_ibv_reader,
5✔
714
             "config"_a)
5✔
715
#endif
716
#if SPEAD2_USE_PCAP
717
        .def("add_udp_pcap_file_reader", add_udp_pcap_file_reader,
5✔
718
             "filename"_a, "filter"_a = "")
10✔
719
#endif
720
        .def("add_inproc_reader", add_inproc_reader,
5✔
721
             "queue"_a)
5✔
722
        .def("stop", &stream::stop)
10✔
723
        .def_readonly_static("DEFAULT_UDP_MAX_SIZE", &udp_reader::default_max_size)
5✔
724
        .def_readonly_static("DEFAULT_UDP_BUFFER_SIZE", &udp_reader::default_buffer_size)
5✔
725
        .def_readonly_static("DEFAULT_TCP_MAX_SIZE", &tcp_reader::default_max_size)
5✔
726
        .def_readonly_static("DEFAULT_TCP_BUFFER_SIZE", &tcp_reader::default_buffer_size);
5✔
727
    py::class_<ring_stream_wrapper, stream> stream_class(m, "Stream");
5✔
728
    stream_class
729
        .def(py::init<std::shared_ptr<thread_pool_wrapper>,
5✔
730
                      const stream_config &,
731
                      const ring_stream_config_wrapper &>(),
5✔
732
             "thread_pool"_a.none(false), "config"_a = stream_config(),
10✔
733
             "ring_config"_a = ring_stream_config_wrapper())
10✔
734
        .def("__iter__", [](py::object self) { return self; })
483✔
735
        .def("__next__", &ring_stream_wrapper::next)
5✔
736
        .def("get", &ring_stream_wrapper::get)
5✔
737
        .def("get_nowait", &ring_stream_wrapper::get_nowait)
5✔
738
        .def_property_readonly("fd", &ring_stream_wrapper::get_fd)
5✔
739
        .def_property_readonly("ringbuffer", &ring_stream_wrapper::get_ringbuffer)
5✔
740
        .def_property_readonly("ring_config", &ring_stream_wrapper::get_ring_config);
5✔
741
    using Ringbuffer = ringbuffer<live_heap, semaphore_fd, semaphore>;
742
    py::class_<Ringbuffer>(stream_class, "Ringbuffer")
5✔
743
        .def("size", &Ringbuffer::size)
5✔
744
        .def("capacity", &Ringbuffer::capacity);
5✔
745
    py::class_<chunk_stream_config>(m, "ChunkStreamConfig")
5✔
746
        .def(py::init(&data_class_constructor<chunk_stream_config>))
5✔
747
        .def_property("items",
10✔
748
                      &chunk_stream_config::get_items,
×
749
                      &chunk_stream_config::set_items)
5✔
750
        .def_property("max_chunks",
10✔
751
                      &chunk_stream_config::get_max_chunks,
×
752
                      &chunk_stream_config::set_max_chunks)
5✔
753
        .def_property(
5✔
754
            "place",
755
            [](const chunk_stream_config &config) {
78✔
756
                return callback_to_python(config.get_place());
78✔
757
            },
758
            [](chunk_stream_config &config, py::object obj) {
74✔
759
                config.set_place(callback_from_python<chunk_place_function>(
74✔
760
                    obj,
761
                    "void (void *, size_t)",
762
                    "void (void *, size_t, void *)"
763
                ));
764
            })
70✔
765
        .def(
10✔
766
            "enable_packet_presence", &chunk_stream_config::enable_packet_presence,
5✔
767
            "payload_size"_a)
5✔
768
        .def("disable_packet_presence", &chunk_stream_config::disable_packet_presence)
5✔
769
        .def_property_readonly("packet_presence_payload_size",
10✔
770
                               &chunk_stream_config::get_packet_presence_payload_size)
5✔
771
        .def_property("max_heap_extra",
10✔
772
                      &chunk_stream_config::get_max_heap_extra,
×
773
                      &chunk_stream_config::set_max_heap_extra)
10✔
774
        .def_readonly_static("DEFAULT_MAX_CHUNKS", &chunk_stream_config::default_max_chunks);
5✔
775
    py::class_<chunk>(m, "Chunk")
5✔
776
        .def(py::init(&data_class_constructor<chunk>))
5✔
777
        .def_readwrite("chunk_id", &chunk::chunk_id)
5✔
778
        .def_readwrite("stream_id", &chunk::stream_id)
5✔
779
        // Can't use def_readwrite for present, data, extra because they're
780
        // non-copyable types
781
        .def_property(
5✔
782
            "present",
783
            [](const chunk &c) -> const memory_allocator::pointer & { return c.present; },
770✔
784
            [](chunk &c, memory_allocator::pointer &&value)
135✔
785
            {
786
                if (value)
135✔
787
                {
788
                    auto *alloc = get_buffer_allocation(value);
134✔
789
                    assert(alloc != nullptr);
134✔
790
                    c.present_size = alloc->buffer_info.size * alloc->buffer_info.itemsize;
134✔
791
                }
792
                else
793
                    c.present_size = 0;
1✔
794
                c.present = std::move(value);
135✔
795
            })
135✔
796
        .def_property(
5✔
797
            "data",
798
            [](const chunk &c) -> const memory_allocator::pointer & { return c.data; },
763✔
799
            [](chunk &c, memory_allocator::pointer &&value) { c.data = std::move(value); })
135✔
800
        .def_property(
5✔
801
            "extra",
802
            [](const chunk &c) -> const memory_allocator::pointer & { return c.extra; },
147✔
803
            [](chunk &c, memory_allocator::pointer &&value) { c.extra = std::move(value); });
44✔
804
    // Don't allow ChunkRingPair to be constructed from Python. It exists
805
    // purely to be a base class.
806
    using chunk_ring_pair = detail::chunk_ring_pair<chunk_ringbuffer, chunk_ringbuffer>;
807
    py::class_<chunk_ring_pair>(m, "ChunkRingPair")
5✔
808
        .def(
5✔
809
            "add_free_chunk",
810
            [](chunk_ring_pair &self, chunk &c)
298✔
811
            {
812
                push_chunk(
298✔
813
                    [&self](std::unique_ptr<chunk> &&wrapper)
298✔
814
                    {
815
                        self.add_free_chunk(std::move(wrapper));
298✔
816
                    },
298✔
817
                    c
818
                );
819
            },
298✔
820
            "chunk"_a)
5✔
821
        .def_property_readonly("data_ringbuffer", &chunk_ring_pair::get_data_ringbuffer)
5✔
822
        .def_property_readonly("free_ringbuffer", &chunk_ring_pair::get_free_ringbuffer);
5✔
823

824
    py::class_<chunk_ring_stream_wrapper,
5✔
825
               detail::chunk_ring_pair<chunk_ringbuffer, chunk_ringbuffer>,
826
               stream>(m, "ChunkRingStream")
827
        .def(py::init<std::shared_ptr<thread_pool_wrapper>,
5✔
828
                      const stream_config &,
829
                      const chunk_stream_config &,
830
                      std::shared_ptr<chunk_ringbuffer>,
831
                      std::shared_ptr<chunk_ringbuffer>>(),
5✔
832
             "thread_pool"_a.none(false),
5✔
833
             "config"_a = stream_config(),
10✔
834
             "chunk_stream_config"_a,
5✔
835
             "data_ringbuffer"_a.none(false),
5✔
836
             "free_ringbuffer"_a.none(false),
5✔
837
            // Keep the Python ringbuffer objects alive, not just the C++ side.
838
            // This allows Python subclasses to be passed then later retrieved
839
            // from properties.
840
             py::keep_alive<1, 5>(),
×
841
             py::keep_alive<1, 6>());
5✔
842
    py::class_<chunk_ringbuffer, std::shared_ptr<chunk_ringbuffer>>(m, "ChunkRingbuffer")
5✔
843
        .def(py::init<std::size_t>(), "maxsize"_a)
5✔
844
        .def("qsize", &chunk_ringbuffer::size)
5✔
845
        .def_property_readonly("maxsize", &chunk_ringbuffer::capacity)
5✔
846
        .def_property_readonly(
5✔
847
            "data_fd",
848
            [](const chunk_ringbuffer &ring) { return ring.get_data_sem().get_fd(); })
7✔
849
        .def_property_readonly(
5✔
850
            "space_fd",
851
            [](const chunk_ringbuffer &ring) { return ring.get_space_sem().get_fd(); })
7✔
852
        .def("get", [](chunk_ringbuffer &ring) { return unwrap_chunk(ring.pop(gil_release_tag())); })
489✔
853
        .def("get_nowait", [](chunk_ringbuffer &ring) { return unwrap_chunk(ring.try_pop()); })
40✔
854
        .def(
5✔
855
            "put",
856
            [](chunk_ringbuffer &ring, chunk &c)
110✔
857
            {
858
                push_chunk(
110✔
859
                    [&ring](std::unique_ptr<chunk> &&wrapper)
108✔
860
                    {
861
                        ring.push(std::move(wrapper), gil_release_tag());
108✔
862
                    },
108✔
863
                    c
864
                );
865
            },
108✔
866
            "chunk"_a)
5✔
867
        .def(
5✔
868
            "put_nowait",
869
            [](chunk_ringbuffer &ring, chunk &c)
22✔
870
            {
871
                push_chunk(
22✔
872
                    [&ring](std::unique_ptr<chunk> &&wrapper) { ring.try_push(std::move(wrapper)); },
22✔
873
                    c
874
                );
875
            },
17✔
876
            "chunk"_a)
5✔
877
        .def("empty", [](const chunk_ringbuffer &ring) { return ring.size() == 0; })
9✔
878
        .def("full", [](const chunk_ringbuffer &ring) { return ring.size() == ring.capacity(); })
141✔
879
        .def("stop", &chunk_ringbuffer::stop)
5✔
880
        .def("add_producer", &chunk_ringbuffer::add_producer)
5✔
881
        .def("remove_producer", &chunk_ringbuffer::remove_producer)
5✔
882
        .def("__iter__", [](py::object self) { return self; })
17✔
883
        .def(
5✔
884
            "__next__", [](chunk_ringbuffer &ring)
85✔
885
            {
886
                try
887
                {
888
                    return unwrap_chunk(ring.pop(gil_release_tag()));
158✔
889
                }
890
                catch (ringbuffer_stopped &)
12✔
891
                {
892
                    throw py::stop_iteration();
12✔
893
                }
12✔
894
            });
895

896
    py::class_<chunk_stream_group_config> chunk_stream_group_config_cls(m, "ChunkStreamGroupConfig");
5✔
897
    chunk_stream_group_config_cls
898
        .def(py::init(&data_class_constructor<chunk_stream_group_config>))
5✔
899
        .def_property("max_chunks",
10✔
900
                      &chunk_stream_group_config::get_max_chunks,
×
901
                      &chunk_stream_group_config::set_max_chunks)
5✔
902
        .def_property("eviction_mode",
10✔
903
                      &chunk_stream_group_config::get_eviction_mode,
×
904
                      &chunk_stream_group_config::set_eviction_mode)
5✔
905
        .def_readonly_static("DEFAULT_MAX_CHUNKS", &chunk_stream_group_config::default_max_chunks);
5✔
906
    py::enum_<chunk_stream_group_config::eviction_mode>(chunk_stream_group_config_cls, "EvictionMode")
10✔
907
        .value("LOSSY", chunk_stream_group_config::eviction_mode::LOSSY)
5✔
908
        .value("LOSSLESS", chunk_stream_group_config::eviction_mode::LOSSLESS);
5✔
909

910
    py::class_<chunk_stream_group_member, stream>(m, "ChunkStreamGroupMember");
5✔
911

912
    py::class_<chunk_stream_ring_group_wrapper,
5✔
913
               detail::chunk_ring_pair<chunk_ringbuffer, chunk_ringbuffer>>(m, "ChunkStreamRingGroup")
914
        .def(py::init<const chunk_stream_group_config &,
5✔
915
                      std::shared_ptr<chunk_ringbuffer>,
916
                      std::shared_ptr<chunk_ringbuffer>>(),
5✔
917
             "config"_a,
5✔
918
             "data_ringbuffer"_a.none(false),
5✔
919
             "free_ringbuffer"_a.none(false),
5✔
920
            // Keep the Python ringbuffer objects alive, not just the C++ side.
921
            // This allows Python subclasses to be passed then later retrieved
922
            // from properties.
923
            py::keep_alive<1, 3>(),
×
924
            py::keep_alive<1, 4>())
×
925
        .def_property_readonly(
10✔
926
            "config", &chunk_stream_ring_group_wrapper::get_config)
5✔
927
        .def(
5✔
928
            "emplace_back",
929
            [](chunk_stream_ring_group_wrapper &group,
94✔
930
               std::shared_ptr<thread_pool_wrapper> thread_pool,
931
               const stream_config &config,
932
               const chunk_stream_config &chunk_stream_config) -> chunk_stream_group_member & {
933
                return group.emplace_back<chunk_stream_group_member_wrapper>(std::move(thread_pool), config, chunk_stream_config);
94✔
934
            },
935
            "thread_pool"_a, "config"_a, "chunk_stream_config"_a,
5✔
936
            py::return_value_policy::reference_internal
5✔
937
        )
938
        .def("__len__", &chunk_stream_ring_group_wrapper::size)
5✔
939
        .def(
×
940
            "__getitem__",
941
            [](chunk_stream_ring_group_wrapper &group, std::ptrdiff_t index) -> chunk_stream_group_member & {
113✔
942
                if (index < 0)
113✔
943
                    index += group.size();
4✔
944
                if (index >= 0 && std::size_t(index) < group.size())
113✔
945
                    return group[index];
90✔
946
                else
947
                    throw py::index_error();
23✔
948
            },
949
            py::return_value_policy::reference_internal
5✔
950
        )
951
        .def(
5✔
952
            "__getitem__",
953
            [](chunk_stream_ring_group_wrapper &group, const py::slice &slice) {
4✔
954
                py::list out;
4✔
955
                std::size_t start, stop, step, length;
956
                if (!slice.compute(group.size(), &start, &stop, &step, &length))
4✔
957
                    throw py::error_already_set();
×
958
                py::object self = py::cast(group);
4✔
959
                for (std::size_t i = 0; i < length; i++) {
12✔
960
                    out.append(py::cast(group[start], py::return_value_policy::reference_internal, self));
8✔
961
                    start += step;
8✔
962
                }
963
                return out;
8✔
964
            }
4✔
965
        )
966
        .def("stop", &chunk_stream_ring_group_wrapper::stop);
5✔
967

968
    return m;
10✔
969
}
5✔
970

971
} // namespace recv
972
} // namespace spead2
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc