ska-sa / spead2, build 13648116537 (push, github, web-flow)
04 Mar 2025 07:04AM UTC coverage: 78.758% (-0.1%) from 78.871%
Merge pull request #376 from ska-sa/gha-arm-runners: Use native ARM runners for building aarch64 wheels
5569 of 7071 relevant lines covered (78.76%); 91241.29 hits per line

Source file: /src/py_common.cpp (91.13% covered)

/* Copyright 2015, 2017, 2020, 2023, 2025 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

/**
 * @file
 */

#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/operators.h>
#include <boost/system/system_error.hpp>
#include <memory>
#include <list>
#include <functional>
#include <spead2/py_common.h>
#include <spead2/common_ringbuffer.h>
#include <spead2/common_defines.h>
#include <spead2/common_flavour.h>
#include <spead2/common_logging.h>
#include <spead2/common_memory_pool.h>
#include <spead2/common_thread_pool.h>
#include <spead2/common_inproc.h>
#if SPEAD2_USE_IBV
# include <spead2/common_ibv.h>
#endif
#include "common_unique.h"

namespace py = pybind11;

namespace spead2
{

namespace detail
{

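/* Callbacks registered by exit_stopper instances. run_exit_stoppers() runs
 * them (most recently registered first) when the interpreter exits.
 */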
static std::list<std::function<void()>> stop_entries;
static std::function<void(log_level, const std::string &)> orig_logger;
static std::unique_ptr<log_function_python> our_logger;

static void run_exit_stoppers()
{
    while (!stop_entries.empty())
        stop_entries.front()();
    // Clear up our custom logger
    set_log_function(orig_logger);
    our_logger.reset();
}

} // namespace detail

exit_stopper::exit_stopper(std::function<void()> callback)
    : entry(detail::stop_entries.insert(detail::stop_entries.begin(), std::move(callback)))
{
}

void exit_stopper::reset()
{
    if (entry != detail::stop_entries.end())
    {
        detail::stop_entries.erase(entry);
        entry = detail::stop_entries.end();
    }
}

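// Translate boost_io_error into a Python IOError/OSError carrying the error code and message.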
static void translate_exception_boost_io_error(std::exception_ptr p)
{
    try
    {
        if (p)
            std::rethrow_exception(p);
    }
    catch (const boost_io_error &e)
    {
        py::tuple args = py::make_tuple(e.code().value(), e.what());
        PyErr_SetObject(PyExc_IOError, args.ptr());
    }
}

template class socket_wrapper<boost::asio::ip::udp::socket>;
template class socket_wrapper<boost::asio::ip::tcp::socket>;
template class socket_wrapper<boost::asio::ip::tcp::acceptor>;

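/* Resolve a hostname to an address; an empty hostname yields the default
 * (unspecified) address. The GIL is not released here; callers that may block
 * are expected to release it themselves (see the IbvContext constructor below).
 */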
boost::asio::ip::address make_address_no_release(
    boost::asio::io_context &io_context, const std::string &hostname,
    boost::asio::ip::resolver_query_base::flags flags)
{
    if (hostname == "")
        return boost::asio::ip::address();
    using boost::asio::ip::udp;
    udp::resolver resolver(io_context);
    return resolver.resolve(hostname, "", flags).begin()->endpoint().address();
}

void deprecation_warning(const char *msg)
{
    if (PyErr_WarnEx(PyExc_DeprecationWarning, msg, 1) == -1)
        throw py::error_already_set();
}

thread_pool_wrapper::~thread_pool_wrapper()
{
    stop();
}

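/* Stop the pool's worker threads. The GIL is released before stopping, which
 * avoids a deadlock if a worker thread is still waiting for it.
 */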
void thread_pool_wrapper::stop()
{
    stopper.reset();
    py::gil_scoped_release gil;
    thread_pool::stop();
}
125
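/* Acquire a Py_buffer view of @a buffer (PyBUF_STRIDES | PyBUF_FORMAT plus
 * @a extra_flags) and wrap it in a py::buffer_info, which takes ownership of
 * the view.
 */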
py::buffer_info request_buffer_info(const py::buffer &buffer, int extra_flags)
11,236✔
126
{
127
    auto view = detail::make_unique_for_overwrite<Py_buffer>();
11,236✔
128
    int flags = PyBUF_STRIDES | PyBUF_FORMAT | extra_flags;
11,236✔
129
    if (PyObject_GetBuffer(buffer.ptr(), view.get(), flags) != 0)
11,236✔
130
        throw py::error_already_set();
×
131
    py::buffer_info info(view.get());
11,236✔
132
    view.release();
11,236✔
133
    return info;
22,472✔
134
}
11,236✔
135

136
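// Logger method names, indexed by log_level (must stay in sync with the enum ordering).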
const char *const log_function_python::level_methods[log_function_python::num_levels] =
137
{
138
    "warning",
139
    "info",
140
    "debug"
141
};
142

143
log_function_python::log_function_python(
5✔
144
    pybind11::object logger, std::size_t ring_size) :
5✔
145
        overflowed(false),
5✔
146
        ring(ring_size)
5✔
147
{
148
    for (unsigned int i = 0; i < num_levels; i++)
20✔
149
        log_methods[i] = logger.attr(level_methods[i]);
15✔
150
    thread = std::thread([this] () { run(); });
10✔
151
}
5✔
152

153
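/* Body of the logging worker thread: blocks on the ring buffer and forwards
 * each queued (level, message) pair to the Python logger while holding the GIL.
 */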
void log_function_python::run()
277✔
154
{
155
    try
156
    {
157
        while (true)
158
        {
159
            auto msg = ring.pop();
277✔
160
            py::gil_scoped_acquire gil;
273✔
161
            auto &[level, text] = msg;
273✔
162
            log(level, text);
273✔
163
            /* If there are multiple messages queued, consume them while
164
             * the GIL is held, rather than dropping and regaining the
165
             * GIL; but limit it, so that we don't starve other threads
166
             * of the GIL.
167
             */
168
            try
169
            {
170
                for (int pass = 1; pass < 1024; pass++)
20,246✔
171
                {
172
                    msg = ring.try_pop();
20,246✔
173
                    auto &[level, text] = msg;
19,973✔
174
                    log(level, text);
19,973✔
175
                }
176
            }
177
            catch (ringbuffer_empty &)
273✔
178
            {
179
            }
272✔
180
            if (overflowed.exchange(false))
272✔
181
                log(log_level::warning,
×
182
                    "Log ringbuffer was full - some log messages were dropped");
183
        }
274✔
184
    }
185
    catch (ringbuffer_stopped &)
5✔
186
    {
187
        // Could possibly report the overflowed flag here again - but this may be
188
        // deep into interpreter shutdown and it might not be safe to log.
189
    }
5✔
190
    catch (std::exception &e)
×
191
    {
192
        std::cerr << "Logger thread crashed with exception " << e.what() << '\n';
×
193
    }
×
194
}
5✔
195

196
void log_function_python::log(log_level level, const std::string &msg) const
20,246✔
197
{
198
    try
199
    {
200
        unsigned int level_idx = static_cast<unsigned int>(level);
20,246✔
201
        assert(level_idx < num_levels);
20,246✔
202
        log_methods[level_idx]("%s", msg);
20,246✔
203
    }
204
    catch (py::error_already_set &e)
×
205
    {
206
        // This can happen during interpreter shutdown, because the modules
207
        // needed for the logging have already been unloaded.
208
    }
×
209
}
20,246✔
210

211
void log_function_python::operator()(log_level level, const std::string &msg)
20,246✔
212
{
213
    /* A blocking push can potentially lead to deadlock: the consumer may be
214
     * blocked waiting for the GIL, and possibly we may be holding the GIL.
215
     * If there is so much logging that the consumer can't keep up, we
216
     * probably want to throttle the log messages anyway, so we just set a
217
     * flag.
218
     */
219
    try
220
    {
221
        ring.try_emplace(level, msg);
20,246✔
222
    }
223
    catch (ringbuffer_full &)
×
224
    {
225
        overflowed = true;
×
226
    }
×
227
}
20,246✔
228

229
void log_function_python::stop()
10✔
230
{
231
    stopper.reset();
10✔
232
    {
233
        py::gil_scoped_release gil;
10✔
234
        ring.stop();
10✔
235
        if (thread.joinable())
10✔
236
            thread.join();
5✔
237
    }
10✔
238
    for (unsigned int i = 0; i < num_levels; i++)
40✔
239
        log_methods[i] = py::object();
30✔
240
}
10✔
241

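// Register the exceptions, protocol constants and common classes exposed to Python.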
void register_module(py::module m)
{
    using namespace pybind11::literals;

    py::register_exception<ringbuffer_stopped>(m, "Stopped");
    py::register_exception<ringbuffer_empty>(m, "Empty");
    py::register_exception<ringbuffer_full>(m, "Full");
    py::register_exception_translator(translate_exception_boost_io_error);

#define EXPORT_ENUM(x) (m.attr(#x) = long(x))
    EXPORT_ENUM(BUG_COMPAT_DESCRIPTOR_WIDTHS);
    EXPORT_ENUM(BUG_COMPAT_SHAPE_BIT_1);
    EXPORT_ENUM(BUG_COMPAT_SWAP_ENDIAN);
    EXPORT_ENUM(BUG_COMPAT_PYSPEAD_0_5_2);

    EXPORT_ENUM(NULL_ID);
    EXPORT_ENUM(HEAP_CNT_ID);
    EXPORT_ENUM(HEAP_LENGTH_ID);
    EXPORT_ENUM(PAYLOAD_OFFSET_ID);
    EXPORT_ENUM(PAYLOAD_LENGTH_ID);
    EXPORT_ENUM(DESCRIPTOR_ID);
    EXPORT_ENUM(STREAM_CTRL_ID);

    EXPORT_ENUM(DESCRIPTOR_NAME_ID);
    EXPORT_ENUM(DESCRIPTOR_DESCRIPTION_ID);
    EXPORT_ENUM(DESCRIPTOR_SHAPE_ID);
    EXPORT_ENUM(DESCRIPTOR_FORMAT_ID);
    EXPORT_ENUM(DESCRIPTOR_ID_ID);
    EXPORT_ENUM(DESCRIPTOR_DTYPE_ID);

    EXPORT_ENUM(CTRL_STREAM_START);
    EXPORT_ENUM(CTRL_DESCRIPTOR_REISSUE);
    EXPORT_ENUM(CTRL_STREAM_STOP);
    EXPORT_ENUM(CTRL_DESCRIPTOR_UPDATE);

    EXPORT_ENUM(MEMCPY_STD);
    EXPORT_ENUM(MEMCPY_NONTEMPORAL);
#undef EXPORT_ENUM

    m.def("log_info", [](const std::string &msg) { log_info("%s", msg); },
          "Log a message at INFO level (for testing only)");

    py::class_<flavour>(m, "Flavour")
        .def(py::init<int, int, int, bug_compat_mask>(),
             "version"_a, "item_pointer_bits"_a,
             "heap_address_bits"_a, "bug_compat"_a=0)
        .def(py::init<>())
        .def(py::self == py::self)
        .def(py::self != py::self)
        .def_property_readonly("version", &flavour::get_version)
        .def_property_readonly("item_pointer_bits", &flavour::get_item_pointer_bits)
        .def_property_readonly("heap_address_bits", &flavour::get_heap_address_bits)
        .def_property_readonly("bug_compat", &flavour::get_bug_compat);

    py::class_<memory_allocator, std::shared_ptr<memory_allocator>>(m, "MemoryAllocator")
        .def(py::init<>());

    py::class_<mmap_allocator, memory_allocator, std::shared_ptr<mmap_allocator>>(
        m, "MmapAllocator")
        .def(py::init<int, bool>(), "flags"_a=0, "prefer_huge"_a=false);

    py::class_<memory_pool, memory_allocator, std::shared_ptr<memory_pool>>(
        m, "MemoryPool")
        .def(py::init<std::size_t, std::size_t, std::size_t, std::size_t, std::shared_ptr<memory_allocator>>(),
             "lower"_a, "upper"_a, "max_free"_a, "initial"_a, py::arg_v("allocator", nullptr, "None"))
        .def(py::init<std::shared_ptr<thread_pool>, std::size_t, std::size_t, std::size_t, std::size_t, std::size_t, std::shared_ptr<memory_allocator>>(),
             "thread_pool"_a, "lower"_a, "upper"_a, "max_free"_a, "initial"_a, "low_water"_a, py::arg_v("allocator", nullptr, "None"))
        .def_property("warn_on_empty",
                      &memory_pool::get_warn_on_empty, &memory_pool::set_warn_on_empty);

    py::class_<thread_pool_wrapper, std::shared_ptr<thread_pool_wrapper>>(m, "ThreadPool")
        .def(py::init<int>(), "threads"_a = 1)
        .def(py::init<int, const std::vector<int> &>(), "threads"_a, "affinity"_a)
        .def_static("set_affinity", &thread_pool_wrapper::set_affinity)
        .def("stop", &thread_pool_wrapper::stop);

    py::class_<inproc_queue, std::shared_ptr<inproc_queue>>(m, "InprocQueue")
        .def(py::init<>())
        .def("add_packet", [](inproc_queue &self, py::buffer obj)
        {
            py::buffer_info info = request_buffer_info(obj, PyBUF_C_CONTIGUOUS);
            inproc_queue::packet pkt;
            pkt.size = info.size * info.itemsize;
            pkt.data = detail::make_unique_for_overwrite<std::uint8_t[]>(pkt.size);
            std::memcpy(pkt.data.get(), info.ptr, pkt.size);
            self.add_packet(std::move(pkt));
        }, "packet")
        .def("stop", &inproc_queue::stop);

    py::class_<descriptor>(m, "RawDescriptor")
        .def(py::init<>())
        .def_readwrite("id", &descriptor::id)
        .def_property("name", bytes_getter(&descriptor::name), bytes_setter(&descriptor::name))
        .def_property("description", bytes_getter(&descriptor::description), bytes_setter(&descriptor::description))
        .def_property("shape", [](const descriptor &d) -> py::list
        {
            py::list out;
            for (const auto &size : d.shape)
            {
                if (size >= 0)
                    out.append(size);
                else
                    out.append(py::none());
            }
            return out;
        }, [](descriptor &d, py::sequence shape)
        {
            std::vector<std::int64_t> out;
            out.reserve(len(shape));
            for (std::size_t i = 0; i < len(shape); i++)
            {
                py::object value = shape[i];
                if (value.is_none())
                    out.push_back(-1);
                else
                {
                    std::int64_t v = value.cast<std::int64_t>();
                    // TODO: verify range (particularly, >= 0)
                    out.push_back(v);
                }
            }
            d.shape = std::move(out);
        })
        .def_readwrite("format", &descriptor::format)
        .def_property("numpy_header", bytes_getter(&descriptor::numpy_header), bytes_setter(&descriptor::numpy_header))
    ;
#if SPEAD2_USE_IBV
    py::class_<ibv_context_t>(m, "IbvContext")
        .def(py::init([](const std::string &interface_address)
            {
                py::gil_scoped_release release;
                boost::asio::io_context io_context;
                return ibv_context_t(make_address_no_release(
                    io_context, interface_address, boost::asio::ip::udp::resolver::passive));
            }), "interface"_a)
        .def("reset", [](ibv_context_t &self) { self.reset(); })
    ;
#endif
}

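// Forward spead2's C++ log messages to the Python "spead2" logger.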
void register_logging()
{
    py::object logging_module = py::module::import("logging");
    py::object logger = logging_module.attr("getLogger")("spead2");
    detail::our_logger.reset(new log_function_python(logger));
    detail::orig_logger = set_log_function(std::ref(*detail::our_logger));
}

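// Arrange for run_exit_stoppers() to be called at interpreter shutdown via the atexit module.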
void register_atexit()
{
    py::module atexit_mod = py::module::import("atexit");
    atexit_mod.attr("register")(py::cpp_function(detail::run_exit_stoppers));
}

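// Hold a reference to the Python object together with a writable, C-contiguous view of its buffer.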
buffer_allocation::buffer_allocation(py::buffer buf)
    : obj(std::move(buf)),
    buffer_info(request_buffer_info(obj, PyBUF_C_CONTIGUOUS | PyBUF_WRITEABLE))
{
}

namespace
{

/* Function object that acts as a deleter for a wrapped buffer_allocation. It's
 * a class rather than a lambda to provide get_allocation.
 *
 * It needs to hold a shared_ptr rather than a unique_ptr because std::function
 * requires the function to be copyable. In practice it is unlikely to be
 * copied.
 */
class buffer_allocation_deleter
{
private:
    std::shared_ptr<buffer_allocation> alloc;

public:
    explicit buffer_allocation_deleter(std::shared_ptr<buffer_allocation> alloc)
        : alloc(std::move(alloc)) {}

    void operator()([[maybe_unused]] std::uint8_t *ptr) const
    {
        alloc->buffer_info = py::buffer_info();
        alloc->obj = py::object();
    }

    buffer_allocation &get_allocation() const
    {
        return *alloc;
    }
};

} // anonymous namespace

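/* If @a ptr wraps a Python buffer (its deleter is a buffer_allocation_deleter),
 * return the associated buffer_allocation, otherwise nullptr.
 */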
buffer_allocation *get_buffer_allocation(const memory_allocator::pointer &ptr)
{
    const auto *deleter = ptr.get_deleter().target<buffer_allocation_deleter>();
    if (deleter)
        return &deleter->get_allocation();
    else
        return nullptr;
}

} // namespace spead2

namespace PYBIND11_NAMESPACE
{
namespace detail
{

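/* Accept None or any object supporting the buffer protocol, producing a
 * memory_allocator::pointer whose deleter keeps the Python object alive until
 * the memory is released.
 */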
bool type_caster<spead2::memory_allocator::pointer>::load(handle src, [[maybe_unused]] bool convert)
{
    if (src.is_none())
    {
        value.reset();
        return true;
    }
    if (!PyObject_CheckBuffer(src.ptr()))
        return false;
    // Create a pointer wrapping the buffer_allocation
    auto alloc = std::make_shared<spead2::buffer_allocation>(reinterpret_borrow<buffer>(src));
    // copy the pointer before moving from alloc
    std::uint8_t *ptr = static_cast<std::uint8_t *>(alloc->buffer_info.ptr);
    value = spead2::memory_allocator::pointer(
        ptr, spead2::buffer_allocation_deleter(std::move(alloc)));
    return true;
}

handle type_caster<spead2::memory_allocator::pointer>::cast(
    const spead2::memory_allocator::pointer &ptr, return_value_policy, handle)
{
    if (!ptr)
        return none().inc_ref();
    auto deleter = ptr.get_deleter().target<spead2::buffer_allocation_deleter>();
    if (!deleter)
        throw type_error("pointer did not come from a Python buffer object");
    return deleter->get_allocation().obj.inc_ref();
}

} // namespace detail
} // namespace PYBIND11_NAMESPACE