• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 6667407230

27 Oct 2023 11:56AM UTC coverage: 69.614%. Remained the same
6667407230

push

github

bmerry
Bump version numbers

I'm assuming that the ABI hasn't changed. The symbol for
memcpy_nontemporal changes type, but the calling code shouldn't be aware
of that.

4747 of 6819 relevant lines covered (69.61%)

123594.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.84
/src/send_writer.cpp
1
/* Copyright 2020, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <cassert>
22
#include <array>
23
#include <chrono>
24
#include <forward_list>
25
#include <algorithm>
26
#include <tuple>
27
#include <boost/utility/in_place_factory.hpp>
28
#include <spead2/send_writer.h>
29
#include <spead2/send_stream.h>
30

31
namespace spead2::send
32
{
33

34
/**
 * Construct from a coarse (clock-resolution) time point, with zero
 * fractional correction.
 */
writer::precise_time::precise_time(const coarse_type &coarse)
    : coarse(coarse), correction(0.0)
{
}
38

39
/**
 * Restore the invariant that @c correction is a non-negative amount of less
 * than one tick of @c coarse_type, by folding whole ticks of the correction
 * into @c coarse.
 */
void writer::precise_time::normalize()
{
    auto floor = std::chrono::duration_cast<coarse_type::duration>(correction);
    // duration_cast truncates towards zero, so for a negative correction the
    // result is one tick too high; adjust to get a true floor.
    if (correction < floor)
        floor -= coarse_type::duration(1);  // cast rounds negative values up instead of down
    coarse += floor;
    correction -= floor;
}
47

48
/**
 * Advance the time by @a delta (which may be fractional and/or negative),
 * renormalizing so the correction stays within one coarse tick.
 */
writer::precise_time &writer::precise_time::operator+=(const correction_type &delta)
{
    correction += delta;
    normalize();
    return *this;
}
54

55
bool writer::precise_time::operator<(const precise_time &other) const
1,491✔
56
{
57
    return std::tie(coarse, correction) < std::tie(other.coarse, other.correction);
1,491✔
58
}
59

60
/**
 * Attach this writer to its owning stream. This may be called exactly once,
 * with a non-null stream; both conditions are enforced only in debug builds.
 */
void writer::set_owner(stream *owner)
{
    assert(this->owner == nullptr);  // must not already be owned
    assert(owner != nullptr);
    this->owner = owner;
}
66

67
/**
 * Switch rate limiting to hardware. Only valid when the stream config does
 * not force software rate limiting (debug-checked).
 */
void writer::enable_hw_rate()
{
    assert(config.get_rate_method() != rate_method::SW);
    hw_rate = true;
}
72

73
/**
 * Fold the bytes accumulated in @c rate_bytes into the software rate-limiter
 * state, advancing both the burst-rate and average-rate deadlines.
 *
 * @param now  Current time, used to keep @c send_time_burst from lagging
 *             behind reality.
 * @return The (coarse) time before which the next packet should not be sent.
 */
writer::timer_type::time_point writer::update_send_times(timer_type::time_point now)
{
    std::chrono::duration<double> wait_burst(rate_bytes * seconds_per_byte_burst);
    std::chrono::duration<double> wait(rate_bytes * seconds_per_byte);
    send_time_burst += wait_burst;
    send_time += wait;
    rate_bytes = 0;

    /* send_time_burst needs to reflect the time the burst
     * was actually sent (as well as we can estimate it), even if
     * send_time or now is later.
     */
    precise_time target_time = std::max(send_time_burst, send_time);
    send_time_burst = std::max(precise_time(now), target_time);
    return target_time.get_coarse();
}
89

90
/**
 * Reset the average-rate deadline after the queue has drained, so that an
 * idle period is not "banked" and spent as a later burst.
 */
void writer::update_send_time_empty()
{
    timer_type::time_point now = timer_type::clock_type::now();
    /* Compute what send_time would need to be to make the next packet due to be
     * transmitted now. The calculations are mostly done without using
     * precise_time, because "now" is coarse to start with.
     */
    std::chrono::duration<double> wait(rate_bytes * seconds_per_byte);
    auto wait2 = std::chrono::duration_cast<timer_type::clock_type::duration>(wait);
    timer_type::time_point backdate = now - wait2;
    // Only ever move send_time forward; never let backdating push it earlier.
    send_time = std::max(send_time, precise_time(backdate));
}
102

103
/**
 * Obtain the next packet to transmit, applying software rate limiting.
 *
 * @param data     Filled in with the packet's buffers, size, substream and
 *                 accounting item on success.
 * @param scratch  Scratch memory the packet generator may use to assemble
 *                 the packet.
 * @return SLEEP if rate limiting requires waiting, EMPTY if there is no
 *         queued work, otherwise SUCCESS with @a data populated.
 */
writer::packet_result writer::get_packet(transmit_packet &data, std::uint8_t *scratch)
{
    // First honour any sleep previously requested by the rate limiter.
    if (must_sleep)
    {
        auto now = timer_type::clock_type::now();
        if (now < send_time_burst.get_coarse())
            return packet_result::SLEEP;
        else
            must_sleep = false;
    }
    // Once a burst's worth of bytes has been emitted, recompute deadlines
    // and possibly sleep until the next burst is due.
    if (rate_bytes >= config.get_burst_size())
    {
        auto now = timer_type::clock_type::now();
        auto target_time = update_send_times(now);
        if (now < target_time)
        {
            must_sleep = true;
            return packet_result::SLEEP;
        }
    }

    if (active == queue_tail)
    {
        /* We've read up to our cached tail. See if there has been
         * new work added in the meantime.
         */
        queue_tail = get_owner()->queue_tail.load(std::memory_order_acquire);
        if (active == queue_tail)
            return packet_result::EMPTY;
    }
    detail::queue_item *cur = get_owner()->get_queue(active);
    assert(cur->gen.has_next_packet());

    data.buffers = cur->gen.next_packet(scratch);
    data.size = boost::asio::buffer_size(data.buffers);
    data.substream_index = cur->substream_index;
    // Point at the start of the group, so that errors and byte counts accumulate
    // in one place.
    data.item = get_owner()->get_queue(active_start);
    // With hardware rate limiting the NIC does the pacing, so don't count
    // these bytes against the software limiter.
    if (!hw_rate)
        rate_bytes += data.size;
    data.last = false;

    // Advance `active` to the heap that will supply the *next* packet,
    // according to the group's interleaving mode.
    switch (cur->mode)
    {
    case group_mode::ROUND_ROBIN:
        {
            std::size_t next_active = active;
            // Find the heap to use for the next packet, skipping exhausted heaps
            next_active = cur->group_next;
            detail::queue_item *next = (next_active == active) ? cur : get_owner()->get_queue(next_active);
            while (!next->gen.has_next_packet())
            {
                if (next_active == active)
                {
                    // We've gone all the way around the group and not found anything,
                    // so the group is exhausted.
                    data.last = true;
                    active = cur->group_end;
                    active_start = active;
                    return packet_result::SUCCESS;
                }
                next_active = next->group_next;
                next = get_owner()->get_queue(next_active);
            }
            // Cache the result so that we can skip the search next time
            cur->group_next = next_active;
            active = next_active;
        }
        break;
    case group_mode::SERIAL:
        {
            // Drain the current heap completely before moving to the next
            // one in the group.
            detail::queue_item *next = cur;
            while (!next->gen.has_next_packet())
            {
                active++;
                if (active == cur->group_end)
                {
                    // We've finished all the heaps in the group
                    data.last = true;
                    active_start = active;
                    return packet_result::SUCCESS;
                }
                next = get_owner()->get_queue(active);
            }
        }
        break;
    }
    return packet_result::SUCCESS;
}
193

194
/**
 * Retire @a n completed heap groups from the head of the queue: destroy
 * their queue slots, publish the new queue head, then invoke completion
 * handlers and release any threads blocked in flush().
 */
void writer::groups_completed(std::size_t n)
{
    // Snapshot of a queue item's completion state, taken before the slot is
    // destroyed so the handler can be invoked after the lock is dropped.
    struct bound_handler
    {
        stream::completion_handler handler;
        boost::system::error_code result;
        std::size_t bytes_transferred;

        bound_handler() = default;
        bound_handler(stream::completion_handler &&handler, const boost::system::error_code &result, std::size_t bytes_transferred)
            : handler(std::move(handler)), result(result), bytes_transferred(bytes_transferred)
        {
        }
    };

    /**
     * We have to ensure that we vacate the slots (and update the head)
     * before we trigger any callbacks or promises, because otherwise a
     * callback that immediately tries to enqueue new heaps might find that
     * the queue is still full.
     *
     * Batching amortises the locking overhead when using small heaps. We
     * could use a single dynamically-sized batch, but that would require
     * dynamic memory allocation to hold the handlers.
     */
    constexpr std::size_t max_batch = 16;
    std::forward_list<std::promise<void>> waiters;
    std::array<bound_handler, max_batch> handlers;
    while (n > 0)
    {
        std::size_t batch = std::min(max_batch, n);
        {
            std::lock_guard<std::mutex> lock(get_owner()->head_mutex);
            for (std::size_t i = 0; i < batch; i++)
            {
                detail::queue_item *cur = get_owner()->get_queue(queue_head);
                // Only the first item of a group carries the handler.
                handlers[i] = bound_handler(
                    std::move(cur->handler), cur->result, cur->bytes_sent);
                waiters.splice_after(waiters.before_begin(), cur->waiters);
                std::size_t next_queue_head = cur->group_end;
                // Slots are placement-constructed, so destroy explicitly.
                cur->~queue_item();
                queue_head++;
                // For a group with > 1 heap, destroy the rest of the group
                // and splice in waiters.
                while (queue_head != next_queue_head)
                {
                    cur = get_owner()->get_queue(queue_head);
                    waiters.splice_after(waiters.before_begin(), cur->waiters);
                    cur->~queue_item();
                    queue_head++;
                }
            }
            // After this, async_send_heaps is free to reuse the slots we've
            // just vacated.
            get_owner()->queue_head.store(queue_head, std::memory_order_release);
        }
        // Run callbacks and wake flush() waiters outside the lock.
        for (std::size_t i = 0; i < batch; i++)
            handlers[i].handler(handlers[i].result, handlers[i].bytes_transferred);
        while (!waiters.empty())
        {
            waiters.front().set_value();
            waiters.pop_front();
        }
        n -= batch;
    }
}
260

261
void writer::sleep()
346✔
262
{
263
    if (must_sleep)
346✔
264
    {
265
        timer.expires_at(send_time_burst.get_coarse());
346✔
266
        timer.async_wait(
346✔
267
            [this](const boost::system::error_code &) {
346✔
268
                must_sleep = false;
346✔
269
                wakeup();
346✔
270
            }
346✔
271
        );
272
    }
273
    else
274
    {
275
        post_wakeup();
×
276
    }
277
}
346✔
278

279
/**
 * Arrange for the writer to be woken when new work is added to the stream.
 * If work already arrived since our last look at the queue, wake up
 * immediately instead of registering for notification.
 */
void writer::request_wakeup()
{
    std::size_t old_tail = queue_tail;
    {
        // tail_mutex makes the tail check and the need_wakeup flag update
        // atomic with respect to producers appending new heaps.
        std::lock_guard<std::mutex> tail_lock(get_owner()->tail_mutex);
        queue_tail = get_owner()->queue_tail.load(std::memory_order_acquire);
        if (queue_tail == old_tail)
        {
            get_owner()->need_wakeup = true;
            return;
        }
    }
    // If we get here, new work was added since the last call to get_packet,
    // so we must just wake ourselves up.
    post_wakeup();
}
295

296
void writer::post_wakeup()
4,965✔
297
{
298
    get_io_service().post([this]() { wakeup(); });
9,930✔
299
}
4,965✔
300

301
/**
 * Construct the writer.
 *
 * Rates from the config are inverted once here into seconds-per-byte form
 * (0.0 meaning "unlimited") so the hot path multiplies instead of divides.
 *
 * @param io_service  Event loop used for the pacing timer and wakeups.
 * @param config      Stream configuration (copied).
 */
writer::writer(io_service_ref io_service, const stream_config &config)
    : config(config),
    seconds_per_byte_burst(config.get_burst_rate() > 0.0 ? 1.0 / config.get_burst_rate() : 0.0),
    seconds_per_byte(config.get_rate() > 0.0 ? 1.0 / config.get_rate() : 0.0),
    io_service(std::move(io_service)),
    timer(*this->io_service)
{
}
309

310
} // namespace spead2::send
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc