• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 5991859444

27 Aug 2023 02:11PM UTC coverage: 75.031% (+0.03%) from 75.0%
5991859444

Pull #239

github

bmerry
Remove an unneeded lambda capture

It was giving a warning on Clang and hence failing CI.
Pull Request #239: Update to require at least C++17

152 of 152 new or added lines in 39 files covered. (100.0%)

5427 of 7233 relevant lines covered (75.03%)

52706.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.99
/src/recv_stream.cpp
1
/* Copyright 2015, 2017-2021, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <algorithm>
#include <atomic>
#include <cassert>
#include <memory>
#include <mutex>
#include <new>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
#include <spead2/recv_stream.h>
#include <spead2/recv_live_heap.h>
#include <spead2/common_memcpy.h>
#include <spead2/common_thread_pool.h>
#include <spead2/common_logging.h>
32

33
#define INVALID_ENTRY ((queue_entry *) -1)
34

35
namespace spead2::recv
36
{
37

38
/// Construct a statistic descriptor from its name and combining mode.
stream_stat_config::stream_stat_config(std::string name, mode mode_)
    : name(std::move(name)), mode_(mode_)
{
}
42

43
/**
 * Combine two samples of this statistic according to its mode:
 * COUNTER values are summed, MAXIMUM values take the larger of the two.
 */
std::uint64_t stream_stat_config::combine(std::uint64_t a, std::uint64_t b) const
{
    switch (mode_)
    {
    case mode::COUNTER:
        return a + b;
    case mode::MAXIMUM:
        return std::max(a, b);
    }
    /* Line below should normally be unreachable. Using the same expression as
     * for COUNTER lets the compiler generate more efficient code, as it only
     * has to consider two cases (looks just as good as using GCC's
     * __builtin_unreachable, without depending on compiler specifics).
     */
    return a + b;   // LCOV_EXCL_LINE
}
59

60
bool operator==(const stream_stat_config &a, const stream_stat_config &b)
65✔
61
{
62
    return a.get_name() == b.get_name() && a.get_mode() == b.get_mode();
65✔
63
}
64

65
/// Defined in terms of operator== for consistency.
bool operator!=(const stream_stat_config &a, const stream_stat_config &b)
{
    return !(a == b);
}
69

70
/**
71
 * Get the index within @a stats corresponding to @a name. If it is not
72
 * present, returns @c stats.size().
73
 */
74
/**
 * Get the index within @a stats corresponding to @a name. If it is not
 * present, returns @c stats.size().
 */
static std::size_t get_stat_index_nothrow(
    const std::vector<stream_stat_config> &stats,
    const std::string &name)
{
    const std::size_t n = stats.size();
    for (std::size_t idx = 0; idx < n; idx++)
    {
        if (stats[idx].get_name() == name)
            return idx;
    }
    return n;   // not found
}
83

84
/**
85
 * Get the index within @a stats corresponding to @a name.
86
 *
87
 * @throw std::out_of_range if it is not present
88
 */
89
/**
 * Get the index within @a stats corresponding to @a name.
 *
 * @throw std::out_of_range if it is not present
 */
static std::size_t get_stat_index(
    const std::vector<stream_stat_config> &stats,
    const std::string &name)
{
    const std::size_t index = get_stat_index_nothrow(stats, name);
    if (index == stats.size())
        throw std::out_of_range(name + " is not a known statistic name");
    return index;
}
98

99

100
/// Build the set of base statistics that every stream maintains.
static std::shared_ptr<const std::vector<stream_stat_config>> make_default_stats()
{
    auto stats = std::make_shared<std::vector<stream_stat_config>>();
    // Keep this in sync with the stream_stat_* constexprs in the header
    stats->emplace_back("heaps", stream_stat_config::mode::COUNTER);
    stats->emplace_back("incomplete_heaps_evicted", stream_stat_config::mode::COUNTER);
    stats->emplace_back("incomplete_heaps_flushed", stream_stat_config::mode::COUNTER);
    stats->emplace_back("packets", stream_stat_config::mode::COUNTER);
    stats->emplace_back("batches", stream_stat_config::mode::COUNTER);
    stats->emplace_back("max_batch", stream_stat_config::mode::MAXIMUM);
    stats->emplace_back("single_packet_heaps", stream_stat_config::mode::COUNTER);
    stats->emplace_back("search_dist", stream_stat_config::mode::COUNTER);
    // For backwards compatibility, worker_blocked is always added, although
    // it is not part of the base stream statistics
    stats->emplace_back("worker_blocked", stream_stat_config::mode::COUNTER);
    assert(stats->size() == stream_stat_indices::custom);
    return stats;
}
118

119
/* This is used for stream_stats objects that do not have any custom statistics.
120
 * Sharing this means the compatibility check for operator+ requires only a
121
 * pointer comparison rather than comparing arrays.
122
 */
123
static std::shared_ptr<const std::vector<stream_stat_config>> default_stats = make_default_stats();  // shared by all default-constructed stream_stats
124

125
/// Default-construct with the shared default statistic set (all zeros).
stream_stats::stream_stats()
    : stream_stats(default_stats)
{
}
129

130
/// Construct zero-valued statistics for the given configuration.
stream_stats::stream_stats(std::shared_ptr<const std::vector<stream_stat_config>> config)
    : stream_stats(config, std::vector<std::uint64_t>(config->size()))
{
    // Note: annoyingly, can't use std::move(config) above, because we access
    // config to get the size to use for the vector.
}
136

137
/**
 * Construct from a configuration and matching value array.
 *
 * The named members (heaps, packets, ...) are references bound to fixed
 * elements of @c values, giving cheap named access to the base statistics.
 * They must be bound via @c this->values (the member, after the move), not
 * the parameter.
 */
stream_stats::stream_stats(std::shared_ptr<const std::vector<stream_stat_config>> config,
                           std::vector<std::uint64_t> values)
    : config(std::move(config)),
    values(std::move(values)),
    heaps(this->values[stream_stat_indices::heaps]),
    incomplete_heaps_evicted(this->values[stream_stat_indices::incomplete_heaps_evicted]),
    incomplete_heaps_flushed(this->values[stream_stat_indices::incomplete_heaps_flushed]),
    packets(this->values[stream_stat_indices::packets]),
    batches(this->values[stream_stat_indices::batches]),
    worker_blocked(this->values[stream_stat_indices::worker_blocked]),
    max_batch(this->values[stream_stat_indices::max_batch]),
    single_packet_heaps(this->values[stream_stat_indices::single_packet_heaps]),
    search_dist(this->values[stream_stat_indices::search_dist])
{
    // The config must at least cover the base statistics, and match values.
    assert(this->config->size() >= stream_stat_indices::custom);
    assert(this->config->size() == this->values.size());
}
154

155
/// Copy constructor: delegates so the reference members rebind to this copy.
stream_stats::stream_stats(const stream_stats &other)
    : stream_stats(other.config, other.values)
{
}
159

160
/**
 * Copy-assign the statistic values.
 *
 * Only the elements are copied, one by one: the vector itself must not be
 * reassigned, because the named reference members (heaps, packets, ...) are
 * bound to its elements and a vector assignment could reallocate and leave
 * them dangling.
 *
 * @throw std::invalid_argument if the configurations differ
 */
stream_stats &stream_stats::operator=(const stream_stats &other)
{
    // Pointer comparison is a fast path; fall back to deep comparison.
    if (config != other.config && *config != *other.config)
        throw std::invalid_argument("config must match to assign stats");
    for (std::size_t i = 0; i < values.size(); i++)
        values[i] = other.values[i];
    return *this;
}
168

169
/// Same as at(): throws std::out_of_range for unknown names.
std::uint64_t &stream_stats::operator[](const std::string &name)
{
    return at(name);
}
173

174
/// Same as at(): throws std::out_of_range for unknown names.
const std::uint64_t &stream_stats::operator[](const std::string &name) const
{
    return at(name);
}
178

179
/// Look up a statistic by name; throws std::out_of_range if unknown.
std::uint64_t &stream_stats::at(const std::string &name)
{
    const std::size_t index = get_stat_index(*config, name);
    return values[index];
}
183

184
/// Look up a statistic by name; throws std::out_of_range if unknown.
const std::uint64_t &stream_stats::at(const std::string &name) const
{
    const std::size_t index = get_stat_index(*config, name);
    return values[index];
}
188

189
/// Find a statistic by name; returns end() (index == size) if absent.
stream_stats::iterator stream_stats::find(const std::string &name)
{
    const std::size_t index = get_stat_index_nothrow(*config, name);
    return iterator(*this, index);
}
193

194
/// Find a statistic by name; returns end() (index == size) if absent.
stream_stats::const_iterator stream_stats::find(const std::string &name) const
{
    const std::size_t index = get_stat_index_nothrow(*config, name);
    return const_iterator(*this, index);
}
198

199
/// Returns 1 if the named statistic exists, else 0 (names are unique).
std::size_t stream_stats::count(const std::string &name) const
{
    if (get_stat_index_nothrow(*config, name) == values.size())
        return 0;
    return 1;
}
203

204
/// Combine two sets of statistics; implemented via operator+= on a copy.
stream_stats stream_stats::operator+(const stream_stats &other) const
{
    stream_stats result(*this);
    result += other;
    return result;
}
210

211
/**
 * Merge another set of statistics into this one, combining each value
 * according to its configured mode (sum or max).
 *
 * @throw std::invalid_argument if the configurations differ
 */
stream_stats &stream_stats::operator+=(const stream_stats &other)
{
    // Pointer equality is the common (shared default_stats) fast path.
    if (config != other.config && *config != *other.config)
        throw std::invalid_argument("config must match to add stats together");
    const std::size_t n = values.size();
    for (std::size_t i = 0; i < n; i++)
        values[i] = (*config)[i].combine(values[i], other.values[i]);
    return *this;
}
219

220

221
/// Choose a power-of-two hash-table size for the given number of heaps.
static std::size_t compute_bucket_count(std::size_t total_max_heaps)
{
    // Grow a small power of two until it covers the heap count ...
    std::size_t n = 4;
    while (n < total_max_heaps)
        n <<= 1;
    // ... then scale up so the table keeps a low load factor.
    return n << 2;
}
229

230
/* Compute shift such that (x >> shift) < bucket_count for all 64-bit x
231
 * and bucket_count a power of 2.
232
 */
233
/* Compute shift such that (x >> shift) < bucket_count for all 64-bit x
 * and bucket_count a power of 2.
 */
static int compute_bucket_shift(std::size_t bucket_count)
{
    int shift = 64;
    for (; bucket_count > 1; bucket_count >>= 1)
        shift--;
    return shift;
}
243

244
/// Copy a packet's payload into the heap's storage using plain memcpy.
static void packet_memcpy_std(const spead2::memory_allocator::pointer &allocation, const packet_header &packet)
{
    std::memcpy(allocation.get() + packet.payload_offset, packet.payload, packet.payload_length);
}
248

249
/// Copy a packet's payload using spead2's non-temporal (cache-bypassing) memcpy.
static void packet_memcpy_nontemporal(const spead2::memory_allocator::pointer &allocation, const packet_header &packet)
{
    spead2::memcpy_nontemporal(allocation.get() + packet.payload_offset, packet.payload, packet.payload_length);
}
253

254
/// Default configuration: standard memcpy, default allocator, shared base stats.
stream_config::stream_config()
    : memcpy(packet_memcpy_std),
    allocator(std::make_shared<memory_allocator>()),
    stats(default_stats)  // Initially point to shared defaults; make a copy on write
{
}
260

261
/**
 * Set the number of partial heaps retained per substream.
 *
 * @throw std::invalid_argument if @a max_heaps is 0
 */
stream_config &stream_config::set_max_heaps(std::size_t max_heaps)
{
    if (max_heaps == 0)
        throw std::invalid_argument("max_heaps cannot be 0");
    this->max_heaps = max_heaps;
    return *this;
}
268

269
/**
 * Set the number of substreams (heap_cnt is distributed modulo this value).
 *
 * @throw std::invalid_argument if @a substreams is 0
 */
stream_config &stream_config::set_substreams(std::size_t substreams)
{
    if (substreams == 0)
        throw std::invalid_argument("substreams cannot be 0");
    this->substreams = substreams;
    return *this;
}
276

277
/**
 * Set PySPEAD bug-compatibility flags.
 *
 * @throw std::invalid_argument if @a bug_compat has bits outside
 * BUG_COMPAT_PYSPEAD_0_5_2
 */
stream_config &stream_config::set_bug_compat(bug_compat_mask bug_compat)
{
    if (bug_compat & ~BUG_COMPAT_PYSPEAD_0_5_2)
        throw std::invalid_argument("unknown compatibility bits in bug_compat");
    this->bug_compat = bug_compat;
    return *this;
}
284

285
/// Set the allocator used for heap payload memory.
stream_config &stream_config::set_memory_allocator(std::shared_ptr<memory_allocator> allocator)
{
    this->allocator = std::move(allocator);
    return *this;
}
290

291
/// Set the function used to copy packet payloads into heap storage.
stream_config &stream_config::set_memcpy(packet_memcpy_function memcpy)
{
    // Move rather than copy: the parameter is taken by value, so copying it
    // again here would duplicate the std::function state unnecessarily.
    this->memcpy = std::move(memcpy);
    return *this;
}
296

297
/**
 * Set the copy function via a plain memcpy-style callable, adapting it to
 * the packet-aware signature.
 */
stream_config &stream_config::set_memcpy(memcpy_function memcpy)
{
    // Init-capture moves the function object into the closure instead of
    // copying the std::function state (the parameter is already a copy).
    return set_memcpy(
        packet_memcpy_function([memcpy = std::move(memcpy)](
            const spead2::memory_allocator::pointer &allocation, const packet_header &packet)
        {
            memcpy(allocation.get() + packet.payload_offset, packet.payload, packet.payload_length);
        })
    );
}
307

308
/**
 * Select a copy function by enum ID.
 *
 * @throw std::invalid_argument for an unrecognised @a id
 */
stream_config &stream_config::set_memcpy(memcpy_function_id id)
{
    /* We adapt each case to the packet_memcpy signature rather than using the
     * generic wrapping in the memcpy_function overload. This ensures that
     * there is only one level of indirect function call instead of two. It
     * also makes it possible to reverse the mapping by comparing function
     * pointers.
     */
    switch (id)
    {
    case MEMCPY_STD:
        set_memcpy(packet_memcpy_std);
        break;
    case MEMCPY_NONTEMPORAL:
        set_memcpy(packet_memcpy_nontemporal);
        break;
    default:
        throw std::invalid_argument("Unknown memcpy function");
    }
    return *this;
}
329

330
/// Control whether a stream-control stop item terminates the stream.
stream_config &stream_config::set_stop_on_stop_item(bool stop)
{
    stop_on_stop_item = stop;
    return *this;
}
335

336
/// Control whether packets without a HEAP_LEN item are accepted.
stream_config &stream_config::set_allow_unsized_heaps(bool allow)
{
    allow_unsized_heaps = allow;
    return *this;
}
341

342
/// Control whether packets within a heap may arrive out of order.
stream_config &stream_config::set_allow_out_of_order(bool allow)
{
    allow_out_of_order = allow;
    return *this;
}
347

348
/// Set an opaque user-supplied identifier for the stream.
stream_config &stream_config::set_stream_id(std::uintptr_t id)
{
    stream_id = id;
    return *this;
}
353

354
/**
 * Register a custom statistic and return its index.
 *
 * @throw std::invalid_argument if a statistic with this name already exists
 */
std::size_t stream_config::add_stat(std::string name, stream_stat_config::mode mode)
{
    const bool exists =
        spead2::recv::get_stat_index_nothrow(*stats, name) != stats->size();
    if (exists)
        throw std::invalid_argument("A statistic called " + name + " already exists");
    // Make a copy so that we don't modify any shared copies
    auto updated = std::make_shared<std::vector<stream_stat_config>>(*stats);
    const std::size_t index = updated->size();
    updated->emplace_back(std::move(name), mode);
    stats = std::move(updated);
    return index;
}
365

366
/// Look up a statistic index by name; throws std::out_of_range if unknown.
std::size_t stream_config::get_stat_index(const std::string &name) const
{
    return spead2::recv::get_stat_index(*stats, name);
}
370

371

372
/**
 * Construct the core stream state: the queue of partial heaps, the hash
 * table of buckets pointing into it, and per-substream bookkeeping.
 *
 * Note: initializer order matters; e.g. bucket_shift depends on
 * bucket_count being initialized first.
 */
stream_base::stream_base(const stream_config &config)
    : queue_storage(new queue_entry[config.get_max_heaps() * config.get_substreams()]),
    bucket_count(compute_bucket_count(config.get_max_heaps() * config.get_substreams())),
    bucket_shift(compute_bucket_shift(bucket_count)),
    buckets(new queue_entry *[bucket_count]),
    substreams(new substream[config.get_substreams() + 1]),
    substream_div(config.get_substreams()),
    config(config),
    shared(std::make_shared<shared_state>(this)),
    stats(config.get_stats().size()),
    batch_stats(config.get_stats().size())
{
    // INVALID_ENTRY marks a slot whose heap is not constructed.
    for (std::size_t i = 0; i < config.get_max_heaps() * config.get_substreams(); i++)
        queue_storage[i].next = INVALID_ENTRY;
    for (std::size_t i = 0; i < bucket_count; i++)
        buckets[i] = NULL;
    // One extra sentinel substream marks the end of the last slot range.
    for (std::size_t i = 0; i <= config.get_substreams(); i++)
    {
        substreams[i].start = i * config.get_max_heaps();
        substreams[i].head = substreams[i].start;
    }
}
394

395
/// Destroy any partial heaps still linked into the hash table.
stream_base::~stream_base()
{
    for (std::size_t i = 0; i < get_config().get_max_heaps() * get_config().get_substreams(); i++)
    {
        queue_entry *entry = &queue_storage[i];
        // next != INVALID_ENTRY means the slot holds a constructed heap.
        if (entry->next != INVALID_ENTRY)
        {
            unlink_entry(entry);
            entry->heap.destroy();
        }
    }
}
407

408
/// Map a heap count to a hash bucket.
std::size_t stream_base::get_bucket(item_pointer_t heap_cnt) const
{
    // Fibonacci hashing: multiply by 2^64 / golden ratio and keep the
    // top bits (bucket_shift was computed from the bucket count).
    constexpr std::uint64_t fib_multiplier = 11400714819323198485ULL;
    return (heap_cnt * fib_multiplier) >> bucket_shift;
}
413

414
/// Map a heap count to its substream (heap_cnt mod number of substreams).
std::size_t stream_base::get_substream(item_pointer_t heap_cnt) const
{
    // libdivide doesn't provide operator %, so compute the remainder as
    // dividend - quotient * divisor.
    const item_pointer_t quotient = heap_cnt / substream_div;
    return heap_cnt - quotient * config.get_substreams();
}
419

420
/// Remove an entry from its hash-bucket chain (singly linked via next).
void stream_base::unlink_entry(queue_entry *entry)
{
    assert(entry->next != INVALID_ENTRY);
    const std::size_t bucket_id = get_bucket(entry->heap->get_cnt());
    // Walk the chain holding a pointer to the link that points at entry.
    queue_entry **link = &buckets[bucket_id];
    for (; *link != entry; link = &(*link)->next)
        assert(*link != NULL && *link != INVALID_ENTRY);
    *link = entry->next;
    entry->next = INVALID_ENTRY;   // mark the slot as unlinked
}
433

434
/**
 * Begin a batch of packet processing: take the queue lock and reset the
 * per-batch statistics. If the owning stream has already been destroyed
 * (owner.self is null), the state starts out stopped.
 */
stream_base::add_packet_state::add_packet_state(shared_state &owner)
    : lock(owner.queue_mutex), owner(owner.self)
{
    if (this->owner)
    {
        stopped = this->owner->stopped;
        // Zero the scratch space used for custom statistics in this batch.
        std::fill(this->owner->batch_stats.begin(), this->owner->batch_stats.end(), 0);
    }
    else
    {
        stopped = true;
    }
}
447

448
/**
 * End a batch: deliver a deferred stop (if one was seen) and fold the
 * per-batch counters into the stream's statistics under stats_mutex.
 */
stream_base::add_packet_state::~add_packet_state()
{
    if (owner && stopped)
        owner->stop_received();
    if (!owner || (!packets && is_stopped()))
        return;   // Stream was stopped before we could do anything - don't count as a batch
    std::lock_guard<std::mutex> stats_lock(owner->stats_mutex);
    // The built-in stats are updated directly; batch_stats is not used
    owner->stats[stream_stat_indices::packets] += packets;
    owner->stats[stream_stat_indices::batches]++;
    owner->stats[stream_stat_indices::heaps] += complete_heaps + incomplete_heaps_evicted;
    owner->stats[stream_stat_indices::incomplete_heaps_evicted] += incomplete_heaps_evicted;
    owner->stats[stream_stat_indices::single_packet_heaps] += single_packet_heaps;
    owner->stats[stream_stat_indices::search_dist] += search_dist;
    auto &owner_max_batch = owner->stats[stream_stat_indices::max_batch];
    owner_max_batch = std::max(owner_max_batch, packets);
    // Update custom statistics
    const auto &stats_config = owner->get_config().get_stats();
    for (std::size_t i = stream_stat_indices::custom; i < stats_config.size(); i++)
        owner->stats[i] = stats_config[i].combine(owner->stats[i], owner->batch_stats[i]);
}
469

470
/**
 * Add one decoded packet to the stream.
 *
 * Finds (or creates, evicting if necessary) the live heap matching the
 * packet's heap count, copies the payload in, and delivers the heap via
 * heap_ready() when it completes.
 *
 * @return true if the packet was accepted into a heap
 */
bool stream_base::add_packet(add_packet_state &state, const packet_header &packet)
{
    const stream_config &config = state.owner->get_config();
    assert(!stopped);
    state.packets++;
    if (packet.heap_length < 0 && !config.get_allow_unsized_heaps())
    {
        log_info("packet rejected because it has no HEAP_LEN");
        return false;
    }

    // Look for matching heap.
    queue_entry *entry = NULL;
    s_item_pointer_t heap_cnt = packet.heap_cnt;
    std::size_t bucket_id = get_bucket(heap_cnt);
    assert(bucket_id < bucket_count);
    if (packet.heap_length >= 0 && packet.payload_length == packet.heap_length)
    {
        // Packet is a complete heap, so it shouldn't match any partial heap.
        entry = NULL;
        state.single_packet_heaps++;
    }
    else
    {
        // Search the bucket chain, recording chain length for statistics.
        int search_dist = 1;
        for (entry = buckets[bucket_id]; entry != NULL; entry = entry->next, search_dist++)
        {
            assert(entry != INVALID_ENTRY);
            if (entry->heap->get_cnt() == heap_cnt)
                break;
        }
        state.search_dist += search_dist;
    }

    if (!entry)
    {
        /* Never seen this heap before. Evict the old one in its slot,
         * if any. However, if we're in in-order mode, only accept the
         * packet if it is supposed to be at the start of the heap.
         *
         * Note: not safe to dereference h just anywhere here!
         */
        if (!config.get_allow_out_of_order() && packet.payload_offset != 0)
        {
            log_debug("packet rejected because there is a gap in the heap and "
                      "allow_out_of_order is false");
            return false;
        }

        // Advance the substream's circular head to pick the victim slot.
        std::size_t substream_id = get_substream(heap_cnt);
        substream &ss = substreams[substream_id];
        if (++ss.head == substreams[substream_id + 1].start)
            ss.head = ss.start;
        entry = &queue_storage[ss.head];
        if (entry->next != INVALID_ENTRY)
        {
            // Slot is occupied: flush the incomplete heap to make room.
            state.incomplete_heaps_evicted++;
            unlink_entry(entry);
            heap_ready(std::move(*entry->heap));
            entry->heap.destroy();
        }
        // Link the slot at the front of the bucket chain and build the heap.
        entry->next = buckets[bucket_id];
        buckets[bucket_id] = entry;
        entry->heap.construct(packet, config.get_bug_compat());
    }

    live_heap *h = entry->heap.get();
    bool result = false;
    bool end_of_stream = false;
    if (h->add_packet(packet, config.get_memcpy(), *config.get_memory_allocator(),
                      config.get_allow_out_of_order()))
    {
        result = true;
        end_of_stream = config.get_stop_on_stop_item() && h->is_end_of_stream();
        if (h->is_complete())
        {
            unlink_entry(entry);
            // A heap carrying the stop item is not delivered to the user.
            if (!end_of_stream)
            {
                state.complete_heaps++;
                heap_ready(std::move(*h));
            }
            entry->heap.destroy();
        }
    }

    if (end_of_stream)
        state.stop();
    return result;
}
560

561
/**
 * Flush all partially received heaps to the user, in the order they would
 * have been evicted. Caller must hold the queue mutex.
 */
void stream_base::flush_unlocked()
{
    const std::size_t num_substreams = get_config().get_substreams();
    std::size_t n_flushed = 0;
    for (std::size_t i = 0; i < num_substreams; i++)
    {
        substream &ss = substreams[i];
        const std::size_t end = substreams[i + 1].start;
        // Walk every slot of the substream once, starting after the head.
        for (std::size_t j = ss.start; j < end; j++)
        {
            if (++ss.head == substreams[i + 1].start)
                ss.head = ss.start;
            queue_entry *entry = &queue_storage[ss.head];
            if (entry->next != INVALID_ENTRY)
            {
                n_flushed++;
                unlink_entry(entry);
                heap_ready(std::move(*entry->heap));
                entry->heap.destroy();
            }
        }
    }
    std::lock_guard<std::mutex> stats_lock(stats_mutex);
    stats[stream_stat_indices::heaps] += n_flushed;
    stats[stream_stat_indices::incomplete_heaps_flushed] += n_flushed;
}
587

588
void stream_base::flush()
1,200✔
589
{
590
    std::lock_guard<std::mutex> lock(shared->queue_mutex);
1,200✔
591
    flush_unlocked();
1,200✔
592
}
1,200✔
593

594
/// Stop the stream (idempotent). Caller must hold the queue mutex.
void stream_base::stop_unlocked()
{
    if (!stopped)
        stop_received();
}
599

600
void stream_base::stop()
1,075✔
601
{
602
    std::lock_guard<std::mutex> lock(shared->queue_mutex);
1,075✔
603
    stop_unlocked();
1,075✔
604
}
1,075✔
605

606
/**
 * Handle the stream stopping. Must be called at most once, with the queue
 * mutex held.
 */
void stream_base::stop_received()
{
    assert(!stopped);
    stopped = true;
    // Break the back-reference so pending work can detect the stop.
    shared->self = nullptr;
    flush_unlocked();
}
613

614
/// Take a consistent snapshot of the statistics under the stats lock.
stream_stats stream_base::get_stats() const
{
    std::lock_guard<std::mutex> stats_lock(stats_mutex);
    return stream_stats(get_config().stats, stats);
}
620

621

622
/// Bind the reader to its owning stream's io_service and shared state.
reader::reader(stream &owner)
    : io_service(owner.get_io_service()), owner(owner.shared)
{
}
626

627
/// Default assumption: the transport can drop packets (subclasses override).
bool reader::lossy() const
{
    return true;
}
631

632

633
/// Construct a stream, keeping the thread pool alive via shared ownership.
stream::stream(io_service_ref io_service, const stream_config &config)
    : stream_base(config),
    thread_pool_holder(std::move(io_service).get_shared_thread_pool()),
    io_service(*io_service)
{
}
639

640
/// Propagate a stop to the base class and to every registered reader.
void stream::stop_received()
{
    stream_base::stop_received();
    std::lock_guard<std::mutex> lock(reader_mutex);
    for (const auto &r : readers)
        r->stop();
    /* This ensures that once we stop the readers, any future call to
     * emplace_reader will silently be ignored. This avoids issues if there
     * is a race between the user calling emplace_reader and a stop packet
     * in the stream.
     */
    stop_readers = true;
}
653

654
/// Actual stop work; invoked exactly once via stream::stop().
void stream::stop_impl()
{
    stream_base::stop();
}
658

659
/// Stop the stream; safe to call multiple times and from multiple threads.
void stream::stop()
{
    // call_once guarantees stop_impl runs exactly once.
    std::call_once(stop_once, [this] { stop_impl(); });
}
663

664
/// Whether any attached reader may drop packets (guarded by reader_mutex).
bool stream::is_lossy() const
{
    std::lock_guard<std::mutex> lock(reader_mutex);
    return lossy;
}
669

670
/// Ensure the stream is stopped before destruction.
stream::~stream()
{
    stop();
}
674

675

676
/**
 * Decode packets from a memory buffer and feed them to the stream until the
 * buffer is exhausted, a packet fails to decode, or the stream stops.
 *
 * @return pointer just past the last byte consumed
 */
const std::uint8_t *mem_to_stream(stream_base::add_packet_state &state, const std::uint8_t *ptr, std::size_t length)
{
    while (length > 0 && !state.is_stopped())
    {
        packet_header packet;
        const std::size_t size = decode_packet(packet, ptr, length);
        if (size == 0)
            break;          // undecodable data: stop processing
        state.add_packet(packet);
        ptr += size;
        length -= size;
    }
    return ptr;
}
693

694
/// Convenience overload: wrap the stream in a batch state and delegate.
const std::uint8_t *mem_to_stream(stream_base &s, const std::uint8_t *ptr, std::size_t length)
{
    stream_base::add_packet_state state(s);
    return mem_to_stream(state, ptr, length);
}
699

700
} // namespace spead2::recv
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc