ska-sa / spead2 — build 5891294988 (push via github)
17 Aug 2023 01:03PM UTC — coverage: 74.96% (-0.03% from 74.99%); 5422 of 7233 relevant lines covered; 52656.19 hits per line
bmerry: Fix a lingering reference to C++11

Source file: /src/send_writer.cpp (94.84% of lines covered)

/* Copyright 2020, 2023 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

/**
 * @file
 */

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <array>
#include <chrono>
#include <forward_list>
#include <future>
#include <mutex>
#include <utility>
#include <algorithm>
#include <tuple>
#include <boost/utility/in_place_factory.hpp>
#include <spead2/send_writer.h>
#include <spead2/send_stream.h>

namespace spead2::send
{

34
writer::precise_time::precise_time(const coarse_type &coarse)
5,691✔
35
    : coarse(coarse), correction(0.0)
5,691✔
36
{
37
}
5,691✔
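
/* precise_time represents a time as a coarse time_point of the timer's clock
 * plus a small floating-point correction. The rate calculations are done in
 * double precision, and keeping the sub-tick residue explicitly prevents
 * rounding error from accumulating as corrections are applied packet after
 * packet.
 */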

void writer::precise_time::normalize()
{
    auto floor = std::chrono::duration_cast<coarse_type::duration>(correction);
    if (correction < floor)
        floor -= coarse_type::duration(1);  // cast rounds negative values up instead of down
    coarse += floor;
    correction -= floor;
}
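
/* Worked example of the floor adjustment above: std::chrono::duration_cast
 * truncates toward zero, so with 1ns coarse ticks a correction of -0.3ns
 * casts to 0ns rather than flooring to -1ns. After the adjustment,
 *   floor = -1ns; coarse += -1ns; correction -= -1ns = +0.7ns
 * which restores the invariant that the correction lies in [0, 1 tick).
 */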

writer::precise_time &writer::precise_time::operator+=(const correction_type &delta)
{
    correction += delta;
    normalize();
    return *this;
}

bool writer::precise_time::operator<(const precise_time &other) const
{
    return std::tie(coarse, correction) < std::tie(other.coarse, other.correction);
}
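
/* Since normalize() keeps the correction in [0, 1 coarse tick), comparing
 * (coarse, correction) lexicographically is equivalent to comparing the
 * exact values coarse + correction.
 */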

void writer::set_owner(stream *owner)
{
    assert(!this->owner);
    assert(owner);
    this->owner = owner;
}

void writer::enable_hw_rate()
{
    assert(config.get_rate_method() != rate_method::SW);
    hw_rate = true;
}

writer::timer_type::time_point writer::update_send_times(timer_type::time_point now)
{
    std::chrono::duration<double> wait_burst(rate_bytes * seconds_per_byte_burst);
    std::chrono::duration<double> wait(rate_bytes * seconds_per_byte);
    send_time_burst += wait_burst;
    send_time += wait;
    rate_bytes = 0;

    /* send_time_burst needs to reflect the time the burst
     * was actually sent (as well as we can estimate it), even if
     * send_time or now is later.
     */
    precise_time target_time = std::max(send_time_burst, send_time);
    send_time_burst = std::max(precise_time(now), target_time);
    return target_time.get_coarse();
}
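
/* Illustrative numbers (hypothetical, not from the code): at a configured
 * rate of 100 MB/s, seconds_per_byte is 1e-8, so charging a 65536-byte burst
 * advances send_time by 655.36us. The burst rate is typically higher than
 * the average rate, so send_time_burst advances less and short bursts may
 * run ahead of the long-term average.
 */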

void writer::update_send_time_empty()
{
    timer_type::time_point now = timer_type::clock_type::now();
    /* Compute what send_time would need to be to make the next packet due to be
     * transmitted now. The calculations are mostly done without using
     * precise_time, because "now" is coarse to start with.
     */
    std::chrono::duration<double> wait(rate_bytes * seconds_per_byte);
    auto wait2 = std::chrono::duration_cast<timer_type::clock_type::duration>(wait);
    timer_type::time_point backdate = now - wait2;
    send_time = std::max(send_time, precise_time(backdate));
}
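
/* In effect this discards any rate "credit" built up while the queue was
 * empty: send_time is pulled forward so that, once the pending rate_bytes
 * are charged, the next packet is due now rather than at some point in the
 * past. Otherwise a long idle period would allow an arbitrarily large
 * catch-up burst.
 */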

writer::packet_result writer::get_packet(transmit_packet &data, std::uint8_t *scratch)
{
    if (must_sleep)
    {
        auto now = timer_type::clock_type::now();
        if (now < send_time_burst.get_coarse())
            return packet_result::SLEEP;
        else
            must_sleep = false;
    }
    if (rate_bytes >= config.get_burst_size())
    {
        auto now = timer_type::clock_type::now();
        auto target_time = update_send_times(now);
        if (now < target_time)
        {
            must_sleep = true;
            return packet_result::SLEEP;
        }
    }

    if (active == queue_tail)
    {
        /* We've read up to our cached tail. See if there has been
         * new work added in the meantime.
         */
        queue_tail = get_owner()->queue_tail.load(std::memory_order_acquire);
        if (active == queue_tail)
            return packet_result::EMPTY;
    }
    detail::queue_item *cur = get_owner()->get_queue(active);
    assert(cur->gen.has_next_packet());

    data.buffers = cur->gen.next_packet(scratch);
    data.size = boost::asio::buffer_size(data.buffers);
    data.substream_index = cur->substream_index;
    // Point at the start of the group, so that errors and byte counts accumulate
    // in one place.
    data.item = get_owner()->get_queue(active_start);
    if (!hw_rate)
        rate_bytes += data.size;
    data.last = false;

    switch (cur->mode)
    {
    case group_mode::ROUND_ROBIN:
        {
            // Find the heap to use for the next packet, skipping exhausted heaps
            std::size_t next_active = cur->group_next;
            detail::queue_item *next = (next_active == active) ? cur : get_owner()->get_queue(next_active);
            while (!next->gen.has_next_packet())
            {
                if (next_active == active)
                {
                    // We've gone all the way around the group and not found anything,
                    // so the group is exhausted.
                    data.last = true;
                    active = cur->group_end;
                    active_start = active;
                    return packet_result::SUCCESS;
                }
                next_active = next->group_next;
                next = get_owner()->get_queue(next_active);
            }
            // Cache the result so that we can skip the search next time
            cur->group_next = next_active;
            active = next_active;
        }
        break;
    case group_mode::SERIAL:
        {
            detail::queue_item *next = cur;
            while (!next->gen.has_next_packet())
            {
                active++;
                if (active == cur->group_end)
                {
                    // We've finished all the heaps in the group
                    data.last = true;
                    active_start = active;
                    return packet_result::SUCCESS;
                }
                next = get_owner()->get_queue(active);
            }
        }
        break;
    }
    return packet_result::SUCCESS;
}
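
/* Sketch of how a derived writer might drive this (simplified and
 * hypothetical; it is not the code of any particular backend):
 *
 *   transmit_packet data;
 *   switch (get_packet(data, scratch))
 *   {
 *   case packet_result::SUCCESS:
 *       // transmit data.buffers; on completion, update counters on
 *       // data.item and call groups_completed() when data.last is set
 *       break;
 *   case packet_result::SLEEP:
 *       sleep();           // rate limiter says to wait
 *       break;
 *   case packet_result::EMPTY:
 *       request_wakeup();  // ask to be woken when work arrives
 *       break;
 *   }
 */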

void writer::groups_completed(std::size_t n)
{
    struct bound_handler
    {
        stream::completion_handler handler;
        boost::system::error_code result;
        std::size_t bytes_transferred;

        bound_handler() = default;
        bound_handler(stream::completion_handler &&handler, const boost::system::error_code &result, std::size_t bytes_transferred)
            : handler(std::move(handler)), result(result), bytes_transferred(bytes_transferred)
        {
        }
    };

    /**
     * We have to ensure that we vacate the slots (and update the head)
     * before we trigger any callbacks or promises, because otherwise a
     * callback that immediately tries to enqueue new heaps might find that
     * the queue is still full.
     *
     * Batching amortises the locking overhead when using small heaps. We
     * could use a single dynamically-sized batch, but that would require
     * dynamic memory allocation to hold the handlers.
     */
    constexpr std::size_t max_batch = 16;
    std::forward_list<std::promise<void>> waiters;
    std::array<bound_handler, max_batch> handlers;
    while (n > 0)
    {
        std::size_t batch = std::min(max_batch, n);
        {
            std::lock_guard<std::mutex> lock(get_owner()->head_mutex);
            for (std::size_t i = 0; i < batch; i++)
            {
                detail::queue_item *cur = get_owner()->get_queue(queue_head);
                handlers[i] = bound_handler(
                    std::move(cur->handler), cur->result, cur->bytes_sent);
                waiters.splice_after(waiters.before_begin(), cur->waiters);
                std::size_t next_queue_head = cur->group_end;
                cur->~queue_item();
                queue_head++;
                // For a group with > 1 heap, destroy the rest of the group
                // and splice in waiters.
                while (queue_head != next_queue_head)
                {
                    cur = get_owner()->get_queue(queue_head);
                    waiters.splice_after(waiters.before_begin(), cur->waiters);
                    cur->~queue_item();
                    queue_head++;
                }
            }
            // After this, async_send_heaps is free to reuse the slots we've
            // just vacated.
            get_owner()->queue_head.store(queue_head, std::memory_order_release);
        }
        for (std::size_t i = 0; i < batch; i++)
            handlers[i].handler(handlers[i].result, handlers[i].bytes_transferred);
        while (!waiters.empty())
        {
            waiters.front().set_value();
            waiters.pop_front();
        }
        n -= batch;
    }
}
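
/* The release store to queue_head is intended to pair with acquire loads on
 * the producer side, so that a producer observing the new head also observes
 * that the vacated queue_item slots have been destroyed and can be reused.
 */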

void writer::sleep()
{
    if (must_sleep)
    {
        timer.expires_at(send_time_burst.get_coarse());
        timer.async_wait(
            [this](const boost::system::error_code &) {
                must_sleep = false;
                wakeup();
            }
        );
    }
    else
    {
        post_wakeup();
    }
}

void writer::request_wakeup()
{
    std::size_t old_tail = queue_tail;
    {
        std::lock_guard<std::mutex> tail_lock(get_owner()->tail_mutex);
        queue_tail = get_owner()->queue_tail.load(std::memory_order_acquire);
        if (queue_tail == old_tail)
        {
            get_owner()->need_wakeup = true;
            return;
        }
    }
    // If we get here, new work was added since the last call to get_packet,
    // so we must just wake ourselves up.
    post_wakeup();
}
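
/* Taking tail_mutex before re-checking the tail closes a lost-wakeup race:
 * a producer that enqueues work after this check must acquire the same
 * mutex, and can then see need_wakeup set and wake the writer itself.
 */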

void writer::post_wakeup()
{
    get_io_service().post([this]() { wakeup(); });
}

writer::writer(io_service_ref io_service, const stream_config &config)
    : config(config),
    seconds_per_byte_burst(config.get_burst_rate() > 0.0 ? 1.0 / config.get_burst_rate() : 0.0),
    seconds_per_byte(config.get_rate() > 0.0 ? 1.0 / config.get_rate() : 0.0),
    io_service(std::move(io_service)),
    timer(*this->io_service)
{
}
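
/* A rate (or burst rate) of 0.0 disables that component of software pacing:
 * the corresponding seconds_per_byte factor is 0.0 and the matching send
 * time never advances. For example (hypothetical values), a rate of 1e9
 * bytes/s gives seconds_per_byte = 1e-9, i.e. 1ns of pacing charged per
 * byte sent.
 */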

} // namespace spead2::send