• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 20780411183

07 Jan 2026 11:45AM UTC coverage: 78.758% (-0.04%) from 78.801%
20780411183

push

github

web-flow
Merge pull request #413 from ska-sa/bump-rtd-dependencies

Update requirements-readthedocs.txt to latest versions

5569 of 7071 relevant lines covered (78.76%)

91224.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.01
/src/send_stream.cpp
1
/* Copyright 2015, 2017, 2019-2020, 2023-2025 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <algorithm>
22
#include <limits>
23
#include <cmath>
24
#include <thread>
25
#include <stdexcept>
26
#include <new>
27
#include <spead2/common_logging.h>
28
#include <spead2/send_stream.h>
29
#include <spead2/send_writer.h>
30

31
namespace spead2::send
32
{
33

34
/// Map a monotonically increasing queue index onto its ring-buffer slot.
stream::queue_item_storage &stream::get_queue_storage(std::size_t idx)
{
    // queue_mask is 2^k - 1 (see compute_queue_mask), so the AND performs
    // idx modulo the queue capacity without a division.
    const std::size_t slot = idx & queue_mask;
    return queue[slot];
}
38

39
/// Fetch the queue_item pointer held in the ring-buffer slot for @a idx.
detail::queue_item *stream::get_queue(std::size_t idx)
{
    queue_item_storage &storage = get_queue_storage(idx);
    return storage.get();
}
43

44
/**
 * Compute a bit mask for indexing a ring buffer of at least @a size slots.
 *
 * The returned value is one less than the smallest power of two that is
 * greater than or equal to @a size, so that <code>idx &amp; mask</code>
 * implements modular indexing.
 *
 * @throws std::invalid_argument if @a size is 0 or so large that the next
 * power of two would not fit in std::size_t
 */
static std::size_t compute_queue_mask(std::size_t size)
{
    if (size == 0)
        throw std::invalid_argument("max_heaps must be at least 1");
    // Largest power of two representable in std::size_t
    constexpr std::size_t max_pow2 = std::numeric_limits<std::size_t>::max() / 2 + 1;
    if (size > max_pow2)
        throw std::invalid_argument("max_heaps is too large");
    // Find the smallest power of two >= size by repeated doubling.
    std::size_t pow2 = 1;
    while (pow2 < size)
        pow2 *= 2;
    return pow2 - 1;
}
55

56
/* Record the queue tail at the start of an enqueue operation, so that
 * abort() can later destroy any slots constructed at or after it.
 */
stream::unwinder::unwinder(stream &s, std::size_t tail)
    : s(s), orig_tail(tail), tail(tail)
{
}
60

61
void stream::unwinder::set_tail(std::size_t tail)
446✔
62
{
63
    this->tail = tail;
446✔
64
}
446✔
65

66
void stream::unwinder::abort()
173✔
67
{
68
    for (std::size_t i = orig_tail; i != tail; i++)
179✔
69
        s.get_queue_storage(i).destroy();
6✔
70
}
173✔
71

72
/* Accept the slots constructed so far: a subsequent abort() will leave them
 * intact, because unwinding restarts from the current tail.
 */
void stream::unwinder::commit()
{
    orig_tail = tail;
}
76

77
/**
 * Construct the stream around a concrete writer implementation.
 *
 * All sizing and rate parameters are taken from the writer's config. Note
 * that the parameter @a w is moved from part-way through the initialiser
 * list, so everything after that point (and the body) must go through the
 * member this->w.
 *
 * NOTE(review): member initialisation runs in declaration order, not the
 * order written here — assumes the members are declared in this same order
 * in the header (in particular queue_mask before queue); confirm.
 */
stream::stream(std::unique_ptr<writer> &&w)
    : queue_size(w->config.get_max_heaps()),
    queue_mask(compute_queue_mask(queue_size)),
    num_substreams(w->get_num_substreams()),
    max_packet_size(w->config.get_max_packet_size()),
    default_wait_per_byte(
        // A non-positive rate means "as fast as possible": zero wait per byte
        std::chrono::duration<double>(w->config.get_rate() > 0.0 ? 1.0 / w->config.get_rate() : 0.0)
    ),
    w(std::move(w)),
    queue(new queue_item_storage[queue_mask + 1])
{
    this->w->set_owner(this);
    this->w->start();
}
91

92
/**
 * Destructor: drains the queue, then waits for the writer to go quiescent
 * before its destruction.
 */
stream::~stream()
{
    // Block until every queued heap has been handled.
    flush();
    /* The writer might still have a pending wakeup to check for new work.
     * Before we can safely delete it, we need it to have set need_wakeup.
     * A spin loop is not normally great style, but we take a hit on shutdown
     * to keep worker::request_wakeup fast when we're not shutting down.
     */
    std::unique_lock<std::mutex> lock(tail_mutex);
    while (!need_wakeup)
    {
        // Release the lock while yielding so the writer thread can take it
        // and make progress towards setting need_wakeup.
        lock.unlock();
        std::this_thread::yield();
        lock.lock();
    }
}
108

109
/// Expose the io_context that drives the underlying writer.
boost::asio::io_context &stream::get_io_context() const
{
    writer &the_writer = *w;
    return the_writer.get_io_context();
}
113

114
/* NOTE(review): identical to get_io_context(); presumably retained as a
 * backwards-compatibility alias from when Boost.Asio named the class
 * io_service — confirm against the header's deprecation markings before
 * removing.
 */
boost::asio::io_context &stream::get_io_service() const
{
    return w->get_io_context();
}
118

119
/**
 * Set the heap-count sequence: the next value handed out is @a next, and
 * each subsequent value advances by @a step.
 *
 * @throws std::invalid_argument if @a step is 0
 */
void stream::set_cnt_sequence(item_pointer_t next, item_pointer_t step)
{
    if (step == 0)
        throw std::invalid_argument("step cannot be 0");
    // tail_mutex protects the counters against concurrent enqueues.
    std::lock_guard<std::mutex> lock(tail_mutex);
    next_cnt = next;
    step_cnt = step;
}
127

128
bool stream::async_send_heap(const heap &h, completion_handler handler,
5,144✔
129
                             s_item_pointer_t cnt,
130
                             std::size_t substream_index,
131
                             double rate)
132
{
133
    heap_reference ref(h, cnt, substream_index, rate);
5,144✔
134
    return async_send_heaps_impl<null_unwinder>(
5,144✔
135
        &ref, &ref + 1, std::move(handler), group_mode::SERIAL);
15,432✔
136
}
137

138
/**
 * Block until all heaps currently in the queue have been processed.
 *
 * Attaches a promise to the most recently enqueued item and waits on the
 * corresponding future. Returns immediately if the queue is empty.
 */
void stream::flush()
{
    std::future<void> future;
    {
        // Hold both locks so neither end of the queue moves while we inspect it.
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        std::lock_guard<std::mutex> head_lock(head_mutex);
        // These could probably be read with relaxed consistency because the locks
        // ensure ordering, but this is not performance-critical.
        std::size_t tail = queue_tail.load();
        std::size_t head = queue_head.load();
        if (head == tail)
            return;
        // The most recently enqueued item lives at tail - 1; register a
        // waiter on it so we learn when the whole queue has drained.
        detail::queue_item *item = get_queue(tail - 1);
        item->waiters.emplace_front();
        future = item->waiters.front().get_future();
    }

    // Wait with the locks released so the writer thread can drain the queue.
    future.wait();
}
157

158
/// Return the number of substreams (cached from the writer at construction).
std::size_t stream::get_num_substreams() const
{
    return num_substreams;
}
162

163
// Explicit instantiation of the template used by async_send_heap above
// (null_unwinder over a plain heap_reference array), so the definition need
// not be visible to every caller.
template bool stream::async_send_heaps_impl<stream::null_unwinder, heap_reference *>(
    heap_reference *first, heap_reference *last,
    completion_handler &&handler, group_mode mode);
167

168
} // namespace spead2::send
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc