• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 8735270455

18 Apr 2024 09:05AM UTC coverage: 78.789% (-0.02%) from 78.811%
8735270455

Pull #317

github

web-flow
Merge b4467a061 into 018ec942b
Pull Request #317: Update CI builds on MacOS

1 of 1 new or added line in 1 file covered. (100.0%)

2 existing lines in 2 files now uncovered.

5568 of 7067 relevant lines covered (78.79%)

91388.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.19
/src/py_recv.cpp
1
/* Copyright 2015, 2017, 2020-2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <pybind11/pybind11.h>
22
#include <pybind11/stl.h>
23
#include <pybind11/operators.h>
24
#include <algorithm>
25
#include <stdexcept>
26
#include <type_traits>
27
#include <cstdint>
28
#include <cctype>
29
#include <unistd.h>
30
#include <sys/socket.h>
31
#include <spead2/recv_udp.h>
32
#include <spead2/recv_udp_ibv.h>
33
#include <spead2/recv_udp_pcap.h>
34
#include <spead2/recv_tcp.h>
35
#include <spead2/recv_mem.h>
36
#include <spead2/recv_inproc.h>
37
#include <spead2/recv_stream.h>
38
#include <spead2/recv_ring_stream.h>
39
#include <spead2/recv_chunk_stream.h>
40
#include <spead2/recv_chunk_stream_group.h>
41
#include <spead2/recv_live_heap.h>
42
#include <spead2/recv_heap.h>
43
#include <spead2/common_ringbuffer.h>
44
#include <spead2/py_common.h>
45

46
namespace py = pybind11;
47

48
namespace spead2
49
{
50
namespace recv
51
{
52

53
/* True if both arguments are plain function pointers and point to the same function */
54
template<typename R, typename... Args>
55
static bool equal_functions(const std::function<R(Args...)> &a,
264✔
56
                            const std::function<R(Args...)> &b)
57
{
58
    using ptr = R (*)(Args...);
59
    const ptr *x = a.template target<ptr>();
264✔
60
    const ptr *y = b.template target<ptr>();
264✔
61
    return x && y && *x == *y;
264✔
62
}
63

64
/**
65
 * Wraps @ref item to provide safe memory management. The item references
66
 * memory inside the heap, so it needs to hold a reference to that
67
 * heap, as do any memoryviews created on the value.
68
 */
69
class item_wrapper : public item
70
{
71
private:
72
    /// Python object containing a @ref heap
73
    py::object owning_heap;
74

75
public:
76
    item_wrapper() = default;
77
    item_wrapper(const item &it, py::object owning_heap)
1,210✔
78
        : item(it), owning_heap(std::move(owning_heap)) {}
1,210✔
79

80
    /**
81
     * Obtain the raw value using Python buffer protocol.
82
     */
83
    py::buffer_info get_value() const
1,204✔
84
    {
85
        return py::buffer_info(
86
            reinterpret_cast<void *>(ptr),
1,204✔
87
            1,      // size of individual elements
88
            py::format_descriptor<std::uint8_t>::format(),
1,204✔
89
            length);
2,408✔
90
    }
91
};
92

93
/**
94
 * Extends mem_reader to obtain data using the Python buffer protocol.
95
 */
96
class buffer_reader : public mem_reader
97
{
98
private:
99
    py::buffer_info view;
100
public:
101
    explicit buffer_reader(stream &s, py::buffer_info view)
56✔
102
        : mem_reader(s, reinterpret_cast<const std::uint8_t *>(view.ptr), view.itemsize * view.size),
56✔
103
        view(std::move(view))
56✔
104
    {
105
    }
56✔
106
};
107

108
#if SPEAD2_USE_IBV
109
/* Managing the endpoints and interface address requires some sleight of
110
 * hand. We store a separate copy in the wrapper in a Python-centric format.
111
 * When constructing the reader, we make a copy with the C++ view.
112
 */
113
class udp_ibv_config_wrapper : public udp_ibv_config
114
{
115
public:
116
    std::vector<std::pair<std::string, std::uint16_t>> py_endpoints;
117
    std::string py_interface_address;
118
};
119
#endif // SPEAD2_USE_IBV
120

121
static boost::asio::ip::address make_address(stream &s, const std::string &hostname)
295✔
122
{
123
    return make_address_no_release(s.get_io_service(), hostname,
124
                                   boost::asio::ip::udp::resolver::query::passive);
295✔
125
}
126

127
template<typename Protocol>
128
static typename Protocol::endpoint make_endpoint(
250✔
129
    stream &s, const std::string &hostname, std::uint16_t port)
130
{
131
    return typename Protocol::endpoint(make_address(s, hostname), port);
250✔
132
}
133

134
static void add_buffer_reader(stream &s, py::buffer buffer)
57✔
135
{
136
    py::buffer_info info = request_buffer_info(buffer, PyBUF_C_CONTIGUOUS);
57✔
137
    py::gil_scoped_release gil;
57✔
138
    s.emplace_reader<buffer_reader>(std::move(info));
57✔
139
}
58✔
140

141
static void add_udp_reader(
125✔
142
    stream &s,
143
    std::uint16_t port,
144
    std::size_t max_size,
145
    std::size_t buffer_size,
146
    const std::string &bind_hostname)
147
{
148
    py::gil_scoped_release gil;
125✔
149
    auto endpoint = make_endpoint<boost::asio::ip::udp>(s, bind_hostname, port);
125✔
150
    s.emplace_reader<udp_reader>(endpoint, max_size, buffer_size);
125✔
151
}
125✔
152

153
static void add_udp_reader_socket(
82✔
154
    stream &s,
155
    const socket_wrapper<boost::asio::ip::udp::socket> &socket,
156
    std::size_t max_size = udp_reader::default_max_size)
157
{
158
    auto asio_socket = socket.copy(s.get_io_service());
82✔
159
    py::gil_scoped_release gil;
82✔
160
    s.emplace_reader<udp_reader>(std::move(asio_socket), max_size);
82✔
161
}
82✔
162

163
static void add_udp_reader_bind_v4(
41✔
164
    stream &s,
165
    const std::string &address,
166
    std::uint16_t port,
167
    std::size_t max_size,
168
    std::size_t buffer_size,
169
    const std::string &interface_address)
170
{
171
    py::gil_scoped_release gil;
41✔
172
    auto endpoint = make_endpoint<boost::asio::ip::udp>(s, address, port);
41✔
173
    s.emplace_reader<udp_reader>(endpoint, max_size, buffer_size, make_address(s, interface_address));
41✔
174
}
41✔
175

176
static void add_udp_reader_bind_v6(
41✔
177
    stream &s,
178
    const std::string &address,
179
    std::uint16_t port,
180
    std::size_t max_size,
181
    std::size_t buffer_size,
182
    unsigned int interface_index)
183
{
184
    py::gil_scoped_release gil;
41✔
185
    auto endpoint = make_endpoint<boost::asio::ip::udp>(s, address, port);
41✔
186
    s.emplace_reader<udp_reader>(endpoint, max_size, buffer_size, interface_index);
41✔
187
}
41✔
188

189
static void add_tcp_reader(
39✔
190
    stream &s,
191
    std::uint16_t port,
192
    std::size_t max_size,
193
    std::size_t buffer_size,
194
    const std::string &bind_hostname)
195
{
196
    py::gil_scoped_release gil;
39✔
197
    auto endpoint = make_endpoint<boost::asio::ip::tcp>(s, bind_hostname, port);
39✔
198
    s.emplace_reader<tcp_reader>(endpoint, max_size, buffer_size);
39✔
199
}
39✔
200

201
static void add_tcp_reader_socket(
31✔
202
    stream &s,
203
    const socket_wrapper<boost::asio::ip::tcp::acceptor> &acceptor,
204
    std::size_t max_size)
205
{
206
    auto asio_socket = acceptor.copy(s.get_io_service());
31✔
207
    py::gil_scoped_release gil;
31✔
208
    s.emplace_reader<tcp_reader>(std::move(asio_socket), max_size);
31✔
209
}
31✔
210

211
#if SPEAD2_USE_IBV
212
static void add_udp_ibv_reader(stream &s, const udp_ibv_config_wrapper &config_wrapper)
5✔
213
{
214
    py::gil_scoped_release gil;
5✔
215
    udp_ibv_config config = config_wrapper;
5✔
216
    for (const auto &[host, port] : config_wrapper.py_endpoints)
8✔
217
        config.add_endpoint(make_endpoint<boost::asio::ip::udp>(s, host, port));
4✔
218
    config.set_interface_address(
3✔
219
        make_address(s, config_wrapper.py_interface_address));
5✔
220
    s.emplace_reader<udp_ibv_reader>(config);
2✔
221
}
10✔
222
#endif  // SPEAD2_USE_IBV
223

224
#if SPEAD2_USE_PCAP
225
static void add_udp_pcap_file_reader(stream &s, const std::string &filename, const std::string &filter)
×
226
{
227
    py::gil_scoped_release gil;
×
228
    s.emplace_reader<udp_pcap_file_reader>(filename, filter);
×
229
}
×
230
#endif
231

232
static void add_inproc_reader(stream &s, std::shared_ptr<inproc_queue> queue)
149✔
233
{
234
    py::gil_scoped_release gil;
149✔
235
    s.emplace_reader<inproc_reader>(queue);
149✔
236
}
149✔
237

238
class ring_stream_config_wrapper : public ring_stream_config
239
{
240
private:
241
    bool incomplete_keep_payload_ranges = false;
242

243
public:
244
    ring_stream_config_wrapper() = default;
54✔
245

246
    ring_stream_config_wrapper(const ring_stream_config &base) :
×
247
        ring_stream_config(base)
×
248
    {
249
    }
×
250

251
    ring_stream_config_wrapper &set_incomplete_keep_payload_ranges(bool keep)
3✔
252
    {
253
        incomplete_keep_payload_ranges = keep;
3✔
254
        return *this;
3✔
255
    }
256

257
    bool get_incomplete_keep_payload_ranges() const
522✔
258
    {
259
        return incomplete_keep_payload_ranges;
522✔
260
    }
261
};
262

263
/**
264
 * Stream that handles the magic necessary to reflect heaps into
265
 * Python space and capture the reference to it.
266
 *
267
 * The GIL needs to be handled carefully. Any operation run by the thread pool
268
 * might need to take the GIL to do logging. Thus, any operation that blocks
269
 * on completion of code scheduled through the thread pool must drop the GIL
270
 * first.
271
 */
272
class ring_stream_wrapper : public ring_stream<ringbuffer<live_heap, semaphore_fd, semaphore> >
273
{
274
private:
275
    bool incomplete_keep_payload_ranges;
276
    exit_stopper stopper{[this] { stop(); }};
1✔
277

278
    py::object to_object(live_heap &&h)
500✔
279
    {
280
        if (h.is_contiguous())
500✔
281
            return py::cast(heap(std::move(h)), py::return_value_policy::move);
996✔
282
        else
283
            return py::cast(incomplete_heap(std::move(h), false, incomplete_keep_payload_ranges),
4✔
284
                            py::return_value_policy::move);
4✔
285
    }
286

287
public:
288
    ring_stream_wrapper(
520✔
289
        io_service_ref io_service,
290
        const stream_config &config = stream_config(),
291
        const ring_stream_config_wrapper &ring_config = ring_stream_config_wrapper())
292
        : ring_stream<ringbuffer<live_heap, semaphore_fd, semaphore>>(
520✔
293
            std::move(io_service), config, ring_config),
520✔
294
        incomplete_keep_payload_ranges(ring_config.get_incomplete_keep_payload_ranges())
520✔
295
    {}
520✔
296

297
    py::object next()
991✔
298
    {
299
        try
300
        {
301
            return get();
1,489✔
302
        }
303
        catch (ringbuffer_stopped &e)
493✔
304
        {
305
            throw py::stop_iteration();
493✔
306
        }
493✔
307
    }
308

309
    py::object get()
991✔
310
    {
311
        return to_object(ring_stream::pop_live(gil_release_tag()));
1,489✔
312
    }
313

314
    py::object get_nowait()
4✔
315
    {
316
        return to_object(try_pop_live());
6✔
317
    }
318

319
    int get_fd() const
150✔
320
    {
321
        return get_ringbuffer().get_data_sem().get_fd();
150✔
322
    }
323

324
    ring_stream_config_wrapper get_ring_config() const
×
325
    {
326
        ring_stream_config_wrapper ring_config(
327
            ring_stream<ringbuffer<live_heap, semaphore_fd, semaphore> >::get_ring_config());
×
328
        ring_config.set_incomplete_keep_payload_ranges(incomplete_keep_payload_ranges);
×
329
        return ring_config;
×
330
    }
331

332
    virtual void stop() override
522✔
333
    {
334
        stopper.reset();
522✔
335
        py::gil_scoped_release gil;
522✔
336
        ring_stream::stop();
522✔
337
    }
522✔
338

339
    ~ring_stream_wrapper()
1,040✔
340
    {
520✔
341
        stop();
520✔
342
    }
1,040✔
343
};
344

345
/**
346
 * Package a chunk with a reference to the original Python object.
347
 * This is used
348
 * only for chunks in the ringbuffer, not those owned by Python.
349
 */
350
class chunk_wrapper : public chunk
351
{
352
public:
353
    py::object obj;
354
};
355

356
/**
357
 * Get the original Python object from a wrapped chunk, and
358
 * restore its pointers.
359
 */
360
static py::object unwrap_chunk(std::unique_ptr<chunk> &&c)
321✔
361
{
362
    chunk_wrapper &cw = dynamic_cast<chunk_wrapper &>(*c);
321✔
363
    chunk &orig = cw.obj.cast<chunk &>();
321✔
364
    py::object ret = std::move(cw.obj);
321✔
365
    orig = std::move(*c);
321✔
366
    return ret;
321✔
UNCOV
367
}
×
368

369
/**
370
 * Wrap up a Python chunk into an object that can traverse the ringbuffer.
371
 * Python doesn't allow ownership to be given away, so we have to create a
372
 * new C++ object which refers back to the original Python object to keep
373
 * it alive.
374
 */
375
static std::unique_ptr<chunk_wrapper> wrap_chunk(chunk &c)
430✔
376
{
377
    if (!c.data)
430✔
378
        throw std::invalid_argument("data buffer is not set");
1✔
379
    if (!c.present)
429✔
380
        throw std::invalid_argument("present buffer is not set");
1✔
381
    auto cw = std::make_unique<chunk_wrapper>();
428✔
382
    static_cast<chunk &>(*cw) = std::move(c);
428✔
383
    cw->obj = py::cast(c);
428✔
384
    return cw;
428✔
385
}
×
386

387
/**
388
 * Push a chunk onto a ringbuffer. The specific operation is described by
389
 * @a func; this function takes care of wrapping in @ref chunk_wrapper.
390
 */
391
template<typename T>
392
static void push_chunk(T func, chunk &c)
430✔
393
{
394
    /* Note: the type of 'wrapper' must exactly match what the ringbuffer
395
     * expects, otherwise it constructs a new, temporary unique_ptr by
396
     * moving from 'wrapper', and we lose ownership in the failure path.
397
     */
398
    std::unique_ptr<chunk> wrapper = wrap_chunk(c);
430✔
399
    try
400
    {
401
        func(std::move(wrapper));
428✔
402
    }
403
    catch (std::exception &)
10✔
404
    {
405
        // Undo the move that happened as part of wrapping
406
        if (wrapper)
5✔
407
            c = std::move(*wrapper);
5✔
408
        throw;
5✔
409
    }
410
}
428✔
411

412
typedef ringbuffer<std::unique_ptr<chunk>, semaphore_fd, semaphore_fd> chunk_ringbuffer;
413

414
/* Note: ring_stream_wrapper drops the GIL while stopping. We
415
 * can't do that here because stop() can free chunks that were
416
 * in flight, which involves interaction with the Python API.
417
 * I think the only reason ring_stream_wrapper drops the GIL is
418
 * that logging used to directly acquire the GIL, and so if stop()
419
 * did any logging it would deadlock. Now that logging is pushed
420
 * off to a separate thread that should no longer be an issue.
421
 */
422
#define EXIT_STOPPER_WRAPPER(cls, base)                   \
423
    class cls : public base                               \
424
    {                                                     \
425
    private:                                              \
426
        exit_stopper stopper{[this] { stop(); }};         \
427
    public:                                               \
428
        using base::base;                                 \
429
        virtual void stop() override                      \
430
        {                                                 \
431
            stopper.reset();                              \
432
            base::stop();                                 \
433
        }                                                 \
434
    }
435

436
// These aliases are needed because a type passed to a macro cannot contain a comma
437
using chunk_ring_stream_orig = chunk_ring_stream<chunk_ringbuffer, chunk_ringbuffer>;
438
using chunk_stream_ring_group_orig = chunk_stream_ring_group<chunk_ringbuffer, chunk_ringbuffer>;
439

440
EXIT_STOPPER_WRAPPER(chunk_ring_stream_wrapper, chunk_ring_stream_orig);
11✔
441
EXIT_STOPPER_WRAPPER(chunk_stream_ring_group_wrapper, chunk_stream_ring_group_orig);
17✔
442
EXIT_STOPPER_WRAPPER(chunk_stream_group_member_wrapper, chunk_stream_group_member);
4✔
443

444
#undef EXIT_STOPPER_WRAPPER
445

446
/// Register the receiver module with Python
447
py::module register_module(py::module &parent)
5✔
448
{
449
    using namespace pybind11::literals;
450

451
    // Create the module
452
    py::module m = parent.def_submodule("recv");
5✔
453

454
    py::class_<heap_base>(m, "HeapBase")
5✔
455
        .def_property_readonly("cnt", &heap_base::get_cnt)
5✔
456
        .def_property_readonly("flavour", &heap_base::get_flavour)
5✔
457
        .def("get_items", [](py::object &self) -> py::list
5✔
458
        {
459
            const heap_base &h = self.cast<const heap_base &>();
471✔
460
            std::vector<item> base = h.get_items();
471✔
461
            py::list out;
471✔
462
            for (const item &it : base)
2,882✔
463
            {
464
                // Filter out descriptors here. The base class can't do so, because
465
                // the descriptors are retrieved from the items.
466
                if (it.id != DESCRIPTOR_ID)
2,411✔
467
                    out.append(item_wrapper(it, self));
1,210✔
468
            }
469
            return out;
942✔
470
        })
471✔
471
        .def("is_start_of_stream", &heap_base::is_start_of_stream)
5✔
472
        .def("is_end_of_stream", &heap_base::is_end_of_stream);
5✔
473
    py::class_<heap, heap_base>(m, "Heap")
5✔
474
        .def("get_descriptors", &heap::get_descriptors);
5✔
475
    py::class_<incomplete_heap, heap_base>(m, "IncompleteHeap")
5✔
476
        .def_property_readonly("heap_length", &incomplete_heap::get_heap_length)
5✔
477
        .def_property_readonly("received_length", &incomplete_heap::get_received_length)
5✔
478
        .def_property_readonly("payload_ranges", &incomplete_heap::get_payload_ranges);
5✔
479
    py::class_<item_wrapper>(m, "RawItem", py::buffer_protocol())
5✔
480
        .def_readonly("id", &item_wrapper::id)
5✔
481
        .def_readonly("is_immediate", &item_wrapper::is_immediate)
5✔
482
        .def_readonly("immediate_value", &item_wrapper::immediate_value)
5✔
483
        .def_buffer([](item_wrapper &item) { return item.get_value(); });
1,209✔
484

485
    py::class_<stream_stat_config> stream_stat_config_cls(m, "StreamStatConfig");
5✔
486
    /* We have to register the embedded enum type before we can use it as a
487
     * default value for the stream_stat constructor/
488
     */
489
    py::enum_<stream_stat_config::mode>(stream_stat_config_cls, "Mode")
10✔
490
        .value("COUNTER", stream_stat_config::mode::COUNTER)
5✔
491
        .value("MAXIMUM", stream_stat_config::mode::MAXIMUM);
5✔
492
    stream_stat_config_cls
493
        .def(
5✔
494
            py::init<std::string, stream_stat_config::mode>(),
5✔
495
            "name"_a, "mode"_a = stream_stat_config::mode::COUNTER)
5✔
496
        .def_property_readonly("name", &stream_stat_config::get_name)
5✔
497
        .def_property_readonly("mode", &stream_stat_config::get_mode)
5✔
498
        .def("combine", &stream_stat_config::combine)
5✔
499
        .def(py::self == py::self)
5✔
500
        .def(py::self != py::self);
5✔
501
    py::class_<stream_stats> stream_stats_cls(m, "StreamStats");
5✔
502
    stream_stats_cls
503
        .def("__getitem__", [](const stream_stats &self, std::size_t index)
5✔
504
        {
505
            if (index < self.size())
5✔
506
                return self[index];
4✔
507
            else
508
                throw py::index_error();
1✔
509
        })
510
        .def("__getitem__", [](const stream_stats &self, const std::string &name)
5✔
511
        {
512
            auto pos = self.find(name);
57✔
513
            if (pos == self.end())
57✔
514
                throw py::key_error(name);
1✔
515
            return pos->second;
56✔
516
        })
517
        .def("__setitem__", [](stream_stats &self, std::size_t index, std::uint64_t value)
5✔
518
        {
519
            if (index < self.size())
×
520
                self[index] = value;
×
521
            else
522
                throw py::index_error();
×
523
        })
×
524
        .def("__setitem__", [](stream_stats &self, const std::string &name, std::uint64_t value)
5✔
525
        {
526
            auto pos = self.find(name);
28✔
527
            if (pos == self.end())
28✔
528
                throw py::key_error(name);
×
529
            pos->second = value;
28✔
530
        })
28✔
531
        .def("__contains__", [](const stream_stats &self, const std::string &name)
5✔
532
        {
533
            return self.find(name) != self.end();
3✔
534
        })
535
        .def("get", [](const stream_stats &self, const std::string &name, py::object &default_)
5✔
536
        {
537
            auto pos = self.find(name);
×
538
            return pos != self.end() ? py::int_(pos->second) : default_;
×
539
        }, py::arg(), py::arg() = py::none())
10✔
540
        /* TODO: keys, values and items should ideally return view that
541
         * simulate Python's dictionary views (py::bind_map does this, but it
542
         * can't be used because it expects the map to implement erase).
543
         */
544
        .def(
5✔
545
            "items",
546
            [](const stream_stats &self) { return py::make_iterator(self.begin(), self.end()); },
1✔
547
            py::keep_alive<0, 1>()  // keep the stats alive while it is iterated
×
548
        )
549
        .def(
5✔
550
            "__iter__",
551
            [](const stream_stats &self) { return py::make_key_iterator(self.begin(), self.end()); },
2✔
552
            py::keep_alive<0, 1>()  // keep the stats alive while it is iterated
×
553
        )
554
        .def(
5✔
555
            "keys",
556
            [](const stream_stats &self) { return py::make_key_iterator(self.begin(), self.end()); },
1✔
557
            py::keep_alive<0, 1>()  // keep the stats alive while it is iterated
×
558
        )
559
        .def(
5✔
560
            "values",
561
            [](const stream_stats &self) { return py::make_value_iterator(self.begin(), self.end()); },
1✔
562
            py::keep_alive<0, 1>()  // keep the stats alive while it is iterated
×
563
        )
564
        .def("__len__", &stream_stats::size)
5✔
565
        .def_property_readonly("config", &stream_stats::get_config)
5✔
566
        .def(py::self + py::self)
5✔
567
        .def(py::self += py::self);
5✔
568

569
    py::module stream_stat_indices_module = m.def_submodule("stream_stat_indices");
5✔
570
    /* The macro registers a property on stream_stats to access the built-in stats
571
     * by name, and at the same time populates the index constant in submodule
572
     * stream_stat_indices (upper-casing it).
573
     */
574
#define STREAM_STATS_PROPERTY(field) \
575
    do { \
576
        stream_stats_cls.def_property( \
577
            #field, \
578
            [](const stream_stats &self) { return self[stream_stat_indices::field]; }, \
579
            [](stream_stats &self, std::uint64_t value) { self[stream_stat_indices::field] = value; }); \
580
        std::string upper = #field; \
581
        std::transform(upper.begin(), upper.end(), upper.begin(), ::toupper); \
582
        stream_stat_indices_module.attr(upper.c_str()) = stream_stat_indices::field; \
583
    } while (false)
584

585
    STREAM_STATS_PROPERTY(heaps);
22✔
586
    STREAM_STATS_PROPERTY(incomplete_heaps_evicted);
22✔
587
    STREAM_STATS_PROPERTY(incomplete_heaps_flushed);
22✔
588
    STREAM_STATS_PROPERTY(packets);
22✔
589
    STREAM_STATS_PROPERTY(batches);
20✔
590
    STREAM_STATS_PROPERTY(worker_blocked);
24✔
591
    STREAM_STATS_PROPERTY(max_batch);
20✔
592
    STREAM_STATS_PROPERTY(single_packet_heaps);
20✔
593
    STREAM_STATS_PROPERTY(search_dist);
20✔
594
#undef STREAM_STATS_PROPERTY
595

596
    py::class_<stream_config>(m, "StreamConfig")
5✔
597
        .def(py::init(&data_class_constructor<stream_config>))
5✔
598
        .def_property("max_heaps",
10✔
599
                      &stream_config::get_max_heaps,
×
600
                      &stream_config::set_max_heaps)
5✔
601
        .def_property("substreams",
10✔
602
                      &stream_config::get_substreams,
×
603
                      &stream_config::set_substreams)
5✔
604
        .def_property("bug_compat",
10✔
605
                      &stream_config::get_bug_compat,
×
606
                      &stream_config::set_bug_compat)
5✔
607
        .def_property("memcpy",
5✔
608
             [](const stream_config &self) {
262✔
609
                 stream_config cmp;
262✔
610
                 memcpy_function_id ids[] = {MEMCPY_STD, MEMCPY_NONTEMPORAL};
262✔
611
                 for (memcpy_function_id id : ids)
264✔
612
                 {
613
                     cmp.set_memcpy(id);
264✔
614
                     if (equal_functions(self.get_memcpy(), cmp.get_memcpy()))
264✔
615
                         return int(id);
262✔
616
                 }
617
                 throw std::invalid_argument("memcpy function is not one of the standard ones");
×
618
             },
262✔
619
             [](stream_config &self, int id) { self.set_memcpy(memcpy_function_id(id)); })
260✔
620
        .def_property("memory_allocator",
10✔
621
                      &stream_config::get_memory_allocator,
5✔
622
                      SPEAD2_PTMF_VOID(stream_config, set_memory_allocator))
5✔
623
        .def_property("stop_on_stop_item",
10✔
624
                      &stream_config::get_stop_on_stop_item,
5✔
625
                      SPEAD2_PTMF_VOID(stream_config, set_stop_on_stop_item))
5✔
626
        .def_property("allow_unsized_heaps",
10✔
627
                      &stream_config::get_allow_unsized_heaps,
5✔
628
                      SPEAD2_PTMF_VOID(stream_config, set_allow_unsized_heaps))
5✔
629
        .def_property("allow_out_of_order",
10✔
630
                      &stream_config::get_allow_out_of_order,
5✔
631
                      SPEAD2_PTMF_VOID(stream_config, set_allow_out_of_order))
5✔
632
        .def_property("stream_id",
10✔
633
                      &stream_config::get_stream_id,
×
634
                      &stream_config::set_stream_id)
5✔
635
        .def_property("explicit_start",
10✔
636
                      &stream_config::get_explicit_start,
×
637
                      &stream_config::set_explicit_start)
5✔
638
        .def("add_stat", &stream_config::add_stat,
5✔
639
             "name"_a,
5✔
640
             "mode"_a = stream_stat_config::mode::COUNTER)
10✔
641
        .def_property_readonly("stats", &stream_config::get_stats)
5✔
642
        .def("get_stat_index", &stream_config::get_stat_index,
5✔
643
             "name"_a)
5✔
644
        .def("next_stat_index", &stream_config::next_stat_index)
10✔
645
        .def_readonly_static("DEFAULT_MAX_HEAPS", &stream_config::default_max_heaps);
5✔
646
    py::class_<ring_stream_config_wrapper>(m, "RingStreamConfig")
5✔
647
        .def(py::init(&data_class_constructor<ring_stream_config_wrapper>))
5✔
648
        .def_property("heaps",
10✔
649
                      &ring_stream_config_wrapper::get_heaps,
5✔
650
                      SPEAD2_PTMF_VOID(ring_stream_config_wrapper, set_heaps))
5✔
651
        .def_property("contiguous_only",
10✔
652
                      &ring_stream_config_wrapper::get_contiguous_only,
5✔
653
                      SPEAD2_PTMF_VOID(ring_stream_config_wrapper, set_contiguous_only))
5✔
654
        .def_property("incomplete_keep_payload_ranges",
10✔
655
                      &ring_stream_config_wrapper::get_incomplete_keep_payload_ranges,
5✔
656
                      SPEAD2_PTMF_VOID(ring_stream_config_wrapper, set_incomplete_keep_payload_ranges))
10✔
657
        .def_readonly_static("DEFAULT_HEAPS", &ring_stream_config_wrapper::default_heaps);
5✔
658
#if SPEAD2_USE_IBV
659
    py::class_<udp_ibv_config_wrapper>(m, "UdpIbvConfig")
5✔
660
        .def(py::init(&data_class_constructor<udp_ibv_config_wrapper>))
5✔
661
        .def_readwrite("endpoints", &udp_ibv_config_wrapper::py_endpoints)
5✔
662
        .def_readwrite("interface_address", &udp_ibv_config_wrapper::py_interface_address)
5✔
663
        .def_property("buffer_size",
10✔
664
                      &udp_ibv_config_wrapper::get_buffer_size,
5✔
665
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_buffer_size))
5✔
666
        .def_property("max_size",
10✔
667
                      &udp_ibv_config_wrapper::get_max_size,
5✔
668
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_max_size))
5✔
669
        .def_property("comp_vector",
10✔
670
                      &udp_ibv_config_wrapper::get_comp_vector,
5✔
671
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_comp_vector))
5✔
672
        .def_property("max_poll",
10✔
673
                      &udp_ibv_config_wrapper::get_max_poll,
5✔
674
                      SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_max_poll))
10✔
675
        .def_readonly_static("DEFAULT_BUFFER_SIZE", &udp_ibv_config_wrapper::default_buffer_size)
5✔
676
        .def_readonly_static("DEFAULT_MAX_SIZE", &udp_ibv_config_wrapper::default_max_size)
5✔
677
        .def_readonly_static("DEFAULT_MAX_POLL", &udp_ibv_config_wrapper::default_max_poll);
5✔
678
#endif // SPEAD2_USE_IBV
679
    py::class_<stream>(m, "_Stream")
5✔
680
        // SPEAD2_PTMF doesn't work for get_stats because it's defined in stream_base, which is a protected ancestor
681
        .def_property_readonly("stats", [](const stream &self) { return self.get_stats(); })
28✔
682
        .def_property_readonly("config",
5✔
683
                               [](const stream &self) { return self.get_config(); })
×
684
        .def("add_buffer_reader", add_buffer_reader, "buffer"_a)
5✔
685
        .def("add_udp_reader", add_udp_reader,
5✔
686
              "port"_a,
5✔
687
              "max_size"_a = udp_reader::default_max_size,
5✔
688
              "buffer_size"_a = udp_reader::default_buffer_size,
10✔
689
              "bind_hostname"_a = std::string())
10✔
690
        .def("add_udp_reader", add_udp_reader_socket,
5✔
691
              "socket"_a,
5✔
692
              "max_size"_a = udp_reader::default_max_size)
10✔
693
        .def("add_udp_reader", add_udp_reader_bind_v4,
5✔
694
              "multicast_group"_a,
5✔
695
              "port"_a,
5✔
696
              "max_size"_a = udp_reader::default_max_size,
5✔
697
              "buffer_size"_a = udp_reader::default_buffer_size,
10✔
698
              "interface_address"_a = "0.0.0.0")
10✔
699
        .def("add_udp_reader", add_udp_reader_bind_v6,
5✔
700
              "multicast_group"_a,
5✔
701
              "port"_a,
5✔
702
              "max_size"_a = udp_reader::default_max_size,
5✔
703
              "buffer_size"_a = udp_reader::default_buffer_size,
10✔
704
              "interface_index"_a = (unsigned int) 0)
10✔
705
        .def("add_tcp_reader", add_tcp_reader,
5✔
706
             "port"_a,
5✔
707
             "max_size"_a = tcp_reader::default_max_size,
5✔
708
             "buffer_size"_a = tcp_reader::default_buffer_size,
10✔
709
             "bind_hostname"_a = std::string())
10✔
710
        .def("add_tcp_reader", add_tcp_reader_socket,
5✔
711
             "acceptor"_a,
5✔
712
             "max_size"_a = tcp_reader::default_max_size)
10✔
713
#if SPEAD2_USE_IBV
714
        .def("add_udp_ibv_reader", add_udp_ibv_reader,
5✔
715
             "config"_a)
5✔
716
#endif
717
#if SPEAD2_USE_PCAP
718
        .def("add_udp_pcap_file_reader", add_udp_pcap_file_reader,
5✔
719
             "filename"_a, "filter"_a = "")
10✔
720
#endif
721
        .def("add_inproc_reader", add_inproc_reader,
5✔
722
             "queue"_a)
5✔
723
        .def("start", &stream::start)
5✔
724
        .def("stop", &stream::stop)
10✔
725
        .def_readonly_static("DEFAULT_UDP_MAX_SIZE", &udp_reader::default_max_size)
5✔
726
        .def_readonly_static("DEFAULT_UDP_BUFFER_SIZE", &udp_reader::default_buffer_size)
5✔
727
        .def_readonly_static("DEFAULT_TCP_MAX_SIZE", &tcp_reader::default_max_size)
5✔
728
        .def_readonly_static("DEFAULT_TCP_BUFFER_SIZE", &tcp_reader::default_buffer_size);
5✔
729
    py::class_<ring_stream_wrapper, stream> stream_class(m, "Stream");
5✔
730
    stream_class
731
        .def(py::init<std::shared_ptr<thread_pool_wrapper>,
5✔
732
                      const stream_config &,
733
                      const ring_stream_config_wrapper &>(),
5✔
734
             "thread_pool"_a.none(false), "config"_a = stream_config(),
10✔
735
             "ring_config"_a = ring_stream_config_wrapper())
10✔
736
        .def("__iter__", [](py::object self) { return self; })
498✔
737
        .def("__next__", &ring_stream_wrapper::next)
5✔
738
        .def("get", &ring_stream_wrapper::get)
5✔
739
        .def("get_nowait", &ring_stream_wrapper::get_nowait)
5✔
740
        .def_property_readonly("fd", &ring_stream_wrapper::get_fd)
5✔
741
        .def_property_readonly("ringbuffer", &ring_stream_wrapper::get_ringbuffer)
5✔
742
        .def_property_readonly("ring_config", &ring_stream_wrapper::get_ring_config);
5✔
743
    using Ringbuffer = ringbuffer<live_heap, semaphore_fd, semaphore>;
744
    py::class_<Ringbuffer>(stream_class, "Ringbuffer")
5✔
745
        .def("size", &Ringbuffer::size)
5✔
746
        .def("capacity", &Ringbuffer::capacity);
5✔
747
    py::class_<chunk_stream_config>(m, "ChunkStreamConfig")
5✔
748
        .def(py::init(&data_class_constructor<chunk_stream_config>))
5✔
749
        .def_property("items",
10✔
750
                      &chunk_stream_config::get_items,
×
751
                      &chunk_stream_config::set_items)
5✔
752
        .def_property("max_chunks",
10✔
753
                      &chunk_stream_config::get_max_chunks,
×
754
                      &chunk_stream_config::set_max_chunks)
5✔
755
        .def_property(
5✔
756
            "place",
757
            [](const chunk_stream_config &config) {
78✔
758
                return callback_to_python(config.get_place());
78✔
759
            },
760
            [](chunk_stream_config &config, py::object obj) {
74✔
761
                config.set_place(callback_from_python<chunk_place_function>(
74✔
762
                    obj,
763
                    "void (void *, size_t)",
764
                    "void (void *, size_t, void *)"
765
                ));
766
            })
70✔
767
        .def(
10✔
768
            "enable_packet_presence", &chunk_stream_config::enable_packet_presence,
5✔
769
            "payload_size"_a)
5✔
770
        .def("disable_packet_presence", &chunk_stream_config::disable_packet_presence)
5✔
771
        .def_property_readonly("packet_presence_payload_size",
10✔
772
                               &chunk_stream_config::get_packet_presence_payload_size)
5✔
773
        .def_property("max_heap_extra",
10✔
774
                      &chunk_stream_config::get_max_heap_extra,
×
775
                      &chunk_stream_config::set_max_heap_extra)
10✔
776
        .def_readonly_static("DEFAULT_MAX_CHUNKS", &chunk_stream_config::default_max_chunks);
5✔
777
    py::class_<chunk>(m, "Chunk")
5✔
778
        .def(py::init(&data_class_constructor<chunk>))
5✔
779
        .def_readwrite("chunk_id", &chunk::chunk_id)
5✔
780
        .def_readwrite("stream_id", &chunk::stream_id)
5✔
781
        // Can't use def_readwrite for present, data, extra because they're
782
        // non-copyable types
783
        .def_property(
5✔
784
            "present",
785
            [](const chunk &c) -> const memory_allocator::pointer & { return c.present; },
770✔
786
            [](chunk &c, memory_allocator::pointer &&value)
135✔
787
            {
788
                if (value)
135✔
789
                {
790
                    auto *alloc = get_buffer_allocation(value);
134✔
791
                    assert(alloc != nullptr);
134✔
792
                    c.present_size = alloc->buffer_info.size * alloc->buffer_info.itemsize;
134✔
793
                }
794
                else
795
                    c.present_size = 0;
1✔
796
                c.present = std::move(value);
135✔
797
            })
135✔
798
        .def_property(
5✔
799
            "data",
800
            [](const chunk &c) -> const memory_allocator::pointer & { return c.data; },
763✔
801
            [](chunk &c, memory_allocator::pointer &&value) { c.data = std::move(value); })
135✔
802
        .def_property(
5✔
803
            "extra",
804
            [](const chunk &c) -> const memory_allocator::pointer & { return c.extra; },
147✔
805
            [](chunk &c, memory_allocator::pointer &&value) { c.extra = std::move(value); });
44✔
806
    // Don't allow ChunkRingPair to be constructed from Python. It exists
807
    // purely to be a base class.
808
    using chunk_ring_pair = detail::chunk_ring_pair<chunk_ringbuffer, chunk_ringbuffer>;
809
    py::class_<chunk_ring_pair>(m, "ChunkRingPair")
5✔
810
        .def(
5✔
811
            "add_free_chunk",
812
            [](chunk_ring_pair &self, chunk &c)
298✔
813
            {
814
                push_chunk(
298✔
815
                    [&self](std::unique_ptr<chunk> &&wrapper)
298✔
816
                    {
817
                        self.add_free_chunk(std::move(wrapper));
298✔
818
                    },
298✔
819
                    c
820
                );
821
            },
298✔
822
            "chunk"_a)
5✔
823
        .def_property_readonly("data_ringbuffer", &chunk_ring_pair::get_data_ringbuffer)
5✔
824
        .def_property_readonly("free_ringbuffer", &chunk_ring_pair::get_free_ringbuffer);
5✔
825

826
    py::class_<chunk_ring_stream_wrapper,
5✔
827
               detail::chunk_ring_pair<chunk_ringbuffer, chunk_ringbuffer>,
828
               stream>(m, "ChunkRingStream")
829
        .def(py::init<std::shared_ptr<thread_pool_wrapper>,
5✔
830
                      const stream_config &,
831
                      const chunk_stream_config &,
832
                      std::shared_ptr<chunk_ringbuffer>,
833
                      std::shared_ptr<chunk_ringbuffer>>(),
5✔
834
             "thread_pool"_a.none(false),
5✔
835
             "config"_a = stream_config(),
10✔
836
             "chunk_stream_config"_a,
5✔
837
             "data_ringbuffer"_a.none(false),
5✔
838
             "free_ringbuffer"_a.none(false),
5✔
839
            // Keep the Python ringbuffer objects alive, not just the C++ side.
840
            // This allows Python subclasses to be passed then later retrieved
841
            // from properties.
842
             py::keep_alive<1, 5>(),
×
843
             py::keep_alive<1, 6>());
5✔
844
    py::class_<chunk_ringbuffer, std::shared_ptr<chunk_ringbuffer>>(m, "ChunkRingbuffer")
5✔
845
        .def(py::init<std::size_t>(), "maxsize"_a)
5✔
846
        .def("qsize", &chunk_ringbuffer::size)
5✔
847
        .def_property_readonly("maxsize", &chunk_ringbuffer::capacity)
5✔
848
        .def_property_readonly(
5✔
849
            "data_fd",
850
            [](const chunk_ringbuffer &ring) { return ring.get_data_sem().get_fd(); })
7✔
851
        .def_property_readonly(
5✔
852
            "space_fd",
853
            [](const chunk_ringbuffer &ring) { return ring.get_space_sem().get_fd(); })
7✔
854
        .def("get", [](chunk_ringbuffer &ring) { return unwrap_chunk(ring.pop(gil_release_tag())); })
489✔
855
        .def("get_nowait", [](chunk_ringbuffer &ring) { return unwrap_chunk(ring.try_pop()); })
40✔
856
        .def(
5✔
857
            "put",
858
            [](chunk_ringbuffer &ring, chunk &c)
110✔
859
            {
860
                push_chunk(
110✔
861
                    [&ring](std::unique_ptr<chunk> &&wrapper)
108✔
862
                    {
863
                        ring.push(std::move(wrapper), gil_release_tag());
108✔
864
                    },
108✔
865
                    c
866
                );
867
            },
108✔
868
            "chunk"_a)
5✔
869
        .def(
5✔
870
            "put_nowait",
871
            [](chunk_ringbuffer &ring, chunk &c)
22✔
872
            {
873
                push_chunk(
22✔
874
                    [&ring](std::unique_ptr<chunk> &&wrapper) { ring.try_push(std::move(wrapper)); },
22✔
875
                    c
876
                );
877
            },
17✔
878
            "chunk"_a)
5✔
879
        .def("empty", [](const chunk_ringbuffer &ring) { return ring.size() == 0; })
9✔
880
        .def("full", [](const chunk_ringbuffer &ring) { return ring.size() == ring.capacity(); })
141✔
881
        .def("stop", &chunk_ringbuffer::stop)
5✔
882
        .def("add_producer", &chunk_ringbuffer::add_producer)
5✔
883
        .def("remove_producer", &chunk_ringbuffer::remove_producer)
5✔
884
        .def("__iter__", [](py::object self) { return self; })
17✔
885
        .def(
5✔
886
            "__next__", [](chunk_ringbuffer &ring)
85✔
887
            {
888
                try
889
                {
890
                    return unwrap_chunk(ring.pop(gil_release_tag()));
158✔
891
                }
892
                catch (ringbuffer_stopped &)
12✔
893
                {
894
                    throw py::stop_iteration();
12✔
895
                }
12✔
896
            });
897

898
    py::class_<chunk_stream_group_config> chunk_stream_group_config_cls(m, "ChunkStreamGroupConfig");
5✔
899
    chunk_stream_group_config_cls
900
        .def(py::init(&data_class_constructor<chunk_stream_group_config>))
5✔
901
        .def_property("max_chunks",
10✔
902
                      &chunk_stream_group_config::get_max_chunks,
×
903
                      &chunk_stream_group_config::set_max_chunks)
5✔
904
        .def_property("eviction_mode",
10✔
905
                      &chunk_stream_group_config::get_eviction_mode,
×
906
                      &chunk_stream_group_config::set_eviction_mode)
5✔
907
        .def_readonly_static("DEFAULT_MAX_CHUNKS", &chunk_stream_group_config::default_max_chunks);
5✔
908
    py::enum_<chunk_stream_group_config::eviction_mode>(chunk_stream_group_config_cls, "EvictionMode")
10✔
909
        .value("LOSSY", chunk_stream_group_config::eviction_mode::LOSSY)
5✔
910
        .value("LOSSLESS", chunk_stream_group_config::eviction_mode::LOSSLESS);
5✔
911

912
    py::class_<chunk_stream_group_member, stream>(m, "ChunkStreamGroupMember");
5✔
913

914
    py::class_<chunk_stream_ring_group_wrapper,
5✔
915
               detail::chunk_ring_pair<chunk_ringbuffer, chunk_ringbuffer>>(m, "ChunkStreamRingGroup")
916
        .def(py::init<const chunk_stream_group_config &,
5✔
917
                      std::shared_ptr<chunk_ringbuffer>,
918
                      std::shared_ptr<chunk_ringbuffer>>(),
5✔
919
             "config"_a,
5✔
920
             "data_ringbuffer"_a.none(false),
5✔
921
             "free_ringbuffer"_a.none(false),
5✔
922
            // Keep the Python ringbuffer objects alive, not just the C++ side.
923
            // This allows Python subclasses to be passed then later retrieved
924
            // from properties.
925
            py::keep_alive<1, 3>(),
×
926
            py::keep_alive<1, 4>())
×
927
        .def_property_readonly(
10✔
928
            "config", &chunk_stream_ring_group_wrapper::get_config)
5✔
929
        .def(
5✔
930
            "emplace_back",
931
            [](chunk_stream_ring_group_wrapper &group,
94✔
932
               std::shared_ptr<thread_pool_wrapper> thread_pool,
933
               const stream_config &config,
934
               const chunk_stream_config &chunk_stream_config) -> chunk_stream_group_member & {
935
                return group.emplace_back<chunk_stream_group_member_wrapper>(std::move(thread_pool), config, chunk_stream_config);
94✔
936
            },
937
            "thread_pool"_a, "config"_a, "chunk_stream_config"_a,
5✔
938
            py::return_value_policy::reference_internal
5✔
939
        )
940
        .def("__len__", &chunk_stream_ring_group_wrapper::size)
5✔
941
        .def(
×
942
            "__getitem__",
943
            [](chunk_stream_ring_group_wrapper &group, std::ptrdiff_t index) -> chunk_stream_group_member & {
113✔
944
                if (index < 0)
113✔
945
                    index += group.size();
4✔
946
                if (index >= 0 && std::size_t(index) < group.size())
113✔
947
                    return group[index];
90✔
948
                else
949
                    throw py::index_error();
23✔
950
            },
951
            py::return_value_policy::reference_internal
5✔
952
        )
953
        .def(
5✔
954
            "__getitem__",
955
            [](chunk_stream_ring_group_wrapper &group, const py::slice &slice) {
4✔
956
                py::list out;
4✔
957
                std::size_t start, stop, step, length;
958
                if (!slice.compute(group.size(), &start, &stop, &step, &length))
4✔
959
                    throw py::error_already_set();
×
960
                py::object self = py::cast(group);
4✔
961
                for (std::size_t i = 0; i < length; i++) {
12✔
962
                    out.append(py::cast(group[start], py::return_value_policy::reference_internal, self));
8✔
963
                    start += step;
8✔
964
                }
965
                return out;
8✔
966
            }
4✔
967
        )
968
        .def("stop", &chunk_stream_ring_group_wrapper::stop);
5✔
969

970
    return m;
10✔
971
}
5✔
972

973
} // namespace recv
974
} // namespace spead2
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc