• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 11665892441

04 Nov 2024 02:08PM UTC coverage: 78.858% (-0.07%) from 78.929%
11665892441

Pull #358

github

bmerry
Bump versions of wheel dependencies
Pull Request #358: Bump versions of wheel dependencies

5580 of 7076 relevant lines covered (78.86%)

91205.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.16
/src/py_common.cpp
1
/* Copyright 2015, 2017, 2020, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <pybind11/pybind11.h>
22
#include <pybind11/stl.h>
23
#include <pybind11/operators.h>
24
#include <boost/system/system_error.hpp>
25
#include <memory>
26
#include <list>
27
#include <functional>
28
#include <spead2/py_common.h>
29
#include <spead2/common_ringbuffer.h>
30
#include <spead2/common_defines.h>
31
#include <spead2/common_flavour.h>
32
#include <spead2/common_logging.h>
33
#include <spead2/common_memory_pool.h>
34
#include <spead2/common_thread_pool.h>
35
#include <spead2/common_inproc.h>
36
#if SPEAD2_USE_IBV
37
# include <spead2/common_ibv.h>
38
#endif
39
#include "common_unique.h"
40

41
namespace py = pybind11;
42

43
namespace spead2
44
{
45

46
namespace detail
47
{
48

49
static std::list<std::function<void()>> stop_entries;
50
static std::function<void(log_level, const std::string &)> orig_logger;
51
static std::unique_ptr<log_function_python> our_logger;
52

53
static void run_exit_stoppers()
5✔
54
{
55
    while (!stop_entries.empty())
18✔
56
        stop_entries.front()();
13✔
57
    // Clear up our custom logger
58
    set_log_function(orig_logger);
5✔
59
    our_logger.reset();
5✔
60
}
5✔
61

62
} // namespace detail
63

64
/**
 * Register @a callback at the head of the global list of exit stoppers and
 * remember the list position so that reset() can remove it again.
 */
exit_stopper::exit_stopper(std::function<void()> callback)
    : entry(detail::stop_entries.emplace(detail::stop_entries.cbegin(), std::move(callback)))
{
}
1,592✔
68

69
void exit_stopper::reset()
3,095✔
70
{
71
    if (entry != detail::stop_entries.end())
3,095✔
72
    {
73
        detail::stop_entries.erase(entry);
1,592✔
74
        entry = detail::stop_entries.end();
1,592✔
75
    }
76
}
3,095✔
77

78
static void translate_exception_boost_io_error(std::exception_ptr p)
631✔
79
{
80
    try
81
    {
82
        if (p)
631✔
83
            std::rethrow_exception(p);
1,262✔
84
    }
85
    catch (const boost_io_error &e)
631✔
86
    {
87
        py::tuple args = py::make_tuple(e.code().value(), e.what());
7✔
88
        PyErr_SetObject(PyExc_IOError, args.ptr());
7✔
89
    }
7✔
90
}
7✔
91

92
template class socket_wrapper<boost::asio::ip::udp::socket>;
93
template class socket_wrapper<boost::asio::ip::tcp::socket>;
94
template class socket_wrapper<boost::asio::ip::tcp::acceptor>;
95

96
/**
 * Resolve @a hostname to an address using a UDP resolver.
 *
 * An empty hostname yields a default-constructed address. Otherwise the
 * address of the first resolver result is returned.
 */
boost::asio::ip::address make_address_no_release(
    boost::asio::io_service &io_service, const std::string &hostname,
    boost::asio::ip::resolver_query_base::flags flags)
{
    using resolver_type = boost::asio::ip::udp::resolver;

    if (hostname.empty())
        return boost::asio::ip::address();

    resolver_type resolver(io_service);
    resolver_type::query query(hostname, "", flags);
    auto first_result = resolver.resolve(query);
    return first_result->endpoint().address();
}
664✔
107

108
void deprecation_warning(const char *msg)
×
109
{
110
    if (PyErr_WarnEx(PyExc_DeprecationWarning, msg, 1) == -1)
×
111
        throw py::error_already_set();
×
112
}
×
113

114
/// Stops the pool (see stop()) before the wrapper is destroyed.
thread_pool_wrapper::~thread_pool_wrapper()
{
    this->stop();
}
936✔
118

119
void thread_pool_wrapper::stop()
941✔
120
{
121
    stopper.reset();
941✔
122
    py::gil_scoped_release gil;
941✔
123
    thread_pool::stop();
941✔
124
}
941✔
125

126
/**
 * Obtain a @c py::buffer_info for a Python buffer object.
 *
 * The view is always requested with @c PyBUF_STRIDES and @c PyBUF_FORMAT;
 * @a extra_flags are OR-ed into the request.
 *
 * @param buffer       Object supporting the buffer protocol
 * @param extra_flags  Additional @c PyBUF_* flags for the request
 * @return A buffer_info that takes over the acquired view
 * @throws py::error_already_set if the buffer cannot be obtained
 */
py::buffer_info request_buffer_info(const py::buffer &buffer, int extra_flags)
{
    auto view = detail::make_unique_for_overwrite<Py_buffer>();
    const int flags = PyBUF_STRIDES | PyBUF_FORMAT | extra_flags;
    if (PyObject_GetBuffer(buffer.ptr(), view.get(), flags) != 0)
        throw py::error_already_set();
    try
    {
        py::buffer_info info(view.get());
        // info now holds the view pointer, so give up our ownership of it
        view.release();
        return info;
    }
    catch (...)
    {
        // Constructing the buffer_info failed before it took over the view:
        // release the export explicitly, since the unique_ptr only frees the
        // Py_buffer struct itself.
        if (view)
            PyBuffer_Release(view.get());
        throw;
    }
}
11,236✔
136

137
/// Names of the Python logger methods looked up for each level index.
const char *const log_function_python::level_methods[log_function_python::num_levels] =
{
    "warning", "info", "debug"
};
143

144
/**
 * Look up the per-level logging methods on @a logger and start the thread
 * that drains the message ring buffer.
 *
 * @param logger     Python logger object providing the methods named in
 *                   @c level_methods
 * @param ring_size  Capacity passed to the message ring buffer
 */
log_function_python::log_function_python(
    pybind11::object logger, std::size_t ring_size)
    : overflowed(false), ring(ring_size)
{
    for (unsigned int level_idx = 0; level_idx < num_levels; ++level_idx)
    {
        const char *method_name = level_methods[level_idx];
        log_methods[level_idx] = logger.attr(method_name);
    }
    thread = std::thread([this] () { run(); });
}
5✔
153

154
void log_function_python::run()
275✔
155
{
156
    try
157
    {
158
        while (true)
159
        {
160
            auto msg = ring.pop();
275✔
161
            py::gil_scoped_acquire gil;
271✔
162
            auto &[level, text] = msg;
271✔
163
            log(level, text);
271✔
164
            /* If there are multiple messages queued, consume them while
165
             * the GIL is held, rather than dropping and regaining the
166
             * GIL; but limit it, so that we don't starve other threads
167
             * of the GIL.
168
             */
169
            try
170
            {
171
                for (int pass = 1; pass < 1024; pass++)
20,246✔
172
                {
173
                    msg = ring.try_pop();
20,246✔
174
                    auto &[level, text] = msg;
19,975✔
175
                    log(level, text);
19,975✔
176
                }
177
            }
178
            catch (ringbuffer_empty &)
271✔
179
            {
180
            }
270✔
181
            if (overflowed.exchange(false))
270✔
182
                log(log_level::warning,
×
183
                    "Log ringbuffer was full - some log messages were dropped");
184
        }
272✔
185
    }
186
    catch (ringbuffer_stopped &)
5✔
187
    {
188
        // Could possibly report the overflowed flag here again - but this may be
189
        // deep into interpreter shutdown and it might not be safe to log.
190
    }
5✔
191
    catch (std::exception &e)
×
192
    {
193
        std::cerr << "Logger thread crashed with exception " << e.what() << '\n';
×
194
    }
×
195
}
5✔
196

197
/**
 * Pass one message to the Python logger method stored for @a level.
 *
 * Python errors raised by the call are deliberately swallowed.
 */
void log_function_python::log(log_level level, const std::string &msg) const
{
    try
    {
        const auto method_idx = static_cast<unsigned int>(level);
        assert(method_idx < num_levels);
        const auto &method = log_methods[method_idx];
        method("%s", msg);
    }
    catch (py::error_already_set &)
    {
        // Expected during interpreter shutdown, once the modules that the
        // logging machinery relies on have already been unloaded.
    }
}
20,246✔
211

212
/**
 * Queue a log message for the logger thread without blocking.
 *
 * If the ring buffer is full the message is dropped and the @c overflowed
 * flag is set instead.
 */
void log_function_python::operator()(log_level level, const std::string &msg)
{
    /* Never block here. The consumer might be stuck waiting for the GIL
     * while this caller is the one holding it, so a blocking push could
     * deadlock. When logging outpaces the consumer the messages should be
     * throttled in any case, so a full ring only raises a flag.
     */
    try
    {
        ring.try_emplace(level, msg);
    }
    catch (const ringbuffer_full &)
    {
        overflowed.store(true);
    }
}
20,246✔
229

230
void log_function_python::stop()
10✔
231
{
232
    stopper.reset();
10✔
233
    {
234
        py::gil_scoped_release gil;
10✔
235
        ring.stop();
10✔
236
        if (thread.joinable())
10✔
237
            thread.join();
5✔
238
    }
10✔
239
    for (unsigned int i = 0; i < num_levels; i++)
40✔
240
        log_methods[i] = py::object();
30✔
241
}
10✔
242

243
void register_module(py::module m)
5✔
244
{
245
    using namespace pybind11::literals;
246

247
    py::register_exception<ringbuffer_stopped>(m, "Stopped");
5✔
248
    py::register_exception<ringbuffer_empty>(m, "Empty");
5✔
249
    py::register_exception<ringbuffer_full>(m, "Full");
5✔
250
    py::register_exception_translator(translate_exception_boost_io_error);
5✔
251

252
#define EXPORT_ENUM(x) (m.attr(#x) = long(x))
253
    EXPORT_ENUM(BUG_COMPAT_DESCRIPTOR_WIDTHS);
5✔
254
    EXPORT_ENUM(BUG_COMPAT_SHAPE_BIT_1);
5✔
255
    EXPORT_ENUM(BUG_COMPAT_SWAP_ENDIAN);
5✔
256
    EXPORT_ENUM(BUG_COMPAT_PYSPEAD_0_5_2);
5✔
257

258
    EXPORT_ENUM(NULL_ID);
5✔
259
    EXPORT_ENUM(HEAP_CNT_ID);
5✔
260
    EXPORT_ENUM(HEAP_LENGTH_ID);
5✔
261
    EXPORT_ENUM(PAYLOAD_OFFSET_ID);
5✔
262
    EXPORT_ENUM(PAYLOAD_LENGTH_ID);
5✔
263
    EXPORT_ENUM(DESCRIPTOR_ID);
5✔
264
    EXPORT_ENUM(STREAM_CTRL_ID);
5✔
265

266
    EXPORT_ENUM(DESCRIPTOR_NAME_ID);
5✔
267
    EXPORT_ENUM(DESCRIPTOR_DESCRIPTION_ID);
5✔
268
    EXPORT_ENUM(DESCRIPTOR_SHAPE_ID);
5✔
269
    EXPORT_ENUM(DESCRIPTOR_FORMAT_ID);
5✔
270
    EXPORT_ENUM(DESCRIPTOR_ID_ID);
5✔
271
    EXPORT_ENUM(DESCRIPTOR_DTYPE_ID);
5✔
272

273
    EXPORT_ENUM(CTRL_STREAM_START);
5✔
274
    EXPORT_ENUM(CTRL_DESCRIPTOR_REISSUE);
5✔
275
    EXPORT_ENUM(CTRL_STREAM_STOP);
5✔
276
    EXPORT_ENUM(CTRL_DESCRIPTOR_UPDATE);
5✔
277

278
    EXPORT_ENUM(MEMCPY_STD);
5✔
279
    EXPORT_ENUM(MEMCPY_NONTEMPORAL);
5✔
280
#undef EXPORT_ENUM
281

282
    m.def("log_info", [](const std::string &msg) { log_info("%s", msg); },
20,005✔
283
          "Log a message at INFO level (for testing only)");
284

285
    py::class_<flavour>(m, "Flavour")
5✔
286
        .def(py::init<int, int, int, bug_compat_mask>(),
5✔
287
             "version"_a, "item_pointer_bits"_a,
5✔
288
             "heap_address_bits"_a, "bug_compat"_a=0)
10✔
289
        .def(py::init<>())
5✔
290
        .def(py::self == py::self)
5✔
291
        .def(py::self != py::self)
5✔
292
        .def_property_readonly("version", &flavour::get_version)
5✔
293
        .def_property_readonly("item_pointer_bits", &flavour::get_item_pointer_bits)
5✔
294
        .def_property_readonly("heap_address_bits", &flavour::get_heap_address_bits)
5✔
295
        .def_property_readonly("bug_compat", &flavour::get_bug_compat);
5✔
296

297
    py::class_<memory_allocator, std::shared_ptr<memory_allocator>>(m, "MemoryAllocator")
5✔
298
        .def(py::init<>());
5✔
299

300
    py::class_<mmap_allocator, memory_allocator, std::shared_ptr<mmap_allocator>>(
5✔
301
        m, "MmapAllocator")
302
        .def(py::init<int, bool>(), "flags"_a=0, "prefer_huge"_a=false);
5✔
303

304
    py::class_<memory_pool, memory_allocator, std::shared_ptr<memory_pool>>(
5✔
305
        m, "MemoryPool")
306
        .def(py::init<std::size_t, std::size_t, std::size_t, std::size_t, std::shared_ptr<memory_allocator>>(),
5✔
307
             "lower"_a, "upper"_a, "max_free"_a, "initial"_a, py::arg_v("allocator", nullptr, "None"))
10✔
308
        .def(py::init<std::shared_ptr<thread_pool>, std::size_t, std::size_t, std::size_t, std::size_t, std::size_t, std::shared_ptr<memory_allocator>>(),
5✔
309
             "thread_pool"_a, "lower"_a, "upper"_a, "max_free"_a, "initial"_a, "low_water"_a, py::arg_v("allocator", nullptr, "None"))
10✔
310
        .def_property("warn_on_empty",
5✔
311
                      &memory_pool::get_warn_on_empty, &memory_pool::set_warn_on_empty);
5✔
312

313
    py::class_<thread_pool_wrapper, std::shared_ptr<thread_pool_wrapper>>(m, "ThreadPool")
5✔
314
        .def(py::init<int>(), "threads"_a = 1)
10✔
315
        .def(py::init<int, const std::vector<int> &>(), "threads"_a, "affinity"_a)
5✔
316
        .def_static("set_affinity", &thread_pool_wrapper::set_affinity)
5✔
317
        .def("stop", &thread_pool_wrapper::stop);
5✔
318

319
    py::class_<inproc_queue, std::shared_ptr<inproc_queue>>(m, "InprocQueue")
5✔
320
        .def(py::init<>())
5✔
321
        .def("add_packet", [](inproc_queue &self, py::buffer obj)
5✔
322
        {
323
            py::buffer_info info = request_buffer_info(obj, PyBUF_C_CONTIGUOUS);
25✔
324
            inproc_queue::packet pkt;
25✔
325
            pkt.size = info.size * info.itemsize;
25✔
326
            pkt.data = detail::make_unique_for_overwrite<std::uint8_t[]>(pkt.size);
25✔
327
            std::memcpy(pkt.data.get(), info.ptr, pkt.size);
25✔
328
            self.add_packet(std::move(pkt));
25✔
329
        }, "packet")
25✔
330
        .def("stop", &inproc_queue::stop);
5✔
331

332
    py::class_<descriptor>(m, "RawDescriptor")
5✔
333
        .def(py::init<>())
5✔
334
        .def_readwrite("id", &descriptor::id)
5✔
335
        .def_property("name", bytes_getter(&descriptor::name), bytes_setter(&descriptor::name))
10✔
336
        .def_property("description", bytes_getter(&descriptor::description), bytes_setter(&descriptor::description))
10✔
337
        .def_property("shape", [](const descriptor &d) -> py::list
5✔
338
        {
339
            py::list out;
910✔
340
            for (const auto &size : d.shape)
968✔
341
            {
342
                if (size >= 0)
58✔
343
                    out.append(size);
38✔
344
                else
345
                    out.append(py::none());
20✔
346
            }
347
            return out;
910✔
348
        }, [](descriptor &d, py::sequence shape)
×
349
        {
350
            std::vector<std::int64_t> out;
1,234✔
351
            out.reserve(len(shape));
1,234✔
352
            for (std::size_t i = 0; i < len(shape); i++)
1,680✔
353
            {
354
                py::object value = shape[i];
446✔
355
                if (value.is_none())
446✔
356
                    out.push_back(-1);
15✔
357
                else
358
                {
359
                    std::int64_t v = value.cast<std::int64_t>();
431✔
360
                    // TODO: verify range (particularly, >= 0)
361
                    out.push_back(v);
431✔
362
                }
363
            }
446✔
364
            d.shape = std::move(out);
1,234✔
365
        })
1,234✔
366
        .def_readwrite("format", &descriptor::format)
5✔
367
        .def_property("numpy_header", bytes_getter(&descriptor::numpy_header), bytes_setter(&descriptor::numpy_header))
5✔
368
    ;
369
#if SPEAD2_USE_IBV
370
    py::class_<ibv_context_t>(m, "IbvContext")
5✔
371
        .def(py::init([](const std::string &interface_address)
5✔
372
            {
373
                py::gil_scoped_release release;
×
374
                boost::asio::io_service io_service;
×
375
                return ibv_context_t(make_address_no_release(
×
376
                    io_service, interface_address, boost::asio::ip::udp::resolver::query::passive));
×
377
            }), "interface"_a)
5✔
378
        .def("reset", [](ibv_context_t &self) { self.reset(); })
5✔
379
    ;
380
#endif
381
}
5✔
382

383
void register_logging()
5✔
384
{
385
    py::object logging_module = py::module::import("logging");
5✔
386
    py::object logger = logging_module.attr("getLogger")("spead2");
5✔
387
    detail::our_logger.reset(new log_function_python(logger));
5✔
388
    detail::orig_logger = set_log_function(std::ref(*detail::our_logger));
5✔
389
}
5✔
390

391
void register_atexit()
5✔
392
{
393
    py::module atexit_mod = py::module::import("atexit");
5✔
394
    atexit_mod.attr("register")(py::cpp_function(detail::run_exit_stoppers));
5✔
395
}
5✔
396

397
/**
 * Take ownership of a Python buffer object and request a C-contiguous,
 * writeable view of it.
 *
 * The view is requested from the stored @c obj member rather than from the
 * moved-from parameter.
 */
buffer_allocation::buffer_allocation(py::buffer buf)
    : obj(std::move(buf)),
      buffer_info(request_buffer_info(obj, PyBUF_C_CONTIGUOUS | PyBUF_WRITEABLE))
{
}
312✔
402

403
namespace
404
{
405

406
/* Function object that acts as a deleter for a wrapped buffer_allocation. It's
407
 * a class rather than a lambda to provide get_allocation.
408
 *
409
 * It needs to hold a shared_ptr rather than a unique_ptr because std::function
410
 * requires the function to be copyable. In practice it is unlikely to be
411
 * copied.
412
 */
413
class buffer_allocation_deleter
414
{
415
private:
416
    std::shared_ptr<buffer_allocation> alloc;
417

418
public:
419
    explicit buffer_allocation_deleter(std::shared_ptr<buffer_allocation> alloc)
312✔
420
        : alloc(std::move(alloc)) {}
312✔
421

422
    void operator()([[maybe_unused]] std::uint8_t *ptr) const
312✔
423
    {
424
        alloc->buffer_info = py::buffer_info();
312✔
425
        alloc->obj = py::object();
312✔
426
    }
312✔
427

428
    buffer_allocation &get_allocation() const
1,502✔
429
    {
430
        return *alloc;
1,502✔
431
    }
432
};
433

434
} // anonymous namespace
435

436
/**
 * Retrieve the buffer_allocation behind @a ptr.
 *
 * @return The allocation, or @c nullptr if the pointer's deleter is not a
 *         buffer_allocation_deleter
 */
buffer_allocation *get_buffer_allocation(const memory_allocator::pointer &ptr)
{
    const auto *wrapped = ptr.get_deleter().target<buffer_allocation_deleter>();
    return wrapped ? &wrapped->get_allocation() : nullptr;
}
444

445
} // namespace spead2
446

447
namespace PYBIND11_NAMESPACE
448
{
449
namespace detail
450
{
451

452
/**
 * Convert a Python object into a memory_allocator::pointer.
 *
 * @c None becomes a null pointer. An object supporting the buffer protocol
 * becomes a pointer to its memory whose deleter holds the underlying
 * buffer_allocation. Anything else is rejected.
 */
bool type_caster<spead2::memory_allocator::pointer>::load(handle src, [[maybe_unused]] bool convert)
{
    using pointer_type = spead2::memory_allocator::pointer;

    if (src.is_none())
    {
        value.reset();
        return true;
    }
    if (PyObject_CheckBuffer(src.ptr()) == 0)
        return false;

    // Wrap the buffer in an allocation that the pointer's deleter will own
    auto allocation = std::make_shared<spead2::buffer_allocation>(reinterpret_borrow<buffer>(src));
    // Read the data address before the shared_ptr is moved into the deleter
    auto *data = static_cast<std::uint8_t *>(allocation->buffer_info.ptr);
    value = pointer_type(data, spead2::buffer_allocation_deleter(std::move(allocation)));
    return true;
}
312✔
469

470
/**
 * Convert a memory_allocator::pointer back into the Python object that
 * backs it.
 *
 * A null pointer becomes @c None.
 *
 * @throws type_error if the pointer's deleter is not a
 *         buffer_allocation_deleter
 */
handle type_caster<spead2::memory_allocator::pointer>::cast(
    const spead2::memory_allocator::pointer &ptr, return_value_policy, handle)
{
    if (!ptr)
        return none().inc_ref();

    const auto *wrapped = ptr.get_deleter().target<spead2::buffer_allocation_deleter>();
    if (wrapped == nullptr)
        throw type_error("pointer did not come from a Python buffer object");

    // Hand back a new reference to the object that owns the memory
    return wrapped->get_allocation().obj.inc_ref();
}
480

481
} // namespace detail
482
} // namespace PYBIND11_NAMESPACE
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc