• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 5948146497

23 Aug 2023 07:07AM UTC coverage: 74.931% (-0.07%) from 75.0%
5948146497

push

github

web-flow
Merge pull request #242 from ska-sa/mypy-pyproject

Move mypy configuration into pyproject.toml

5419 of 7232 relevant lines covered (74.93%)

52629.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.28
/src/py_common.cpp
1
/* Copyright 2015, 2017, 2020, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <pybind11/pybind11.h>
22
#include <pybind11/stl.h>
23
#include <pybind11/operators.h>
24
#include <boost/system/system_error.hpp>
25
#include <memory>
26
#include <list>
27
#include <functional>
28
#include <spead2/py_common.h>
29
#include <spead2/common_ringbuffer.h>
30
#include <spead2/common_defines.h>
31
#include <spead2/common_flavour.h>
32
#include <spead2/common_logging.h>
33
#include <spead2/common_memory_pool.h>
34
#include <spead2/common_thread_pool.h>
35
#include <spead2/common_inproc.h>
36
#if SPEAD2_USE_IBV
37
# include <spead2/common_ibv.h>
38
#endif
39

40
namespace py = pybind11;
41

42
namespace spead2
43
{
44

45
namespace detail
46
{
47

48
static std::list<std::function<void()>> stop_entries;
49
static std::function<void(log_level, const std::string &)> orig_logger;
50
static std::unique_ptr<log_function_python> our_logger;
51

52
static void run_exit_stoppers()
5✔
53
{
54
    while (!stop_entries.empty())
44✔
55
        stop_entries.front()();
39✔
56
    // Clear up our custom logger
57
    set_log_function(orig_logger);
5✔
58
    our_logger.reset();
5✔
59
}
5✔
60

61
} // namespace detail
62

63
exit_stopper::exit_stopper(std::function<void()> callback)
1,547✔
64
    : entry(detail::stop_entries.insert(detail::stop_entries.begin(), std::move(callback)))
1,547✔
65
{
66
}
1,547✔
67

68
void exit_stopper::reset()
3,031✔
69
{
70
    if (entry != detail::stop_entries.end())
3,031✔
71
    {
72
        detail::stop_entries.erase(entry);
1,547✔
73
        entry = detail::stop_entries.end();
1,547✔
74
    }
75
}
3,031✔
76

77
static void translate_exception_boost_io_error(std::exception_ptr p)
616✔
78
{
79
    try
80
    {
81
        if (p)
616✔
82
            std::rethrow_exception(p);
1,232✔
83
    }
84
    catch (const boost_io_error &e)
616✔
85
    {
86
        py::tuple args = py::make_tuple(e.code().value(), e.what());
7✔
87
        PyErr_SetObject(PyExc_IOError, args.ptr());
7✔
88
    }
7✔
89
}
7✔
90

91
template class socket_wrapper<boost::asio::ip::udp::socket>;
92
template class socket_wrapper<boost::asio::ip::tcp::socket>;
93
template class socket_wrapper<boost::asio::ip::tcp::acceptor>;
94

95
boost::asio::ip::address make_address_no_release(
753✔
96
    boost::asio::io_service &io_service, const std::string &hostname,
97
    boost::asio::ip::resolver_query_base::flags flags)
98
{
99
    if (hostname == "")
753✔
100
        return boost::asio::ip::address();
110✔
101
    using boost::asio::ip::udp;
102
    udp::resolver resolver(io_service);
643✔
103
    udp::resolver::query query(hostname, "", flags);
1,286✔
104
    return resolver.resolve(query)->endpoint().address();
643✔
105
}
645✔
106

107
void deprecation_warning(const char *msg)
128✔
108
{
109
    if (PyErr_WarnEx(PyExc_DeprecationWarning, msg, 1) == -1)
128✔
110
        throw py::error_already_set();
×
111
}
128✔
112

113
thread_pool_wrapper::~thread_pool_wrapper()
907✔
114
{
115
    stop();
907✔
116
}
907✔
117

118
void thread_pool_wrapper::stop()
933✔
119
{
120
    stopper.reset();
933✔
121
    py::gil_scoped_release gil;
933✔
122
    thread_pool::stop();
933✔
123
}
933✔
124

125
py::buffer_info request_buffer_info(const py::buffer &buffer, int extra_flags)
11,217✔
126
{
127
    std::unique_ptr<Py_buffer> view(new Py_buffer);
11,217✔
128
    int flags = PyBUF_STRIDES | PyBUF_FORMAT | extra_flags;
11,217✔
129
    if (PyObject_GetBuffer(buffer.ptr(), view.get(), flags) != 0)
11,217✔
130
        throw py::error_already_set();
×
131
    py::buffer_info info(view.get());
11,217✔
132
    view.release();
11,217✔
133
    return info;
22,434✔
134
}
11,217✔
135

136
constexpr unsigned int log_function_python::num_levels;
137
const char *const log_function_python::level_methods[log_function_python::num_levels] =
138
{
139
    "warning",
140
    "info",
141
    "debug"
142
};
143

144
log_function_python::log_function_python(
5✔
145
    pybind11::object logger, std::size_t ring_size) :
5✔
146
        overflowed(false),
5✔
147
        ring(ring_size)
5✔
148
{
149
    for (unsigned int i = 0; i < num_levels; i++)
20✔
150
        log_methods[i] = logger.attr(level_methods[i]);
15✔
151
    thread = std::thread([this] () { run(); });
10✔
152
}
5✔
153

154
void log_function_python::run()
258✔
155
{
156
    try
157
    {
158
        while (true)
159
        {
160
            auto msg = ring.pop();
258✔
161
            py::gil_scoped_acquire gil;
254✔
162
            log(msg.first, msg.second);
254✔
163
            /* If there are multiple messages queued, consume them while
164
             * the GIL is held, rather than dropping and regaining the
165
             * GIL; but limit it, so that we don't starve other threads
166
             * of the GIL.
167
             */
168
            try
169
            {
170
                for (int pass = 1; pass < 1024; pass++)
20,240✔
171
                {
172
                    msg = ring.try_pop();
20,240✔
173
                    log(msg.first, msg.second);
19,986✔
174
                }
175
            }
176
            catch (ringbuffer_empty &)
254✔
177
            {
178
            }
253✔
179
            if (overflowed.exchange(false))
253✔
180
                log(log_level::warning,
×
181
                    "Log ringbuffer was full - some log messages were dropped");
182
        }
255✔
183
    }
184
    catch (ringbuffer_stopped &)
5✔
185
    {
186
        // Could possibly report the overflowed flag here again - but this may be
187
        // deep into interpreter shutdown and it might not be safe to log.
188
    }
5✔
189
    catch (std::exception &e)
×
190
    {
191
        std::cerr << "Logger thread crashed with exception " << e.what() << '\n';
×
192
    }
×
193
}
5✔
194

195
void log_function_python::log(log_level level, const std::string &msg) const
20,240✔
196
{
197
    try
198
    {
199
        unsigned int level_idx = static_cast<unsigned int>(level);
20,240✔
200
        assert(level_idx < num_levels);
201
        log_methods[level_idx]("%s", msg);
20,240✔
202
    }
203
    catch (py::error_already_set &e)
×
204
    {
205
        // This can happen during interpreter shutdown, because the modules
206
        // needed for the logging have already been unloaded.
207
    }
×
208
}
20,240✔
209

210
void log_function_python::operator()(log_level level, const std::string &msg)
20,240✔
211
{
212
    /* A blocking push can potentially lead to deadlock: the consumer may be
213
     * blocked waiting for the GIL, and possibly we may be holding the GIL.
214
     * If there is so much logging that the consumer can't keep up, we
215
     * probably want to throttle the log messages anyway, so we just set a
216
     * flag.
217
     */
218
    try
219
    {
220
        ring.try_emplace(level, msg);
20,240✔
221
    }
222
    catch (ringbuffer_full &)
×
223
    {
224
        overflowed = true;
×
225
    }
×
226
}
20,240✔
227

228
void log_function_python::stop()
10✔
229
{
230
    stopper.reset();
10✔
231
    {
232
        py::gil_scoped_release gil;
10✔
233
        ring.stop();
10✔
234
        if (thread.joinable())
10✔
235
            thread.join();
5✔
236
    }
10✔
237
    for (unsigned int i = 0; i < num_levels; i++)
40✔
238
        log_methods[i] = py::object();
30✔
239
}
10✔
240

241
void register_module(py::module m)
5✔
242
{
243
    using namespace pybind11::literals;
244

245
    py::register_exception<ringbuffer_stopped>(m, "Stopped");
5✔
246
    py::register_exception<ringbuffer_empty>(m, "Empty");
5✔
247
    py::register_exception<ringbuffer_full>(m, "Full");
5✔
248
    py::register_exception_translator(translate_exception_boost_io_error);
5✔
249

250
#define EXPORT_ENUM(x) (m.attr(#x) = long(x))
251
    EXPORT_ENUM(BUG_COMPAT_DESCRIPTOR_WIDTHS);
5✔
252
    EXPORT_ENUM(BUG_COMPAT_SHAPE_BIT_1);
5✔
253
    EXPORT_ENUM(BUG_COMPAT_SWAP_ENDIAN);
5✔
254
    EXPORT_ENUM(BUG_COMPAT_PYSPEAD_0_5_2);
5✔
255

256
    EXPORT_ENUM(NULL_ID);
5✔
257
    EXPORT_ENUM(HEAP_CNT_ID);
5✔
258
    EXPORT_ENUM(HEAP_LENGTH_ID);
5✔
259
    EXPORT_ENUM(PAYLOAD_OFFSET_ID);
5✔
260
    EXPORT_ENUM(PAYLOAD_LENGTH_ID);
5✔
261
    EXPORT_ENUM(DESCRIPTOR_ID);
5✔
262
    EXPORT_ENUM(STREAM_CTRL_ID);
5✔
263

264
    EXPORT_ENUM(DESCRIPTOR_NAME_ID);
5✔
265
    EXPORT_ENUM(DESCRIPTOR_DESCRIPTION_ID);
5✔
266
    EXPORT_ENUM(DESCRIPTOR_SHAPE_ID);
5✔
267
    EXPORT_ENUM(DESCRIPTOR_FORMAT_ID);
5✔
268
    EXPORT_ENUM(DESCRIPTOR_ID_ID);
5✔
269
    EXPORT_ENUM(DESCRIPTOR_DTYPE_ID);
5✔
270

271
    EXPORT_ENUM(CTRL_STREAM_START);
5✔
272
    EXPORT_ENUM(CTRL_DESCRIPTOR_REISSUE);
5✔
273
    EXPORT_ENUM(CTRL_STREAM_STOP);
5✔
274
    EXPORT_ENUM(CTRL_DESCRIPTOR_UPDATE);
5✔
275

276
    EXPORT_ENUM(MEMCPY_STD);
5✔
277
    EXPORT_ENUM(MEMCPY_NONTEMPORAL);
5✔
278
#undef EXPORT_ENUM
279

280
    m.def("log_info", [](const std::string &msg) { log_info("%s", msg); },
20,005✔
281
          "Log a message at INFO level (for testing only)");
282

283
    py::class_<flavour>(m, "Flavour")
5✔
284
        .def(py::init<int, int, int, bug_compat_mask>(),
5✔
285
             "version"_a, "item_pointer_bits"_a,
5✔
286
             "heap_address_bits"_a, "bug_compat"_a=0)
10✔
287
        .def(py::init<>())
5✔
288
        .def(py::self == py::self)
5✔
289
        .def(py::self != py::self)
5✔
290
        .def_property_readonly("version", SPEAD2_PTMF(flavour, get_version))
5✔
291
        .def_property_readonly("item_pointer_bits", SPEAD2_PTMF(flavour, get_item_pointer_bits))
5✔
292
        .def_property_readonly("heap_address_bits", SPEAD2_PTMF(flavour, get_heap_address_bits))
5✔
293
        .def_property_readonly("bug_compat", SPEAD2_PTMF(flavour, get_bug_compat));
5✔
294

295
    py::class_<memory_allocator, std::shared_ptr<memory_allocator>>(m, "MemoryAllocator")
5✔
296
        .def(py::init<>());
5✔
297

298
    py::class_<mmap_allocator, memory_allocator, std::shared_ptr<mmap_allocator>>(
5✔
299
        m, "MmapAllocator")
300
        .def(py::init<int>(), "flags"_a=0);
5✔
301

302
    py::class_<memory_pool, memory_allocator, std::shared_ptr<memory_pool>>(
5✔
303
        m, "MemoryPool")
304
        .def(py::init<std::size_t, std::size_t, std::size_t, std::size_t, std::shared_ptr<memory_allocator>>(),
5✔
305
             "lower"_a, "upper"_a, "max_free"_a, "initial"_a, py::arg_v("allocator", nullptr, "None"))
10✔
306
        .def(py::init<std::shared_ptr<thread_pool>, std::size_t, std::size_t, std::size_t, std::size_t, std::size_t, std::shared_ptr<memory_allocator>>(),
5✔
307
             "thread_pool"_a, "lower"_a, "upper"_a, "max_free"_a, "initial"_a, "low_water"_a, "allocator"_a)
5✔
308
        .def_property("warn_on_empty",
5✔
309
                      &memory_pool::get_warn_on_empty, &memory_pool::set_warn_on_empty);
5✔
310

311
    py::class_<thread_pool_wrapper, std::shared_ptr<thread_pool_wrapper>>(m, "ThreadPool")
5✔
312
        .def(py::init<int>(), "threads"_a = 1)
10✔
313
        .def(py::init<int, const std::vector<int> &>(), "threads"_a, "affinity"_a)
5✔
314
        .def_static("set_affinity", &thread_pool_wrapper::set_affinity)
5✔
315
        .def("stop", SPEAD2_PTMF(thread_pool_wrapper, stop));
5✔
316

317
    py::class_<inproc_queue, std::shared_ptr<inproc_queue>>(m, "InprocQueue")
5✔
318
        .def(py::init<>())
5✔
319
        .def("add_packet", [](inproc_queue &self, py::buffer obj)
5✔
320
        {
321
            py::buffer_info info = request_buffer_info(obj, PyBUF_C_CONTIGUOUS);
25✔
322
            inproc_queue::packet pkt;
25✔
323
            pkt.size = info.size * info.itemsize;
25✔
324
            pkt.data = std::unique_ptr<std::uint8_t[]>{new std::uint8_t[pkt.size]};
25✔
325
            std::memcpy(pkt.data.get(), info.ptr, pkt.size);
25✔
326
            self.add_packet(std::move(pkt));
25✔
327
        }, "packet")
25✔
328
        .def("stop", SPEAD2_PTMF(inproc_queue, stop));
5✔
329

330
    py::class_<descriptor>(m, "RawDescriptor")
5✔
331
        .def(py::init<>())
5✔
332
        .def_readwrite("id", &descriptor::id)
5✔
333
        .def_property("name", bytes_getter(&descriptor::name), bytes_setter(&descriptor::name))
10✔
334
        .def_property("description", bytes_getter(&descriptor::description), bytes_setter(&descriptor::description))
10✔
335
        .def_property("shape", [](const descriptor &d) -> py::list
5✔
336
        {
337
            py::list out;
910✔
338
            for (const auto &size : d.shape)
968✔
339
            {
340
                if (size >= 0)
58✔
341
                    out.append(size);
38✔
342
                else
343
                    out.append(py::none());
20✔
344
            }
345
            return out;
910✔
346
        }, [](descriptor &d, py::sequence shape)
×
347
        {
348
            std::vector<std::int64_t> out;
1,218✔
349
            out.reserve(len(shape));
1,218✔
350
            for (std::size_t i = 0; i < len(shape); i++)
1,633✔
351
            {
352
                py::object value = shape[i];
415✔
353
                if (value.is_none())
415✔
354
                    out.push_back(-1);
15✔
355
                else
356
                {
357
                    std::int64_t v = value.cast<std::int64_t>();
400✔
358
                    // TODO: verify range (particularly, >= 0)
359
                    out.push_back(v);
400✔
360
                }
361
            }
415✔
362
            d.shape = std::move(out);
1,218✔
363
        })
1,218✔
364
        .def_readwrite("format", &descriptor::format)
5✔
365
        .def_property("numpy_header", bytes_getter(&descriptor::numpy_header), bytes_setter(&descriptor::numpy_header))
5✔
366
    ;
367
#if SPEAD2_USE_IBV
368
    py::class_<ibv_context_t>(m, "IbvContext")
5✔
369
        .def(py::init([](const std::string &interface_address)
5✔
370
            {
371
                py::gil_scoped_release release;
×
372
                boost::asio::io_service io_service;
×
373
                return ibv_context_t(make_address_no_release(
×
374
                    io_service, interface_address, boost::asio::ip::udp::resolver::query::passive));
×
375
            }), "interface"_a)
5✔
376
        .def("reset", [](ibv_context_t &self) { self.reset(); })
5✔
377
    ;
378
#endif
379
}
5✔
380

381
void register_logging()
5✔
382
{
383
    py::object logging_module = py::module::import("logging");
5✔
384
    py::object logger = logging_module.attr("getLogger")("spead2");
5✔
385
    detail::our_logger.reset(new log_function_python(logger));
5✔
386
    detail::orig_logger = set_log_function(std::ref(*detail::our_logger));
5✔
387
}
5✔
388

389
void register_atexit()
5✔
390
{
391
    py::module atexit_mod = py::module::import("atexit");
5✔
392
    atexit_mod.attr("register")(py::cpp_function(detail::run_exit_stoppers));
5✔
393
}
5✔
394

395
buffer_allocation::buffer_allocation(py::buffer buf)
312✔
396
    : obj(std::move(buf)),
312✔
397
    buffer_info(request_buffer_info(obj, PyBUF_C_CONTIGUOUS | PyBUF_WRITEABLE))
312✔
398
{
399
}
312✔
400

401
namespace
402
{
403

404
/* Function object that acts as a deleter for a wrapped buffer_allocation. It's
405
 * a class rather than a lambda so that the shared_ptr can be constructed by
406
 * move instead of copy in C++11 (which doesn't support C++14 generalised
407
 * lambda captures), and to provide get_allocation.
408
 *
409
 * It needs to hold a shared_ptr rather than a unique_ptr because std::function
410
 * requires the function to be copyable. In practice it is unlikely to be
411
 * copied.
412
 */
413
class buffer_allocation_deleter
414
{
415
private:
416
    std::shared_ptr<buffer_allocation> alloc;
417

418
public:
419
    explicit buffer_allocation_deleter(std::shared_ptr<buffer_allocation> alloc)
312✔
420
        : alloc(std::move(alloc)) {}
312✔
421

422
    void operator()(std::uint8_t *ptr) const
312✔
423
    {
424
        alloc->buffer_info = py::buffer_info();
312✔
425
        alloc->obj = py::object();
312✔
426
    }
312✔
427

428
    buffer_allocation &get_allocation() const
1,502✔
429
    {
430
        return *alloc;
1,502✔
431
    }
432
};
433

434
} // anonymous namespace
435

436
buffer_allocation *get_buffer_allocation(const memory_allocator::pointer &ptr)
134✔
437
{
438
    const auto *deleter = ptr.get_deleter().target<buffer_allocation_deleter>();
134✔
439
    if (deleter)
134✔
440
        return &deleter->get_allocation();
134✔
441
    else
442
        return nullptr;
×
443
}
444

445
} // namespace spead2
446

447
namespace PYBIND11_NAMESPACE
448
{
449
namespace detail
450
{
451

452
bool type_caster<spead2::memory_allocator::pointer>::load(handle src, bool convert)
314✔
453
{
454
    if (src.is_none())
314✔
455
    {
456
        value.reset();
2✔
457
        return true;
2✔
458
    }
459
    if (!PyObject_CheckBuffer(src.ptr()))
312✔
460
        return false;
×
461
    // Create a pointer wrapping the buffer_allocation
462
    auto alloc = std::make_shared<spead2::buffer_allocation>(reinterpret_borrow<buffer>(src));
312✔
463
    // copy the pointer before moving from alloc
464
    std::uint8_t *ptr = static_cast<std::uint8_t *>(alloc->buffer_info.ptr);
312✔
465
    value = spead2::memory_allocator::pointer(
624✔
466
        ptr, spead2::buffer_allocation_deleter(std::move(alloc)));
936✔
467
    return true;
312✔
468
}
312✔
469

470
handle type_caster<spead2::memory_allocator::pointer>::cast(
1,680✔
471
    const spead2::memory_allocator::pointer &ptr, return_value_policy, handle)
472
{
473
    if (!ptr)
1,680✔
474
        return none().inc_ref();
312✔
475
    auto deleter = ptr.get_deleter().target<spead2::buffer_allocation_deleter>();
1,368✔
476
    if (!deleter)
1,368✔
477
        throw type_error("pointer did not come from a Python buffer object");
×
478
    return deleter->get_allocation().obj.inc_ref();
1,368✔
479
}
480

481
} // namespace detail
482
} // namespace PYBIND11_NAMESPACE
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc