ska-sa / spead2 · build 16780666019
06 Aug 2025 02:51PM UTC · coverage: 78.72% (+0.3% from 78.411%)
Event: push · CI: github · Author: bmerry

Add discarded_chunks statistic for pop_if_full feature

This required some major rework of the way ready callbacks are handled.
It turned out that the previous approach didn't carry enough information
to be workable for chunk stream groups: the batch_stats pointer is only
useful if you know the statistic-to-slot mapping, and that mapping is
specific to the stream rather than the group (even if it will typically
be uniform across the streams).

This meant introducing a new variant of the chunk_ready_function type
(group_chunk_ready_function) and an awkward backwards-compatibility
mechanism (sketched below).

The same thing probably ought to be done for chunk_allocation_function,
but I've only partly gotten around to that.
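
To make the callback rework concrete, here is a minimal, self-contained sketch
of the pattern the message describes: an old per-stream callback type, a
group-aware variant that is handed the stream itself (so the callee can resolve
the per-stream statistic-to-slot mapping), and an adapter that keeps old
callbacks usable. The types and signatures here are illustrative stand-ins
rather than spead2's actual declarations, even where the names match those in
the commit message.

// Hypothetical sketch only: illustrates the two-variant callback pattern,
// not spead2's real declarations.
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>

struct chunk {};            // stand-in for the real chunk type

struct stream_stub          // stand-in for a receiving stream
{
    // Each stream owns its batch-statistics storage and knows its own
    // statistic-to-slot mapping.
    std::uint64_t *batch_stats = nullptr;
};

// Old-style callback: only sees the per-stream batch_stats pointer.
using chunk_ready_function =
    std::function<void(std::unique_ptr<chunk> &&, std::uint64_t *batch_stats)>;

// Group-aware variant: sees the stream itself, so it can ask the stream to
// translate a statistic into a slot (that mapping is per-stream).
using group_chunk_ready_function =
    std::function<void(std::unique_ptr<chunk> &&, stream_stub &)>;

// Backwards-compatibility shim: wrap an old-style callback so it can be
// stored and invoked through the group-aware signature.
inline group_chunk_ready_function adapt_ready(chunk_ready_function old_cb)
{
    return [cb = std::move(old_cb)](std::unique_ptr<chunk> &&c, stream_stub &s)
    {
        cb(std::move(c), s.batch_stats);
    };
}

An adapter along these lines is one way to let both signatures coexist:
existing callers keep compiling, while chunk stream groups get the extra
per-stream context they need.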

26 of 32 new or added lines in 3 files covered (81.25%).

7 existing lines in 2 files now uncovered.

5623 of 7143 relevant lines covered (78.72%)

90321.17 hits per line

Source File: /src/send_stream.cpp (87.01% covered)
Per-line hit counts from this build appear as trailing comments in the
listing below; "uncovered" marks lines with no hits, and "newly uncovered"
marks the existing lines counted above as now uncovered.

/* Copyright 2015, 2017, 2019-2020, 2023-2025 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

/**
 * @file
 */

#include <algorithm>
#include <limits>
#include <cmath>
#include <thread>
#include <stdexcept>
#include <new>
#include <spead2/common_logging.h>
#include <spead2/send_stream.h>
#include <spead2/send_writer.h>

namespace spead2::send
{

stream::queue_item_storage &stream::get_queue_storage(std::size_t idx)  // hits: 79,705
{
    return queue[idx & queue_mask];  // hits: 79,705
}

detail::queue_item *stream::get_queue(std::size_t idx)  // hits: 74,050
{
    return get_queue_storage(idx).get();  // hits: 74,050
}

static std::size_t compute_queue_mask(std::size_t size)  // hits: 320
{
    if (size == 0)  // hits: 320
        throw std::invalid_argument("max_heaps must be at least 1");  // uncovered
    if (size > std::numeric_limits<std::size_t>::max() / 2 + 1)  // hits: 320
        throw std::invalid_argument("max_heaps is too large");  // uncovered
    std::size_t p2 = 1;  // hits: 320
    while (p2 < size)  // hits: 946
        p2 <<= 1;  // hits: 626
    return p2 - 1;  // hits: 320
}

stream::unwinder::unwinder(stream &s, std::size_t tail)  // hits: 170
    : s(s), orig_tail(tail), tail(tail)  // hits: 170
{
}  // hits: 170

void stream::unwinder::set_tail(std::size_t tail)  // hits: 446
{
    this->tail = tail;  // hits: 446
}  // hits: 446

void stream::unwinder::abort()  // hits: 173
{
    for (std::size_t i = orig_tail; i != tail; i++)  // hits: 179
        s.get_queue_storage(i).destroy();  // hits: 6
}  // hits: 173

void stream::unwinder::commit()  // hits: 167
{
    orig_tail = tail;  // hits: 167
}  // hits: 167

stream::stream(std::unique_ptr<writer> &&w)  // hits: 320
    : queue_size(w->config.get_max_heaps()),  // hits: 320
    queue_mask(compute_queue_mask(queue_size)),  // hits: 320
    num_substreams(w->get_num_substreams()),  // hits: 320
    max_packet_size(w->config.get_max_packet_size()),  // hits: 320
    default_wait_per_byte(  // hits: 320
        std::chrono::duration<double>(w->config.get_rate() > 0.0 ? 1.0 / w->config.get_rate() : 0.0)  // hits: 320
    ),
    w(std::move(w)),  // hits: 320
    queue(new queue_item_storage[queue_mask + 1])  // hits: 960
{
    this->w->set_owner(this);  // hits: 320
    this->w->start();  // hits: 320
}  // hits: 320

stream::~stream()  // hits: 320
{
    flush();  // hits: 320
    /* The writer might still have a pending wakeup to check for new work.
     * Before we can safely delete it, we need it to have set need_wakeup.
     * A spin loop is not normally great style, but we take a hit on shutdown
     * to keep worker::request_wakeup fast when we're not shutting down.
     */
    std::unique_lock<std::mutex> lock(tail_mutex);  // hits: 320
    while (!need_wakeup)  // hits: 320
    {
        lock.unlock();  // newly uncovered
        std::this_thread::yield();  // newly uncovered
        lock.lock();  // newly uncovered
    }
}  // hits: 320

boost::asio::io_context &stream::get_io_context() const  // hits: 5,368
{
    return w->get_io_context();  // hits: 5,368
}

boost::asio::io_context &stream::get_io_service() const  // uncovered
{
    return w->get_io_context();  // uncovered
}

void stream::set_cnt_sequence(item_pointer_t next, item_pointer_t step)  // hits: 2
{
    if (step == 0)  // hits: 2
        throw std::invalid_argument("step cannot be 0");  // uncovered
    std::unique_lock<std::mutex> lock(tail_mutex);  // hits: 2
    next_cnt = next;  // hits: 2
    step_cnt = step;  // hits: 2
}  // hits: 2

bool stream::async_send_heap(const heap &h, completion_handler handler,  // hits: 5,206
                             s_item_pointer_t cnt,
                             std::size_t substream_index,
                             double rate)
{
    heap_reference ref(h, cnt, substream_index, rate);  // hits: 5,206
    return async_send_heaps_impl<null_unwinder>(  // hits: 5,206
        &ref, &ref + 1, std::move(handler), group_mode::SERIAL);  // hits: 15,618
}

void stream::flush()  // hits: 322
{
    std::future<void> future;  // hits: 322
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);  // hits: 322
        std::lock_guard<std::mutex> head_lock(head_mutex);  // hits: 322
        // These could probably be read with relaxed consistency because the locks
        // ensure ordering, but this is not performance-critical.
        std::size_t tail = queue_tail.load();  // hits: 322
        std::size_t head = queue_head.load();  // hits: 322
        if (head == tail)  // hits: 322
            return;  // hits: 320
        detail::queue_item *item = get_queue(tail - 1);  // hits: 2
        item->waiters.emplace_front();  // hits: 2
        future = item->waiters.front().get_future();  // hits: 2
    }  // hits: 642

    future.wait();  // hits: 2
}  // hits: 322

std::size_t stream::get_num_substreams() const  // uncovered
{
    return num_substreams;  // uncovered
}

// Explicit instantiation
template bool stream::async_send_heaps_impl<stream::null_unwinder, heap_reference *>(
    heap_reference *first, heap_reference *last,
    completion_handler &&handler, group_mode mode);

} // namespace spead2::send
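
The queue handling in this file leans on two small idioms that are easy to miss
in the coverage view: compute_queue_mask rounds the requested capacity up to a
power of two so that indexing can use a bitwise AND (idx & queue_mask) instead
of a modulo, and the unwinder remembers the original tail so that anything
appended during a failed enqueue can be rolled back (abort) or accepted
(commit). The standalone sketch below, with hypothetical names and a plain int
ring in place of queue_item_storage, illustrates both idioms; it is not spead2
code.

// Standalone illustration of the power-of-two ring mask and the
// commit/abort ("unwinder") idiom used by send_stream.cpp.
// Hypothetical example, not part of spead2.
#include <cassert>
#include <cstddef>
#include <limits>
#include <stdexcept>
#include <vector>

// Round size up to a power of two and return (power of two) - 1 for use as
// an index mask, mirroring compute_queue_mask above.
static std::size_t make_mask(std::size_t size)
{
    if (size == 0)
        throw std::invalid_argument("size must be at least 1");
    if (size > std::numeric_limits<std::size_t>::max() / 2 + 1)
        throw std::invalid_argument("size is too large");
    std::size_t p2 = 1;
    while (p2 < size)
        p2 <<= 1;
    return p2 - 1;
}

int main()
{
    const std::size_t mask = make_mask(5);  // 5 rounds up to 8, so mask == 7
    assert(mask == 7);
    std::vector<int> ring(mask + 1);

    std::size_t orig_tail = 7;              // rollback point
    std::size_t tail = orig_tail;
    const bool failed = true;               // pretend the enqueue fails part-way

    // Append two items: index 7 & 7 == 7, then 8 & 7 == 0, so the mask wraps
    // the index without a modulo.
    for (int value : {100, 200})
        ring[tail++ & mask] = value;

    if (failed)
    {
        // "abort": undo everything appended since orig_tail.
        for (std::size_t i = orig_tail; i != tail; i++)
            ring[i & mask] = 0;             // stands in for destroy()
        tail = orig_tail;
    }
    else
    {
        // "commit": accept the new tail as the next rollback point.
        orig_tail = tail;
    }

    assert(ring[7] == 0 && ring[0] == 0);   // both appends rolled back
    return 0;
}

In the real stream the rollback destroys the queue_item_storage entries created
since the last commit rather than zeroing integers, but the control flow is the
same.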