• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 5462348362

pending completion
5462348362

Pull #219

github

web-flow
Merge fbf2f7ae2 into 99464cfdf
Pull Request #219: Add chunk stream groups

589 of 589 new or added lines in 16 files covered. (100.0%)

5397 of 7206 relevant lines covered (74.9%)

52888.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.74
/src/py_recv.cpp
1
/* Copyright 2015, 2017, 2020-2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <pybind11/pybind11.h>
22
#include <pybind11/stl.h>
23
#include <pybind11/operators.h>
24
#include <algorithm>
25
#include <stdexcept>
26
#include <type_traits>
27
#include <cstdint>
28
#include <cctype>
29
#include <unistd.h>
30
#include <sys/socket.h>
31
#include <boost/optional.hpp>
32
#include <spead2/recv_udp.h>
33
#include <spead2/recv_udp_ibv.h>
34
#include <spead2/recv_udp_pcap.h>
35
#include <spead2/recv_tcp.h>
36
#include <spead2/recv_mem.h>
37
#include <spead2/recv_inproc.h>
38
#include <spead2/recv_stream.h>
39
#include <spead2/recv_ring_stream.h>
40
#include <spead2/recv_chunk_stream.h>
41
#include <spead2/recv_chunk_stream_group.h>
42
#include <spead2/recv_live_heap.h>
43
#include <spead2/recv_heap.h>
44
#include <spead2/common_ringbuffer.h>
45
#include <spead2/py_common.h>
46

47
namespace py = pybind11;
48

49
namespace spead2
50
{
51
namespace recv
52
{
53

54
/* True if both arguments are plain function pointers and point to the same function */
55
template<typename R, typename... Args>
56
static bool equal_functions(const std::function<R(Args...)> &a,
249✔
57
                            const std::function<R(Args...)> &b)
58
{
59
    using ptr = R (*)(Args...);
60
    const ptr *x = a.template target<ptr>();
249✔
61
    const ptr *y = b.template target<ptr>();
249✔
62
    return x && y && *x == *y;
249✔
63
}
64

65
/**
66
 * Wraps @ref item to provide safe memory management. The item references
67
 * memory inside the heap, so it needs to hold a reference to that
68
 * heap, as do any memoryviews created on the value.
69
 */
70
class item_wrapper : public item
71
{
72
private:
73
    /// Python object containing a @ref heap
74
    py::object owning_heap;
75

76
public:
77
    item_wrapper() = default;
78
    item_wrapper(const item &it, py::object owning_heap)
1,195✔
79
        : item(it), owning_heap(std::move(owning_heap)) {}
1,195✔
80

81
    /**
82
     * Obtain the raw value using Python buffer protocol.
83
     */
84
    py::buffer_info get_value() const
1,189✔
85
    {
86
        return py::buffer_info(
87
            reinterpret_cast<void *>(ptr),
1,189✔
88
            1,      // size of individual elements
89
            py::format_descriptor<std::uint8_t>::format(),
1,189✔
90
            length);
2,378✔
91
    }
92
};
93

94
/**
95
 * Extends mem_reader to obtain data using the Python buffer protocol.
96
 * It steals the provided buffer view; it is not passed by rvalue reference
97
 * because it cannot be perfectly forwarded.
98
 */
99
class buffer_reader : public mem_reader
100
{
101
private:
102
    py::buffer_info view;
103
public:
104
    explicit buffer_reader(stream &s, py::buffer_info &view)
54✔
105
        : mem_reader(s, reinterpret_cast<const std::uint8_t *>(view.ptr), view.itemsize * view.size),
54✔
106
        view(std::move(view))
54✔
107
    {
108
    }
54✔
109
};
110

111
#if SPEAD2_USE_IBV
112
/* Managing the endpoints and interface address requires some sleight of
113
 * hand. We store a separate copy in the wrapper in a Python-centric format.
114
 * When constructing the reader, we make a copy with the C++ view.
115
 */
116
class udp_ibv_config_wrapper : public udp_ibv_config
117
{
118
public:
119
    std::vector<std::pair<std::string, std::uint16_t>> py_endpoints;
120
    std::string py_interface_address;
121
};
122
#endif // SPEAD2_USE_IBV
123

124
static boost::asio::ip::address make_address(stream &s, const std::string &hostname)
286✔
125
{
126
    return make_address_no_release(s.get_io_service(), hostname,
127
                                   boost::asio::ip::udp::resolver::query::passive);
286✔
128
}
129

130
template<typename Protocol>
131
static typename Protocol::endpoint make_endpoint(
242✔
132
    stream &s, const std::string &hostname, std::uint16_t port)
133
{
134
    return typename Protocol::endpoint(make_address(s, hostname), port);
242✔
135
}
136

137
static void add_buffer_reader(stream &s, py::buffer buffer)
54✔
138
{
139
    py::buffer_info info = request_buffer_info(buffer, PyBUF_C_CONTIGUOUS);
54✔
140
    py::gil_scoped_release gil;
54✔
141
    s.emplace_reader<buffer_reader>(std::ref(info));
54✔
142
}
54✔
143

144
static void add_udp_reader(
122✔
145
    stream &s,
146
    std::uint16_t port,
147
    std::size_t max_size,
148
    std::size_t buffer_size,
149
    const std::string &bind_hostname)
150
{
151
    py::gil_scoped_release gil;
122✔
152
    auto endpoint = make_endpoint<boost::asio::ip::udp>(s, bind_hostname, port);
122✔
153
    s.emplace_reader<udp_reader>(endpoint, max_size, buffer_size);
122✔
154
}
122✔
155

156
static void add_udp_reader_socket(
80✔
157
    stream &s,
158
    const socket_wrapper<boost::asio::ip::udp::socket> &socket,
159
    std::size_t max_size = udp_reader::default_max_size)
160
{
161
    auto asio_socket = socket.copy(s.get_io_service());
80✔
162
    py::gil_scoped_release gil;
80✔
163
    s.emplace_reader<udp_reader>(std::move(asio_socket), max_size);
80✔
164
}
80✔
165

166
static void add_udp_reader_bind_v4(
40✔
167
    stream &s,
168
    const std::string &address,
169
    std::uint16_t port,
170
    std::size_t max_size,
171
    std::size_t buffer_size,
172
    const std::string &interface_address)
173
{
174
    py::gil_scoped_release gil;
40✔
175
    auto endpoint = make_endpoint<boost::asio::ip::udp>(s, address, port);
40✔
176
    s.emplace_reader<udp_reader>(endpoint, max_size, buffer_size, make_address(s, interface_address));
40✔
177
}
40✔
178

179
static void add_udp_reader_bind_v6(
40✔
180
    stream &s,
181
    const std::string &address,
182
    std::uint16_t port,
183
    std::size_t max_size,
184
    std::size_t buffer_size,
185
    unsigned int interface_index)
186
{
187
    py::gil_scoped_release gil;
40✔
188
    auto endpoint = make_endpoint<boost::asio::ip::udp>(s, address, port);
40✔
189
    s.emplace_reader<udp_reader>(endpoint, max_size, buffer_size, interface_index);
40✔
190
}
40✔
191

192
static void add_tcp_reader(
36✔
193
    stream &s,
194
    std::uint16_t port,
195
    std::size_t max_size,
196
    std::size_t buffer_size,
197
    const std::string &bind_hostname)
198
{
199
    py::gil_scoped_release gil;
36✔
200
    auto endpoint = make_endpoint<boost::asio::ip::tcp>(s, bind_hostname, port);
36✔
201
    s.emplace_reader<tcp_reader>(endpoint, max_size, buffer_size);
36✔
202
}
36✔
203

204
static void add_tcp_reader_socket(
29✔
205
    stream &s,
206
    const socket_wrapper<boost::asio::ip::tcp::acceptor> &acceptor,
207
    std::size_t max_size)
208
{
209
    auto asio_socket = acceptor.copy(s.get_io_service());
29✔
210
    py::gil_scoped_release gil;
29✔
211
    s.emplace_reader<tcp_reader>(std::move(asio_socket), max_size);
29✔
212
}
29✔
213

214
#if SPEAD2_USE_IBV
215
static void add_udp_ibv_reader_single(
×
216
    stream &s,
217
    const std::string &address,
218
    std::uint16_t port,
219
    const std::string &interface_address,
220
    std::size_t max_size,
221
    std::size_t buffer_size,
222
    int comp_vector,
223
    int max_poll)
224
{
225
    deprecation_warning("Use a UdpIbvConfig instead");
×
226
    py::gil_scoped_release gil;
×
227
    auto endpoint = make_endpoint<boost::asio::ip::udp>(s, address, port);
×
228
    s.emplace_reader<udp_ibv_reader>(
×
229
        udp_ibv_config()
×
230
            .add_endpoint(endpoint)
×
231
            .set_interface_address(make_address(s, interface_address))
×
232
            .set_max_size(max_size)
×
233
            .set_buffer_size(buffer_size)
×
234
            .set_comp_vector(comp_vector)
×
235
            .set_max_poll(max_poll));
236
}
×
237

238
static void add_udp_ibv_reader_multi(
×
239
    stream &s,
240
    const py::sequence &endpoints,
241
    const std::string &interface_address,
242
    std::size_t max_size,
243
    std::size_t buffer_size,
244
    int comp_vector,
245
    int max_poll)
246
{
247
    deprecation_warning("Use a UdpIbvConfig instead");
×
248
    // TODO: could this conversion be done by a custom caster?
249
    udp_ibv_config config;
×
250
    for (size_t i = 0; i < len(endpoints); i++)
×
251
    {
252
        py::sequence endpoint = endpoints[i].cast<py::sequence>();
×
253
        std::string address = endpoint[0].cast<std::string>();
×
254
        std::uint16_t port = endpoint[1].cast<std::uint16_t>();
×
255
        config.add_endpoint(make_endpoint<boost::asio::ip::udp>(s, address, port));
×
256
    }
×
257
    py::gil_scoped_release gil;
×
258
    config.set_interface_address(make_address(s, interface_address));
×
259
    config.set_max_size(max_size);
×
260
    config.set_buffer_size(buffer_size);
×
261
    config.set_comp_vector(comp_vector);
×
262
    config.set_max_poll(max_poll);
×
263
    s.emplace_reader<udp_ibv_reader>(config);
×
264
}
×
265

266
static void add_udp_ibv_reader_new(stream &s, const udp_ibv_config_wrapper &config_wrapper)
5✔
267
{
268
    py::gil_scoped_release gil;
5✔
269
    udp_ibv_config config = config_wrapper;
5✔
270
    for (const auto &endpoint : config_wrapper.py_endpoints)
8✔
271
        config.add_endpoint(make_endpoint<boost::asio::ip::udp>(
4✔
272
            s, endpoint.first, endpoint.second));
4✔
273
    config.set_interface_address(
3✔
274
        make_address(s, config_wrapper.py_interface_address));
5✔
275
    s.emplace_reader<udp_ibv_reader>(config);
2✔
276
}
10✔
277
#endif  // SPEAD2_USE_IBV
278

279
#if SPEAD2_USE_PCAP
280
static void add_udp_pcap_file_reader(stream &s, const std::string &filename, const std::string &filter)
×
281
{
282
    py::gil_scoped_release gil;
×
283
    s.emplace_reader<udp_pcap_file_reader>(filename, filter);
×
284
}
×
285
#endif
286

287
static void add_inproc_reader(stream &s, std::shared_ptr<inproc_queue> queue)
145✔
288
{
289
    py::gil_scoped_release gil;
145✔
290
    s.emplace_reader<inproc_reader>(queue);
145✔
291
}
145✔
292

293
class ring_stream_config_wrapper : public ring_stream_config
294
{
295
private:
296
    bool incomplete_keep_payload_ranges = false;
297

298
public:
299
    ring_stream_config_wrapper() = default;
53✔
300

301
    ring_stream_config_wrapper(const ring_stream_config &base) :
×
302
        ring_stream_config(base)
×
303
    {
304
    }
×
305

306
    ring_stream_config_wrapper &set_incomplete_keep_payload_ranges(bool keep)
3✔
307
    {
308
        incomplete_keep_payload_ranges = keep;
3✔
309
        return *this;
3✔
310
    }
311

312
    bool get_incomplete_keep_payload_ranges() const
506✔
313
    {
314
        return incomplete_keep_payload_ranges;
506✔
315
    }
316
};
317

318
/**
319
 * Stream that handles the magic necessary to reflect heaps into
320
 * Python space and capture the reference to it.
321
 *
322
 * The GIL needs to be handled carefully. Any operation run by the thread pool
323
 * might need to take the GIL to do logging. Thus, any operation that blocks
324
 * on completion of code scheduled through the thread pool must drop the GIL
325
 * first.
326
 */
327
class ring_stream_wrapper : public ring_stream<ringbuffer<live_heap, semaphore_fd, semaphore> >
328
{
329
private:
330
    bool incomplete_keep_payload_ranges;
331
    exit_stopper stopper{[this] { stop(); }};
6✔
332

333
    py::object to_object(live_heap &&h)
485✔
334
    {
335
        if (h.is_contiguous())
485✔
336
            return py::cast(heap(std::move(h)), py::return_value_policy::move);
966✔
337
        else
338
            return py::cast(incomplete_heap(std::move(h), false, incomplete_keep_payload_ranges),
4✔
339
                            py::return_value_policy::move);
4✔
340
    }
341

342
public:
343
    ring_stream_wrapper(
504✔
344
        io_service_ref io_service,
345
        const stream_config &config = stream_config(),
346
        const ring_stream_config_wrapper &ring_config = ring_stream_config_wrapper())
347
        : ring_stream<ringbuffer<live_heap, semaphore_fd, semaphore>>(
504✔
348
            std::move(io_service), config, ring_config),
504✔
349
        incomplete_keep_payload_ranges(ring_config.get_incomplete_keep_payload_ranges())
504✔
350
    {}
504✔
351

352
    py::object next()
961✔
353
    {
354
        try
355
        {
356
            return get();
1,444✔
357
        }
358
        catch (ringbuffer_stopped &e)
478✔
359
        {
360
            throw py::stop_iteration();
478✔
361
        }
478✔
362
    }
363

364
    py::object get()
961✔
365
    {
366
        return to_object(ring_stream::pop_live(gil_release_tag()));
1,444✔
367
    }
368

369
    py::object get_nowait()
4✔
370
    {
371
        return to_object(try_pop_live());
6✔
372
    }
373

374
    int get_fd() const
145✔
375
    {
376
        return get_ringbuffer().get_data_sem().get_fd();
145✔
377
    }
378

379
    ring_stream_config_wrapper get_ring_config() const
×
380
    {
381
        ring_stream_config_wrapper ring_config(
382
            ring_stream<ringbuffer<live_heap, semaphore_fd, semaphore> >::get_ring_config());
×
383
        ring_config.set_incomplete_keep_payload_ranges(incomplete_keep_payload_ranges);
×
384
        return ring_config;
×
385
    }
386

387
    virtual void stop() override
511✔
388
    {
389
        stopper.reset();
511✔
390
        py::gil_scoped_release gil;
511✔
391
        ring_stream::stop();
511✔
392
    }
511✔
393

394
    ~ring_stream_wrapper()
1,008✔
395
    {
504✔
396
        stop();
504✔
397
    }
1,008✔
398
};
399

400
/**
401
 * Package a chunk with a reference to the original Python object.
402
 * This is used
403
 * only for chunks in the ringbuffer, not those owned by Python.
404
 */
405
class chunk_wrapper : public chunk
406
{
407
public:
408
    py::object obj;
409
};
410

411
/**
412
 * Get the original Python object from a wrapped chunk, and
413
 * restore its pointers.
414
 */
415
static py::object unwrap_chunk(std::unique_ptr<chunk> &&c)
321✔
416
{
417
    chunk_wrapper &cw = dynamic_cast<chunk_wrapper &>(*c);
321✔
418
    chunk &orig = cw.obj.cast<chunk &>();
321✔
419
    py::object ret = std::move(cw.obj);
321✔
420
    orig = std::move(*c);
321✔
421
    return ret;
321✔
422
}
423

424
/**
425
 * Wrap up a Python chunk into an object that can traverse the ringbuffer.
426
 * Python doesn't allow ownership to be given away, so we have to create a
427
 * new C++ object which refers back to the original Python object to keep
428
 * it alive.
429
 */
430
static std::unique_ptr<chunk_wrapper> wrap_chunk(chunk &c)
426✔
431
{
432
    if (!c.data)
426✔
433
        throw std::invalid_argument("data buffer is not set");
1✔
434
    if (!c.present)
425✔
435
        throw std::invalid_argument("present buffer is not set");
1✔
436
    std::unique_ptr<chunk_wrapper> cw{new chunk_wrapper};
424✔
437
    static_cast<chunk &>(*cw) = std::move(c);
424✔
438
    cw->obj = py::cast(c);
424✔
439
    return cw;
424✔
440
}
×
441

442
/**
443
 * Push a chunk onto a ringbuffer. The specific operation is described by
444
 * @a func; this function takes care of wrapping in @ref chunk_wrapper.
445
 */
446
template<typename T>
447
static void push_chunk(T func, chunk &c)
426✔
448
{
449
    /* Note: the type of 'wrapper' must exactly match what the ringbuffer
450
     * expects, otherwise it constructs a new, temporary unique_ptr by
451
     * moving from 'wrapper', and we lose ownership in the failure path.
452
     */
453
    std::unique_ptr<chunk> wrapper = wrap_chunk(c);
426✔
454
    try
455
    {
456
        func(std::move(wrapper));
424✔
457
    }
458
    catch (std::exception &)
10✔
459
    {
460
        // Undo the move that happened as part of wrapping
461
        if (wrapper)
5✔
462
            c = std::move(*wrapper);
5✔
463
        throw;
5✔
464
    }
465
}
424✔
466

467
typedef ringbuffer<std::unique_ptr<chunk>, semaphore_fd, semaphore_fd> chunk_ringbuffer;
468

469
/* Note: ring_stream_wrapper drops the GIL while stopping. We
470
 * can't do that here because stop() can free chunks that were
471
 * in flight, which involves interaction with the Python API.
472
 * I think the only reason ring_stream_wrapper drops the GIL is
473
 * that logging used to directly acquire the GIL, and so if stop()
474
 * did any logging it would deadlock. Now that logging is pushed
475
 * off to a separate thread that should no longer be an issue.
476
 */
477
#define EXIT_STOPPER_WRAPPER(cls, base)                   \
478
    class cls : public base                               \
479
    {                                                     \
480
    private:                                              \
481
        exit_stopper stopper{[this] { stop(); }};         \
482
    public:                                               \
483
        using base::base;                                 \
484
        virtual void stop() override                      \
485
        {                                                 \
486
            stopper.reset();                              \
487
            base::stop();                                 \
488
        }                                                 \
489
    }
490

491
// These aliases are needed because a type passed to a macro cannot contain a comma
492
using chunk_ring_stream_orig = chunk_ring_stream<chunk_ringbuffer, chunk_ringbuffer>;
493
using chunk_stream_ring_group_orig = chunk_stream_ring_group<chunk_ringbuffer, chunk_ringbuffer>;
494

495
EXIT_STOPPER_WRAPPER(chunk_ring_stream_wrapper, chunk_ring_stream_orig);
11✔
496
EXIT_STOPPER_WRAPPER(chunk_stream_ring_group_wrapper, chunk_stream_ring_group_orig);
15✔
497
// We don't need to wrap chunk_stream_group_member, because we've wrapped
498
// chunk_stream_ring_group and its stop will stop the member streams.
499

500
#undef EXIT_STOPPER_WRAPPER
501

502
/// Register the receiver module with Python
503
py::module register_module(py::module &parent)
4✔
504
{
505
    using namespace pybind11::literals;
506

507
    // Create the module
508
    py::module m = parent.def_submodule("recv");
4✔
509

510
    py::class_<heap_base>(m, "HeapBase")
4✔
511
        .def_property_readonly("cnt", SPEAD2_PTMF(heap_base, get_cnt))
4✔
512
        .def_property_readonly("flavour", SPEAD2_PTMF(heap_base, get_flavour))
4✔
513
        .def("get_items", [](py::object &self) -> py::list
4✔
514
        {
515
            const heap_base &h = self.cast<const heap_base &>();
456✔
516
            std::vector<item> base = h.get_items();
456✔
517
            py::list out;
456✔
518
            for (const item &it : base)
2,837✔
519
            {
520
                // Filter out descriptors here. The base class can't do so, because
521
                // the descriptors are retrieved from the items.
522
                if (it.id != DESCRIPTOR_ID)
2,381✔
523
                    out.append(item_wrapper(it, self));
1,195✔
524
            }
525
            return out;
912✔
526
        })
456✔
527
        .def("is_start_of_stream", SPEAD2_PTMF(heap_base, is_start_of_stream))
4✔
528
        .def("is_end_of_stream", SPEAD2_PTMF(heap_base, is_end_of_stream));
4✔
529
    py::class_<heap, heap_base>(m, "Heap")
4✔
530
        .def("get_descriptors", SPEAD2_PTMF(heap, get_descriptors));
4✔
531
    py::class_<incomplete_heap, heap_base>(m, "IncompleteHeap")
4✔
532
        .def_property_readonly("heap_length", SPEAD2_PTMF(incomplete_heap, get_heap_length))
4✔
533
        .def_property_readonly("received_length", SPEAD2_PTMF(incomplete_heap, get_received_length))
4✔
534
        .def_property_readonly("payload_ranges", SPEAD2_PTMF(incomplete_heap, get_payload_ranges));
4✔
535
    py::class_<item_wrapper>(m, "RawItem", py::buffer_protocol())
4✔
536
        .def_readonly("id", &item_wrapper::id)
4✔
537
        .def_readonly("is_immediate", &item_wrapper::is_immediate)
4✔
538
        .def_readonly("immediate_value", &item_wrapper::immediate_value)
4✔
539
        .def_buffer([](item_wrapper &item) { return item.get_value(); });
1,193✔
540

541
    py::class_<stream_stat_config> stream_stat_config_cls(m, "StreamStatConfig");
4✔
542
    /* We have to register the embedded enum type before we can use it as a
543
     * default value for the stream_stat constructor/
544
     */
545
    py::enum_<stream_stat_config::mode>(stream_stat_config_cls, "Mode")
8✔
546
        .value("COUNTER", stream_stat_config::mode::COUNTER)
4✔
547
        .value("MAXIMUM", stream_stat_config::mode::MAXIMUM);
4✔
548
    stream_stat_config_cls
549
        .def(
4✔
550
            py::init<std::string, stream_stat_config::mode>(),
4✔
551
            "name"_a, "mode"_a = stream_stat_config::mode::COUNTER)
4✔
552
        .def_property_readonly("name", SPEAD2_PTMF(stream_stat_config, get_name))
4✔
553
        .def_property_readonly("mode", SPEAD2_PTMF(stream_stat_config, get_mode))
4✔
554
        .def("combine", SPEAD2_PTMF(stream_stat_config, combine))
4✔
555
        .def(py::self == py::self)
4✔
556
        .def(py::self != py::self);
4✔
557
    py::class_<stream_stats> stream_stats_cls(m, "StreamStats");
4✔
558
    stream_stats_cls
559
        .def("__getitem__", [](const stream_stats &self, std::size_t index)
4✔
560
        {
561
            if (index < self.size())
5✔
562
                return self[index];
4✔
563
            else
564
                throw py::index_error();
1✔
565
        })
566
        .def("__getitem__", [](const stream_stats &self, const std::string &name)
4✔
567
        {
568
            auto pos = self.find(name);
57✔
569
            if (pos == self.end())
57✔
570
                throw py::key_error(name);
1✔
571
            return pos->second;
56✔
572
        })
573
        .def("__setitem__", [](stream_stats &self, std::size_t index, std::uint64_t value)
4✔
574
        {
575
            if (index < self.size())
×
576
                self[index] = value;
×
577
            else
578
                throw py::index_error();
×
579
        })
×
580
        .def("__setitem__", [](stream_stats &self, const std::string &name, std::uint64_t value)
4✔
581
        {
582
            auto pos = self.find(name);
28✔
583
            if (pos == self.end())
28✔
584
                throw py::key_error(name);
×
585
            pos->second = value;
28✔
586
        })
28✔
587
        .def("__contains__", [](const stream_stats &self, const std::string &name)
4✔
588
        {
589
            return self.find(name) != self.end();
3✔
590
        })
591
        .def("get", [](const stream_stats &self, const std::string &name, py::object &default_)
4✔
592
        {
593
            auto pos = self.find(name);
×
594
            return pos != self.end() ? py::int_(pos->second) : default_;
×
595
        }, py::arg(), py::arg() = py::none())
8✔
596
        /* TODO: keys, values and items should ideally return view that
597
         * simulate Python's dictionary views (py::bind_map does this, but it
598
         * can't be used because it expects the map to implement erase).
599
         */
600
        .def(
4✔
601
            "items",
602
            [](const stream_stats &self) { return py::make_iterator(self.begin(), self.end()); },
1✔
603
            py::keep_alive<0, 1>()  // keep the stats alive while it is iterated
×
604
        )
605
        .def(
4✔
606
            "__iter__",
607
            [](const stream_stats &self) { return py::make_key_iterator(self.begin(), self.end()); },
2✔
608
            py::keep_alive<0, 1>()  // keep the stats alive while it is iterated
×
609
        )
610
        .def(
4✔
611
            "keys",
612
            [](const stream_stats &self) { return py::make_key_iterator(self.begin(), self.end()); },
1✔
613
            py::keep_alive<0, 1>()  // keep the stats alive while it is iterated
×
614
        )
615
        .def(
4✔
616
            "values",
617
            [](const stream_stats &self) { return py::make_value_iterator(self.begin(), self.end()); },
1✔
618
            py::keep_alive<0, 1>()  // keep the stats alive while it is iterated
×
619
        )
620
        .def("__len__", SPEAD2_PTMF(stream_stats, size))
4✔
621
        .def_property_readonly("config", SPEAD2_PTMF(stream_stats, get_config))
4✔
622
        .def(py::self + py::self)
4✔
623
        .def(py::self += py::self);
4✔
624

625
    py::module stream_stat_indices_module = m.def_submodule("stream_stat_indices");
4✔
626
    /* The macro registers a property on stream_stats to access the built-in stats
627
     * by name, and at the same time populates the index constant in submodule
628
     * stream_stat_indices (upper-casing it).
629
     */
630
#define STREAM_STATS_PROPERTY(field) \
631
    do { \
632
        stream_stats_cls.def_property( \
633
            #field, \
634
            [](const stream_stats &self) { return self[stream_stat_indices::field]; }, \
635
            [](stream_stats &self, std::uint64_t value) { self[stream_stat_indices::field] = value; }); \
636
        std::string upper = #field; \
637
        std::transform(upper.begin(), upper.end(), upper.begin(), ::toupper); \
638
        stream_stat_indices_module.attr(upper.c_str()) = stream_stat_indices::field; \
639
    } while (false)
640

641
    STREAM_STATS_PROPERTY(heaps);
20✔
642
    STREAM_STATS_PROPERTY(incomplete_heaps_evicted);
20✔
643
    STREAM_STATS_PROPERTY(incomplete_heaps_flushed);
20✔
644
    STREAM_STATS_PROPERTY(packets);
20✔
645
    STREAM_STATS_PROPERTY(batches);
18✔
646
    STREAM_STATS_PROPERTY(worker_blocked);
23✔
647
    STREAM_STATS_PROPERTY(max_batch);
18✔
648
    STREAM_STATS_PROPERTY(single_packet_heaps);
18✔
649
    STREAM_STATS_PROPERTY(search_dist);
18✔
650
#undef STREAM_STATS_PROPERTY
651

652
    py::class_<stream_config>(m, "StreamConfig")
4✔
653
        .def(py::init(&data_class_constructor<stream_config>))
4✔
654
        .def_property("max_heaps",
4✔
655
                      SPEAD2_PTMF(stream_config, get_max_heaps),
4✔
656
                      SPEAD2_PTMF(stream_config, set_max_heaps))
4✔
657
        .def_property("substreams",
4✔
658
                      SPEAD2_PTMF(stream_config, get_substreams),
4✔
659
                      SPEAD2_PTMF(stream_config, set_substreams))
4✔
660
        .def_property("bug_compat",
4✔
661
                      SPEAD2_PTMF(stream_config, get_bug_compat),
4✔
662
                      SPEAD2_PTMF(stream_config, set_bug_compat))
4✔
663
        .def_property("memcpy",
4✔
664
             [](const stream_config &self) {
247✔
665
                 stream_config cmp;
247✔
666
                 memcpy_function_id ids[] = {MEMCPY_STD, MEMCPY_NONTEMPORAL};
247✔
667
                 for (memcpy_function_id id : ids)
249✔
668
                 {
669
                     cmp.set_memcpy(id);
249✔
670
                     if (equal_functions(self.get_memcpy(), cmp.get_memcpy()))
249✔
671
                         return int(id);
247✔
672
                 }
673
                 throw std::invalid_argument("memcpy function is not one of the standard ones");
×
674
             },
247✔
675
             [](stream_config &self, int id) { self.set_memcpy(memcpy_function_id(id)); })
245✔
676
        .def_property("memory_allocator",
4✔
677
                      SPEAD2_PTMF(stream_config, get_memory_allocator),
4✔
678
                      SPEAD2_PTMF_VOID(stream_config, set_memory_allocator))
4✔
679
        .def_property("stop_on_stop_item",
4✔
680
                      SPEAD2_PTMF(stream_config, get_stop_on_stop_item),
4✔
681
                      SPEAD2_PTMF_VOID(stream_config, set_stop_on_stop_item))
4✔
682
        .def_property("allow_unsized_heaps",
4✔
683
                      SPEAD2_PTMF(stream_config, get_allow_unsized_heaps),
4✔
684
                      SPEAD2_PTMF_VOID(stream_config, set_allow_unsized_heaps))
4✔
685
        .def_property("allow_out_of_order",
4✔
686
                      SPEAD2_PTMF(stream_config, get_allow_out_of_order),
4✔
687
                      SPEAD2_PTMF_VOID(stream_config, set_allow_out_of_order))
4✔
688
        .def_property("stream_id",
4✔
689
                      SPEAD2_PTMF(stream_config, get_stream_id),
4✔
690
                      SPEAD2_PTMF(stream_config, set_stream_id))
4✔
691
        .def("add_stat", SPEAD2_PTMF(stream_config, add_stat),
4✔
692
             "name"_a,
4✔
693
             "mode"_a = stream_stat_config::mode::COUNTER)
8✔
694
        .def_property_readonly("stats", SPEAD2_PTMF(stream_config, get_stats))
4✔
695
        .def("get_stat_index", SPEAD2_PTMF(stream_config, get_stat_index),
4✔
696
             "name"_a)
4✔
697
        .def("next_stat_index", SPEAD2_PTMF(stream_config, next_stat_index))
8✔
698
        .def_readonly_static("DEFAULT_MAX_HEAPS", &stream_config::default_max_heaps);
4✔
699
    py::class_<ring_stream_config_wrapper>(m, "RingStreamConfig")
4✔
700
        .def(py::init(&data_class_constructor<ring_stream_config_wrapper>))
4✔
701
        .def_property("heaps",
4✔
702
                      SPEAD2_PTMF(ring_stream_config_wrapper, get_heaps),
4✔
703
                      SPEAD2_PTMF_VOID(ring_stream_config_wrapper, set_heaps))
4✔
704
        .def_property("contiguous_only",
4✔
705
                      SPEAD2_PTMF(ring_stream_config_wrapper, get_contiguous_only),
4✔
706
                      SPEAD2_PTMF_VOID(ring_stream_config_wrapper, set_contiguous_only))
4✔
707
        .def_property("incomplete_keep_payload_ranges",
4✔
708
                      SPEAD2_PTMF(ring_stream_config_wrapper, get_incomplete_keep_payload_ranges),
4✔
709
                      SPEAD2_PTMF_VOID(ring_stream_config_wrapper, set_incomplete_keep_payload_ranges))
8✔
710
        .def_readonly_static("DEFAULT_HEAPS", &ring_stream_config_wrapper::default_heaps);
4✔
711
#if SPEAD2_USE_IBV
712
    py::class_<udp_ibv_config_wrapper>(m, "UdpIbvConfig")
4✔
713
        .def(py::init(&data_class_constructor<udp_ibv_config_wrapper>))
4✔
714
        .def_readwrite("endpoints", &udp_ibv_config_wrapper::py_endpoints)
4✔
715
        .def_readwrite("interface_address", &udp_ibv_config_wrapper::py_interface_address)
4✔
716
        .def_property("buffer_size",
4✔
717
                      SPEAD2_PTMF(udp_ibv_config_wrapper, get_buffer_size),
4✔
718
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_buffer_size))
4✔
719
        .def_property("max_size",
4✔
720
                      SPEAD2_PTMF(udp_ibv_config_wrapper, get_max_size),
4✔
721
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_max_size))
4✔
722
        .def_property("comp_vector",
4✔
723
                      SPEAD2_PTMF(udp_ibv_config_wrapper, get_comp_vector),
4✔
724
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_comp_vector))
4✔
725
        .def_property("max_poll",
4✔
726
                      SPEAD2_PTMF(udp_ibv_config_wrapper, get_max_poll),
4✔
727
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_max_poll))
8✔
728
        .def_readonly_static("DEFAULT_BUFFER_SIZE", &udp_ibv_config_wrapper::default_buffer_size)
4✔
729
        .def_readonly_static("DEFAULT_MAX_SIZE", &udp_ibv_config_wrapper::default_max_size)
4✔
730
        .def_readonly_static("DEFAULT_MAX_POLL", &udp_ibv_config_wrapper::default_max_poll);
4✔
731
#endif // SPEAD2_USE_IBV
732
    py::class_<stream>(m, "_Stream")
4✔
733
        // SPEAD2_PTMF doesn't work for get_stats because it's defined in stream_base, which is a protected ancestor
734
        .def_property_readonly("stats", [](const stream &self) { return self.get_stats(); })
28✔
735
        .def_property_readonly("config",
4✔
736
                               [](const stream &self) { return self.get_config(); })
×
737
        .def("add_buffer_reader", add_buffer_reader, "buffer"_a)
4✔
738
        .def("add_udp_reader", add_udp_reader,
4✔
739
              "port"_a,
4✔
740
              "max_size"_a = udp_reader::default_max_size,
4✔
741
              "buffer_size"_a = udp_reader::default_buffer_size,
8✔
742
              "bind_hostname"_a = std::string())
8✔
743
        .def("add_udp_reader", add_udp_reader_socket,
4✔
744
              "socket"_a,
4✔
745
              "max_size"_a = udp_reader::default_max_size)
8✔
746
        .def("add_udp_reader", add_udp_reader_bind_v4,
4✔
747
              "multicast_group"_a,
4✔
748
              "port"_a,
4✔
749
              "max_size"_a = udp_reader::default_max_size,
4✔
750
              "buffer_size"_a = udp_reader::default_buffer_size,
8✔
751
              "interface_address"_a = "0.0.0.0")
8✔
752
        .def("add_udp_reader", add_udp_reader_bind_v6,
4✔
753
              "multicast_group"_a,
4✔
754
              "port"_a,
4✔
755
              "max_size"_a = udp_reader::default_max_size,
4✔
756
              "buffer_size"_a = udp_reader::default_buffer_size,
8✔
757
              "interface_index"_a = (unsigned int) 0)
8✔
758
        .def("add_tcp_reader", add_tcp_reader,
4✔
759
             "port"_a,
4✔
760
             "max_size"_a = tcp_reader::default_max_size,
4✔
761
             "buffer_size"_a = tcp_reader::default_buffer_size,
8✔
762
             "bind_hostname"_a = std::string())
8✔
763
        .def("add_tcp_reader", add_tcp_reader_socket,
4✔
764
             "acceptor"_a,
4✔
765
             "max_size"_a = tcp_reader::default_max_size)
8✔
766
#if SPEAD2_USE_IBV
767
        .def("add_udp_ibv_reader", add_udp_ibv_reader_single,
4✔
768
              "multicast_group"_a,
4✔
769
              "port"_a,
4✔
770
              "interface_address"_a,
4✔
771
              "max_size"_a = udp_ibv_config::default_max_size,
4✔
772
              "buffer_size"_a = udp_ibv_config::default_buffer_size,
8✔
773
              "comp_vector"_a = 0,
8✔
774
              "max_poll"_a = udp_ibv_config::default_max_poll)
8✔
775
        .def("add_udp_ibv_reader", add_udp_ibv_reader_multi,
4✔
776
              "endpoints"_a,
4✔
777
              "interface_address"_a,
4✔
778
              "max_size"_a = udp_ibv_config::default_max_size,
4✔
779
              "buffer_size"_a = udp_ibv_config::default_buffer_size,
8✔
780
              "comp_vector"_a = 0,
8✔
781
              "max_poll"_a = udp_ibv_config::default_max_poll)
8✔
782
        .def("add_udp_ibv_reader", add_udp_ibv_reader_new,
4✔
783
             "config"_a)
4✔
784
#endif
785
#if SPEAD2_USE_PCAP
786
        .def("add_udp_pcap_file_reader", add_udp_pcap_file_reader,
4✔
787
             "filename"_a, "filter"_a = "")
8✔
788
#endif
789
        .def("add_inproc_reader", add_inproc_reader,
4✔
790
             "queue"_a)
4✔
791
        .def("stop", SPEAD2_PTMF(stream, stop))
4✔
792
#if SPEAD2_USE_IBV
793
        .def_property_readonly_static("DEFAULT_UDP_IBV_MAX_SIZE",
4✔
794
            [](py::object) {
1✔
795
#ifndef PYPY_VERSION  // Workaround for https://github.com/pybind/pybind11/issues/3110
796
                deprecation_warning("Use spead2.recv.UdpIbvConfig.DEFAULT_MAX_SIZE");
1✔
797
#endif
798
                return udp_ibv_config::default_max_size;
1✔
799
            })
800
        .def_property_readonly_static("DEFAULT_UDP_IBV_BUFFER_SIZE",
4✔
801
            [](py::object) {
1✔
802
#ifndef PYPY_VERSION  // Workaround for https://github.com/pybind/pybind11/issues/3110
803
                deprecation_warning("Use spead2.recv.UdpIbvConfig.DEFAULT_BUFFER_SIZE");
1✔
804
#endif
805
                return udp_ibv_config::default_buffer_size;
1✔
806
            })
807
        .def_property_readonly_static("DEFAULT_UDP_IBV_MAX_POLL",
8✔
808
            [](py::object) {
1✔
809
#ifndef PYPY_VERSION  // Workaround for https://github.com/pybind/pybind11/issues/3110
810
                deprecation_warning("Use spead2.recv.UdpIbvConfig.DEFAULT_MAX_POLL");
1✔
811
#endif
812
                return udp_ibv_config::default_max_poll;
1✔
813
            })
814
#endif
815
        .def_readonly_static("DEFAULT_UDP_MAX_SIZE", &udp_reader::default_max_size)
4✔
816
        .def_readonly_static("DEFAULT_UDP_BUFFER_SIZE", &udp_reader::default_buffer_size)
4✔
817
        .def_readonly_static("DEFAULT_TCP_MAX_SIZE", &tcp_reader::default_max_size)
4✔
818
        .def_readonly_static("DEFAULT_TCP_BUFFER_SIZE", &tcp_reader::default_buffer_size);
4✔
819
    py::class_<ring_stream_wrapper, stream> stream_class(m, "Stream");
4✔
820
    stream_class
821
        .def(py::init<std::shared_ptr<thread_pool_wrapper>,
4✔
822
                      const stream_config &,
823
                      const ring_stream_config_wrapper &>(),
4✔
824
             "thread_pool"_a.none(false), "config"_a = stream_config(),
8✔
825
             "ring_config"_a = ring_stream_config_wrapper())
8✔
826
        .def("__iter__", [](py::object self) { return self; })
482✔
827
        .def("__next__", SPEAD2_PTMF(ring_stream_wrapper, next))
4✔
828
        .def("get", SPEAD2_PTMF(ring_stream_wrapper, get))
4✔
829
        .def("get_nowait", SPEAD2_PTMF(ring_stream_wrapper, get_nowait))
4✔
830
        .def_property_readonly("fd", SPEAD2_PTMF(ring_stream_wrapper, get_fd))
4✔
831
        .def_property_readonly("ringbuffer", SPEAD2_PTMF(ring_stream_wrapper, get_ringbuffer))
4✔
832
        .def_property_readonly("ring_config", SPEAD2_PTMF(ring_stream_wrapper, get_ring_config));
4✔
833
    using Ringbuffer = ringbuffer<live_heap, semaphore_fd, semaphore>;
834
    py::class_<Ringbuffer>(stream_class, "Ringbuffer")
4✔
835
        .def("size", SPEAD2_PTMF(Ringbuffer, size))
4✔
836
        .def("capacity", SPEAD2_PTMF(Ringbuffer, capacity));
4✔
837
    py::class_<chunk_stream_config>(m, "ChunkStreamConfig")
4✔
838
        .def(py::init(&data_class_constructor<chunk_stream_config>))
4✔
839
        .def_property("items",
4✔
840
                      SPEAD2_PTMF(chunk_stream_config, get_items),
4✔
841
                      SPEAD2_PTMF(chunk_stream_config, set_items))
4✔
842
        .def_property("max_chunks",
4✔
843
                      SPEAD2_PTMF(chunk_stream_config, get_max_chunks),
4✔
844
                      SPEAD2_PTMF(chunk_stream_config, set_max_chunks))
4✔
845
        .def_property(
4✔
846
            "place",
847
            [](const chunk_stream_config &config) {
76✔
848
                return callback_to_python(config.get_place());
76✔
849
            },
850
            [](chunk_stream_config &config, py::object obj) {
72✔
851
                config.set_place(callback_from_python<chunk_place_function>(
72✔
852
                    obj,
853
                    "void (void *, size_t)",
854
                    "void (void *, size_t, void *)"
855
                ));
856
            })
68✔
857
        .def(
4✔
858
            "enable_packet_presence", SPEAD2_PTMF(chunk_stream_config, enable_packet_presence),
4✔
859
            "payload_size"_a)
4✔
860
        .def("disable_packet_presence", SPEAD2_PTMF(chunk_stream_config, disable_packet_presence))
4✔
861
        .def_property_readonly("packet_presence_payload_size",
4✔
862
                               SPEAD2_PTMF(chunk_stream_config, get_packet_presence_payload_size))
4✔
863
        .def_property("max_heap_extra",
4✔
864
                      SPEAD2_PTMF(chunk_stream_config, get_max_heap_extra),
4✔
865
                      SPEAD2_PTMF(chunk_stream_config, set_max_heap_extra))
8✔
866
        .def_readonly_static("DEFAULT_MAX_CHUNKS", &chunk_stream_config::default_max_chunks);
4✔
867
    py::class_<chunk>(m, "Chunk")
4✔
868
        .def(py::init(&data_class_constructor<chunk>))
4✔
869
        .def_readwrite("chunk_id", &chunk::chunk_id)
4✔
870
        .def_readwrite("stream_id", &chunk::stream_id)
4✔
871
        // Can't use def_readwrite for present, data, extra because they're
872
        // non-copyable types
873
        .def_property(
4✔
874
            "present",
875
            [](const chunk &c) -> const memory_allocator::pointer & { return c.present; },
766✔
876
            [](chunk &c, memory_allocator::pointer &&value)
131✔
877
            {
878
                if (value)
131✔
879
                {
880
                    auto *alloc = get_buffer_allocation(value);
130✔
881
                    assert(alloc != nullptr);
882
                    c.present_size = alloc->buffer_info.size * alloc->buffer_info.itemsize;
130✔
883
                }
884
                else
885
                    c.present_size = 0;
1✔
886
                c.present = std::move(value);
131✔
887
            })
131✔
888
        .def_property(
4✔
889
            "data",
890
            [](const chunk &c) -> const memory_allocator::pointer & { return c.data; },
759✔
891
            [](chunk &c, memory_allocator::pointer &&value) { c.data = std::move(value); })
131✔
892
        .def_property(
4✔
893
            "extra",
894
            [](const chunk &c) -> const memory_allocator::pointer & { return c.extra; },
147✔
895
            [](chunk &c, memory_allocator::pointer &&value) { c.extra = std::move(value); });
44✔
896
    // Don't allow ChunkRingPair to be constructed from Python. It exists
897
    // purely to be a base class.
898
    using chunk_ring_pair = detail::chunk_ring_pair<chunk_ringbuffer, chunk_ringbuffer>;
899
    py::class_<chunk_ring_pair>(m, "ChunkRingPair")
4✔
900
        .def(
4✔
901
            "add_free_chunk",
902
            [](chunk_ring_pair &self, chunk &c)
294✔
903
            {
904
                push_chunk(
294✔
905
                    [&self](std::unique_ptr<chunk> &&wrapper)
294✔
906
                    {
907
                        self.add_free_chunk(std::move(wrapper));
294✔
908
                    },
294✔
909
                    c
910
                );
911
            },
294✔
912
            "chunk"_a)
4✔
913
        .def_property_readonly("data_ringbuffer", SPEAD2_PTMF(chunk_ring_pair, get_data_ringbuffer))
4✔
914
        .def_property_readonly("free_ringbuffer", SPEAD2_PTMF(chunk_ring_pair, get_free_ringbuffer));
4✔
915

916
    py::class_<chunk_ring_stream_wrapper,
4✔
917
               detail::chunk_ring_pair<chunk_ringbuffer, chunk_ringbuffer>,
918
               stream>(m, "ChunkRingStream")
919
        .def(py::init<std::shared_ptr<thread_pool_wrapper>,
4✔
920
                      const stream_config &,
921
                      const chunk_stream_config &,
922
                      std::shared_ptr<chunk_ringbuffer>,
923
                      std::shared_ptr<chunk_ringbuffer>>(),
4✔
924
             "thread_pool"_a.none(false),
4✔
925
             "config"_a = stream_config(),
8✔
926
             "chunk_stream_config"_a,
4✔
927
             "data_ringbuffer"_a.none(false),
4✔
928
             "free_ringbuffer"_a.none(false),
4✔
929
            // Keep the Python ringbuffer objects alive, not just the C++ side.
930
            // This allows Python subclasses to be passed then later retrieved
931
            // from properties.
932
             py::keep_alive<1, 5>(),
×
933
             py::keep_alive<1, 6>());
4✔
934
    py::class_<chunk_ringbuffer, std::shared_ptr<chunk_ringbuffer>>(m, "ChunkRingbuffer")
4✔
935
        .def(py::init<std::size_t>(), "maxsize"_a)
4✔
936
        .def("qsize", SPEAD2_PTMF(chunk_ringbuffer, size))
4✔
937
        .def_property_readonly("maxsize", SPEAD2_PTMF(chunk_ringbuffer, capacity))
4✔
938
        .def_property_readonly(
4✔
939
            "data_fd",
940
            [](const chunk_ringbuffer &ring) { return ring.get_data_sem().get_fd(); })
7✔
941
        .def_property_readonly(
4✔
942
            "space_fd",
943
            [](const chunk_ringbuffer &ring) { return ring.get_space_sem().get_fd(); })
7✔
944
        .def("get", [](chunk_ringbuffer &ring) { return unwrap_chunk(ring.pop(gil_release_tag())); })
488✔
945
        .def("get_nowait", [](chunk_ringbuffer &ring) { return unwrap_chunk(ring.try_pop()); })
39✔
946
        .def(
4✔
947
            "put",
948
            [](chunk_ringbuffer &ring, chunk &c)
110✔
949
            {
950
                push_chunk(
110✔
951
                    [&ring](std::unique_ptr<chunk> &&wrapper)
108✔
952
                    {
953
                        ring.push(std::move(wrapper), gil_release_tag());
108✔
954
                    },
108✔
955
                    c
956
                );
957
            },
108✔
958
            "chunk"_a)
4✔
959
        .def(
4✔
960
            "put_nowait",
961
            [](chunk_ringbuffer &ring, chunk &c)
22✔
962
            {
963
                push_chunk(
22✔
964
                    [&ring](std::unique_ptr<chunk> &&wrapper) { ring.try_push(std::move(wrapper)); },
22✔
965
                    c
966
                );
967
            },
17✔
968
            "chunk"_a)
4✔
969
        .def("empty", [](const chunk_ringbuffer &ring) { return ring.size() == 0; })
8✔
970
        .def("full", [](const chunk_ringbuffer &ring) { return ring.size() == ring.capacity(); })
140✔
971
        .def("stop", SPEAD2_PTMF(chunk_ringbuffer, stop))
4✔
972
        .def("add_producer", SPEAD2_PTMF(chunk_ringbuffer, add_producer))
4✔
973
        .def("remove_producer", SPEAD2_PTMF(chunk_ringbuffer, remove_producer))
4✔
974
        .def("__iter__", [](py::object self) { return self; })
16✔
975
        .def(
4✔
976
            "__next__", [](chunk_ringbuffer &ring)
85✔
977
            {
978
                try
979
                {
980
                    return unwrap_chunk(ring.pop(gil_release_tag()));
158✔
981
                }
982
                catch (ringbuffer_stopped &)
12✔
983
                {
984
                    throw py::stop_iteration();
12✔
985
                }
12✔
986
            });
987

988
    py::class_<chunk_stream_group_config> chunk_stream_group_config_cls(m, "ChunkStreamGroupConfig");
4✔
989
    chunk_stream_group_config_cls
990
        .def(py::init(&data_class_constructor<chunk_stream_group_config>))
4✔
991
        .def_property("max_chunks",
4✔
992
                      SPEAD2_PTMF(chunk_stream_group_config, get_max_chunks),
4✔
993
                      SPEAD2_PTMF(chunk_stream_group_config, set_max_chunks))
4✔
994
        .def_property("eviction_mode",
4✔
995
                      SPEAD2_PTMF(chunk_stream_group_config, get_eviction_mode),
4✔
996
                      SPEAD2_PTMF(chunk_stream_group_config, set_eviction_mode))
4✔
997
        .def_readonly_static("DEFAULT_MAX_CHUNKS", &chunk_stream_group_config::default_max_chunks);
4✔
998
    py::enum_<chunk_stream_group_config::eviction_mode>(chunk_stream_group_config_cls, "EvictionMode")
8✔
999
        .value("LOSSY", chunk_stream_group_config::eviction_mode::LOSSY)
4✔
1000
        .value("LOSSLESS", chunk_stream_group_config::eviction_mode::LOSSLESS);
4✔
1001

1002
    py::class_<chunk_stream_group_member, stream>(m, "ChunkStreamGroupMember");
4✔
1003

1004
    py::class_<chunk_stream_ring_group_wrapper,
4✔
1005
               detail::chunk_ring_pair<chunk_ringbuffer, chunk_ringbuffer>>(m, "ChunkStreamRingGroup")
1006
        .def(py::init<const chunk_stream_group_config &,
4✔
1007
                      std::shared_ptr<chunk_ringbuffer>,
1008
                      std::shared_ptr<chunk_ringbuffer>>(),
4✔
1009
             "config"_a,
4✔
1010
             "data_ringbuffer"_a.none(false),
4✔
1011
             "free_ringbuffer"_a.none(false),
4✔
1012
            // Keep the Python ringbuffer objects alive, not just the C++ side.
1013
            // This allows Python subclasses to be passed then later retrieved
1014
            // from properties.
1015
            py::keep_alive<1, 3>(),
×
1016
            py::keep_alive<1, 4>())
×
1017
        .def_property_readonly(
4✔
1018
            "config", SPEAD2_PTMF(chunk_stream_ring_group_wrapper, get_config))
4✔
1019
        .def(
4✔
1020
            "emplace_back",
1021
            [](chunk_stream_ring_group_wrapper &group,
92✔
1022
               std::shared_ptr<thread_pool_wrapper> thread_pool,
1023
               const stream_config &config,
1024
               const chunk_stream_config &chunk_stream_config) -> chunk_stream_group_member & {
1025
                return group.emplace_back(std::move(thread_pool), config, chunk_stream_config);
92✔
1026
            },
1027
            "thread_pool"_a, "config"_a, "chunk_stream_config"_a,
4✔
1028
            py::return_value_policy::reference_internal
4✔
1029
        )
1030
        .def("__len__", SPEAD2_PTMF(chunk_stream_ring_group_wrapper, size))
4✔
1031
        .def(
×
1032
            "__getitem__",
1033
            [](chunk_stream_ring_group_wrapper &group, std::ptrdiff_t index) -> chunk_stream_group_member & {
108✔
1034
                if (index < 0)
108✔
1035
                    index += group.size();
4✔
1036
                if (index >= 0 && std::size_t(index) < group.size())
108✔
1037
                    return group[index];
86✔
1038
                else
1039
                    throw py::index_error();
22✔
1040
            },
1041
            py::return_value_policy::reference_internal
4✔
1042
        )
1043
        .def(
4✔
1044
            "__getitem__",
1045
            [](chunk_stream_ring_group_wrapper &group, const py::slice &slice) {
4✔
1046
                py::list out;
4✔
1047
                std::size_t start, stop, step, length;
1048
                if (!slice.compute(group.size(), &start, &stop, &step, &length))
4✔
1049
                    throw py::error_already_set();
×
1050
                py::object self = py::cast(group);
4✔
1051
                for (std::size_t i = 0; i < length; i++) {
12✔
1052
                    out.append(py::cast(group[start], py::return_value_policy::reference_internal, self));
8✔
1053
                    start += step;
8✔
1054
                }
1055
                return out;
8✔
1056
            }
4✔
1057
        )
1058
        .def("stop", SPEAD2_PTMF(chunk_stream_ring_group_wrapper, stop));
4✔
1059

1060
    return m;
8✔
1061
}
4✔
1062

1063
} // namespace recv
1064
} // namespace spead2
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc