• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 5782484957

pending completion
5782484957

push

github

web-flow
Merge pull request #232 from ska-sa/delay-freeing-readers

Defer deleting readers until stream destruction

19 of 19 new or added lines in 6 files covered. (100.0%)

5418 of 7230 relevant lines covered (74.94%)

52699.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.84
/src/send_writer.cpp
1
/* Copyright 2020 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <cassert>
22
#include <array>
23
#include <chrono>
24
#include <forward_list>
25
#include <algorithm>
26
#include <tuple>
27
#include <boost/utility/in_place_factory.hpp>
28
#include <spead2/send_writer.h>
29
#include <spead2/send_stream.h>
30

31
namespace spead2
32
{
33
namespace send
34
{
35

36
/* Construct a precise time from a coarse clock time point, with zero
 * fractional (sub-tick) correction.
 */
writer::precise_time::precise_time(const coarse_type &coarse)
    : coarse(coarse), correction(0.0)
{
}
40

41
void writer::precise_time::normalize()
1,022✔
42
{
43
    auto floor = std::chrono::duration_cast<coarse_type::duration>(correction);
1,022✔
44
    if (correction < floor)
1,022✔
45
        floor -= coarse_type::duration(1);  // cast rounds negative values up instead of down
×
46
    coarse += floor;
1,022✔
47
    correction -= floor;
1,022✔
48
}
1,022✔
49

50
/* Advance this time by @a delta, renormalising afterwards so the stored
 * correction remains within one coarse tick.
 */
writer::precise_time &writer::precise_time::operator+=(const correction_type &delta)
{
    correction += delta;
    normalize();
    return *this;
}
56

57
bool writer::precise_time::operator<(const precise_time &other) const
6,220✔
58
{
59
    return std::tie(coarse, correction) < std::tie(other.coarse, other.correction);
6,220✔
60
}
61

62
/* Associate this writer with its owning stream.
 *
 * Must be called exactly once, with a non-null @a owner, before the writer
 * is used (enforced by assertions).
 */
void writer::set_owner(stream *owner)
{
    assert(!this->owner);
    assert(owner);
    this->owner = owner;
}
68

69
/* Delegate rate limiting to the hardware.
 *
 * Only valid when the stream config does not force software rate limiting
 * (enforced by assertion). Once set, get_packet skips byte accounting.
 */
void writer::enable_hw_rate()
{
    assert(config.get_rate_method() != rate_method::SW);
    hw_rate = true;
}
74

75
/* Account for the bytes sent since the last update, advancing both the
 * burst-rate and long-term-rate send times, and return the (coarse) time
 * at which the next transmission is due.
 */
writer::timer_type::time_point writer::update_send_times(timer_type::time_point now)
{
    std::chrono::duration<double> burst_delay(rate_bytes * seconds_per_byte_burst);
    std::chrono::duration<double> steady_delay(rate_bytes * seconds_per_byte);
    send_time_burst += burst_delay;
    send_time += steady_delay;
    rate_bytes = 0;

    /* send_time_burst needs to reflect the time the burst
     * was actually sent (as well as we can estimate it), even if
     * send_time or now is later.
     */
    precise_time target_time = send_time;
    if (target_time < send_time_burst)
        target_time = send_time_burst;
    precise_time now_precise(now);
    send_time_burst = target_time;
    if (send_time_burst < now_precise)
        send_time_burst = now_precise;
    return target_time.get_coarse();
}
91

92
void writer::update_send_time_empty()
5,198✔
93
{
94
    timer_type::time_point now = timer_type::clock_type::now();
5,198✔
95
    /* Compute what send_time would need to be to make the next packet due to be
96
     * transmitted now. The calculations are mostly done without using
97
     * precise_time, because "now" is coarse to start with.
98
     */
99
    std::chrono::duration<double> wait(rate_bytes * seconds_per_byte);
5,198✔
100
    auto wait2 = std::chrono::duration_cast<timer_type::clock_type::duration>(wait);
5,198✔
101
    timer_type::time_point backdate = now - wait2;
5,198✔
102
    send_time = std::max(send_time, precise_time(backdate));
5,198✔
103
}
5,198✔
104

105
/**
 * Obtain the next packet to transmit.
 *
 * @param data     Filled in with the packet buffers, size, substream index,
 *                 owning queue item and whether it completes its group.
 * @param scratch  Scratch space for packet assembly, passed to the
 *                 packet generator.
 * @return SLEEP if the rate limiter requires a pause before the next packet,
 *         EMPTY if there is no work queued, or SUCCESS with @a data filled in.
 *
 * Fix vs original: the ROUND_ROBIN case declared @c next_active initialised
 * to @c active and then immediately overwrote it with @c cur->group_next
 * (a dead store); the two statements are merged into one initialisation.
 */
writer::packet_result writer::get_packet(transmit_packet &data, std::uint8_t *scratch)
{
    // A sleep requested earlier may still be pending; re-check the deadline.
    if (must_sleep)
    {
        auto now = timer_type::clock_type::now();
        if (now < send_time_burst.get_coarse())
            return packet_result::SLEEP;
        else
            must_sleep = false;
    }
    // Once a burst's worth of bytes has been accounted, update the send
    // times and sleep if we are ahead of schedule.
    if (rate_bytes >= config.get_burst_size())
    {
        auto now = timer_type::clock_type::now();
        auto target_time = update_send_times(now);
        if (now < target_time)
        {
            must_sleep = true;
            return packet_result::SLEEP;
        }
    }

    if (active == queue_tail)
    {
        /* We've read up to our cached tail. See if there has been
         * new work added in the meantime.
         */
        queue_tail = get_owner()->queue_tail.load(std::memory_order_acquire);
        if (active == queue_tail)
            return packet_result::EMPTY;
    }
    detail::queue_item *cur = get_owner()->get_queue(active);
    assert(cur->gen.has_next_packet());

    data.buffers = cur->gen.next_packet(scratch);
    data.size = boost::asio::buffer_size(data.buffers);
    data.substream_index = cur->substream_index;
    // Point at the start of the group, so that errors and byte counts accumulate
    // in one place.
    data.item = get_owner()->get_queue(active_start);
    if (!hw_rate)
        rate_bytes += data.size;
    data.last = false;

    // Advance `active` to the heap that will supply the next packet.
    switch (cur->mode)
    {
    case group_mode::ROUND_ROBIN:
        {
            // Find the heap to use for the next packet, skipping exhausted heaps
            std::size_t next_active = cur->group_next;
            detail::queue_item *next = (next_active == active) ? cur : get_owner()->get_queue(next_active);
            while (!next->gen.has_next_packet())
            {
                if (next_active == active)
                {
                    // We've gone all the way around the group and not found anything,
                    // so the group is exhausted.
                    data.last = true;
                    active = cur->group_end;
                    active_start = active;
                    return packet_result::SUCCESS;
                }
                next_active = next->group_next;
                next = get_owner()->get_queue(next_active);
            }
            // Cache the result so that we can skip the search next time
            cur->group_next = next_active;
            active = next_active;
        }
        break;
    case group_mode::SERIAL:
        {
            detail::queue_item *next = cur;
            while (!next->gen.has_next_packet())
            {
                active++;
                if (active == cur->group_end)
                {
                    // We've finished all the heaps in the group
                    data.last = true;
                    active_start = active;
                    return packet_result::SUCCESS;
                }
                next = get_owner()->get_queue(active);
            }
        }
        break;
    }
    return packet_result::SUCCESS;
}
195

196
/* Complete @a n finished heap groups at the head of the queue: vacate their
 * queue slots (advancing the shared head), then invoke completion handlers
 * and release any promises of threads blocked on completion.
 */
void writer::groups_completed(std::size_t n)
{
    // Snapshot of one queue item's completion state, taken while holding
    // head_mutex, so the handler can be invoked after the slot is vacated.
    struct bound_handler
    {
        stream::completion_handler handler;
        boost::system::error_code result;
        std::size_t bytes_transferred;

        bound_handler() = default;
        bound_handler(stream::completion_handler &&handler, const boost::system::error_code &result, std::size_t bytes_transferred)
            : handler(std::move(handler)), result(result), bytes_transferred(bytes_transferred)
        {
        }
    };

    /**
     * We have to ensure that we vacate the slots (and update the head)
     * before we trigger any callbacks or promises, because otherwise a
     * callback that immediately tries to enqueue new heaps might find that
     * the queue is still full.
     *
     * Batching amortises the locking overhead when using small heaps. We
     * could use a single dynamically-sized batch, but that would require
     * dynamic memory allocation to hold the handlers.
     */
    constexpr std::size_t max_batch = 16;
    std::forward_list<std::promise<void>> waiters;
    std::array<bound_handler, max_batch> handlers;
    while (n > 0)
    {
        std::size_t batch = std::min(max_batch, n);
        {
            std::lock_guard<std::mutex> lock(get_owner()->head_mutex);
            for (std::size_t i = 0; i < batch; i++)
            {
                detail::queue_item *cur = get_owner()->get_queue(queue_head);
                handlers[i] = bound_handler(
                    std::move(cur->handler), cur->result, cur->bytes_sent);
                waiters.splice_after(waiters.before_begin(), cur->waiters);
                std::size_t next_queue_head = cur->group_end;
                // NOTE(review): explicit destructor call — the queue slots
                // appear to use manual lifetime management; confirm against
                // the queue storage in send_stream.
                cur->~queue_item();
                queue_head++;
                // For a group with > 1 heap, destroy the rest of the group
                // and splice in waiters.
                while (queue_head != next_queue_head)
                {
                    cur = get_owner()->get_queue(queue_head);
                    waiters.splice_after(waiters.before_begin(), cur->waiters);
                    cur->~queue_item();
                    queue_head++;
                }
            }
            // After this, async_send_heaps is free to reuse the slots we've
            // just vacated.
            get_owner()->queue_head.store(queue_head, std::memory_order_release);
        }
        // Run the user callbacks outside the lock.
        for (std::size_t i = 0; i < batch; i++)
            handlers[i].handler(handlers[i].result, handlers[i].bytes_transferred);
        while (!waiters.empty())
        {
            waiters.front().set_value();
            waiters.pop_front();
        }
        n -= batch;
    }
}
262

263
void writer::sleep()
346✔
264
{
265
    if (must_sleep)
346✔
266
    {
267
        timer.expires_at(send_time_burst.get_coarse());
346✔
268
        timer.async_wait(
346✔
269
            [this](const boost::system::error_code &) {
346✔
270
                must_sleep = false;
346✔
271
                wakeup();
346✔
272
            }
346✔
273
        );
274
    }
275
    else
276
    {
277
        post_wakeup();
×
278
    }
279
}
346✔
280

281
void writer::request_wakeup()
5,496✔
282
{
283
    std::size_t old_tail = queue_tail;
5,496✔
284
    {
285
        std::lock_guard<std::mutex> tail_lock(get_owner()->tail_mutex);
5,496✔
286
        queue_tail = get_owner()->queue_tail.load(std::memory_order_acquire);
5,496✔
287
        if (queue_tail == old_tail)
5,496✔
288
        {
289
            get_owner()->need_wakeup = true;
5,496✔
290
            return;
5,496✔
291
        }
292
    }
5,496✔
293
    // If we get here, new work was added since the last call to get_packet,
294
    // so we must just wake ourselves up.
295
    post_wakeup();
×
296
}
297

298
/* Queue a call to wakeup() on the io_service, without blocking the caller. */
void writer::post_wakeup()
{
    get_io_service().post([this]() { wakeup(); });
}
302

303
/* Construct the writer.
 *
 * @param io_service  Service on which timers and wakeups are scheduled.
 * @param config      Stream configuration; the burst and long-term rates are
 *                    converted to seconds-per-byte here. A rate of 0 yields a
 *                    seconds-per-byte of 0 (presumably meaning "no rate
 *                    limiting" — confirm against stream_config docs).
 */
writer::writer(io_service_ref io_service, const stream_config &config)
    : config(config),
    seconds_per_byte_burst(config.get_burst_rate() > 0.0 ? 1.0 / config.get_burst_rate() : 0.0),
    seconds_per_byte(config.get_rate() > 0.0 ? 1.0 / config.get_rate() : 0.0),
    io_service(std::move(io_service)),
    timer(*this->io_service)
{
}
311

312
} // namespace send
313
} // namespace spead2
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc