• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 6311808367

26 Sep 2023 11:09AM UTC coverage: 78.043% (-0.2%) from 78.228%
6311808367

push

github

bmerry
Initial work on stream explicit start

This adds the C++ API and refactors the readers to have a separate
start() function. It does not expose the new API to Python and does not
yet have any unit tests.

44 of 44 new or added lines in 9 files covered. (100.0%)

5303 of 6795 relevant lines covered (78.04%)

56186.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.56
/src/recv_stream.cpp
1
/* Copyright 2015, 2017-2021, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <cstddef>
22
#include <utility>
23
#include <algorithm>
24
#include <cassert>
25
#include <atomic>
26
#include <new>
27
#include <spead2/recv_stream.h>
28
#include <spead2/recv_live_heap.h>
29
#include <spead2/common_memcpy.h>
30
#include <spead2/common_thread_pool.h>
31
#include <spead2/common_logging.h>
32

33
#define INVALID_ENTRY ((queue_entry *) -1)
34

35
namespace spead2::recv
36
{
37

38
/// Construct a statistic descriptor with the given @a name and combining @a mode_.
stream_stat_config::stream_stat_config(std::string name, mode mode_)
    : name(std::move(name)), mode_(mode_)
{
}
42

43
/**
 * Combine two samples of this statistic according to its mode:
 * COUNTER values are summed, MAXIMUM values take the larger of the two.
 */
std::uint64_t stream_stat_config::combine(std::uint64_t a, std::uint64_t b) const
{
    switch (mode_)
    {
    case mode::COUNTER:
        return a + b;
    case mode::MAXIMUM:
        return std::max(a, b);
    }
    /* Line below should normally be unreachable. Using the same expression as
     * for COUNTER lets the compiler generate more efficient code, as it only
     * has to consider two cases (looks just as good as using GCC's
     * __builtin_unreachable, without depending on compiler specifics).
     */
    return a + b;   // LCOV_EXCL_LINE
}
59

60
bool operator==(const stream_stat_config &a, const stream_stat_config &b)
65✔
61
{
62
    return a.get_name() == b.get_name() && a.get_mode() == b.get_mode();
65✔
63
}
64

65
bool operator!=(const stream_stat_config &a, const stream_stat_config &b)
2✔
66
{
67
    return !(a == b);
2✔
68
}
69

70
/**
71
 * Get the index within @a stats corresponding to @a name. If it is not
72
 * present, returns @c stats.size().
73
 */
74
static std::size_t get_stat_index_nothrow(
351✔
75
    const std::vector<stream_stat_config> &stats,
76
    const std::string &name)
77
{
78
    for (std::size_t i = 0; i < stats.size(); i++)
3,186✔
79
        if (stats[i].get_name() == name)
2,947✔
80
            return i;
112✔
81
    return stats.size();
239✔
82
}
83

84
/**
85
 * Get the index within @a stats corresponding to @a name.
86
 *
87
 * @throw std::out_of_range if it is not present
88
 */
89
static std::size_t get_stat_index(
26✔
90
    const std::vector<stream_stat_config> &stats,
91
    const std::string &name)
92
{
93
    std::size_t ret = get_stat_index_nothrow(stats, name);
26✔
94
    if (ret == stats.size())
26✔
95
        throw std::out_of_range(name + " is not a known statistic name");
5✔
96
    return ret;
21✔
97
}
98

99

100
/// Build the shared list of built-in statistics that every stream starts with.
static std::shared_ptr<const std::vector<stream_stat_config>> make_default_stats()
{
    auto stats = std::make_shared<std::vector<stream_stat_config>>();
    // Keep this in sync with the stream_stat_* constexprs in the header
    stats->emplace_back("heaps", stream_stat_config::mode::COUNTER);
    stats->emplace_back("incomplete_heaps_evicted", stream_stat_config::mode::COUNTER);
    stats->emplace_back("incomplete_heaps_flushed", stream_stat_config::mode::COUNTER);
    stats->emplace_back("packets", stream_stat_config::mode::COUNTER);
    stats->emplace_back("batches", stream_stat_config::mode::COUNTER);
    stats->emplace_back("max_batch", stream_stat_config::mode::MAXIMUM);
    stats->emplace_back("single_packet_heaps", stream_stat_config::mode::COUNTER);
    stats->emplace_back("search_dist", stream_stat_config::mode::COUNTER);
    // For backwards compatibility, worker_blocked is always added, although
    // it is not part of the base stream statistics
    stats->emplace_back("worker_blocked", stream_stat_config::mode::COUNTER);
    assert(stats->size() == stream_stat_indices::custom);
    return stats;
}
118

119
/* This is used for stream_stats objects that do not have any custom statistics.
 * Sharing this means the compatibility check for operator+ requires only a
 * pointer comparison rather than comparing arrays.
 *
 * Initialized once during static initialization of this translation unit.
 */
static std::shared_ptr<const std::vector<stream_stat_config>> default_stats = make_default_stats();
124

125
/// Construct with the shared default statistics config and all values zero.
stream_stats::stream_stats()
    : stream_stats(default_stats)
{
}
129

130
/// Construct with the given config and a zero-initialized value per statistic.
stream_stats::stream_stats(std::shared_ptr<const std::vector<stream_stat_config>> config)
    : stream_stats(config, std::vector<std::uint64_t>(config->size()))
{
    // Note: annoyingly, can't use std::move(config) above, because we access
    // config to get the size to use for the vector.
}
136

137
/* Main constructor: takes ownership of the config and values. The named
 * reference members (heaps, packets, ...) are convenience aliases that bind
 * into this->values at the well-known stream_stat_indices positions, so they
 * must be initialized after values has been moved into place.
 */
stream_stats::stream_stats(std::shared_ptr<const std::vector<stream_stat_config>> config,
                           std::vector<std::uint64_t> values)
    : config(std::move(config)),
    values(std::move(values)),
    heaps(this->values[stream_stat_indices::heaps]),
    incomplete_heaps_evicted(this->values[stream_stat_indices::incomplete_heaps_evicted]),
    incomplete_heaps_flushed(this->values[stream_stat_indices::incomplete_heaps_flushed]),
    packets(this->values[stream_stat_indices::packets]),
    batches(this->values[stream_stat_indices::batches]),
    worker_blocked(this->values[stream_stat_indices::worker_blocked]),
    max_batch(this->values[stream_stat_indices::max_batch]),
    single_packet_heaps(this->values[stream_stat_indices::single_packet_heaps]),
    search_dist(this->values[stream_stat_indices::search_dist])
{
    // Every built-in statistic must exist, and config must describe values 1:1.
    assert(this->config->size() >= stream_stat_indices::custom);
    assert(this->config->size() == this->values.size());
}
154

155
/* Copy constructor: delegates so that the reference members rebind to this
 * object's own values vector rather than aliasing other's.
 */
stream_stats::stream_stats(const stream_stats &other)
    : stream_stats(other.config, other.values)
{
}
159

160
/* Copy assignment. Only the values are copied: the reference members already
 * alias this->values, and the configs are required to be equivalent.
 *
 * @throw std::invalid_argument if the two configs differ
 */
stream_stats &stream_stats::operator=(const stream_stats &other)
{
    // Fast path: shared config pointer; otherwise compare contents.
    if (config != other.config && *config != *other.config)
        throw std::invalid_argument("config must match to assign stats");
    std::copy(other.values.begin(), other.values.end(), values.begin());
    return *this;
}
168

169
/// Look up a statistic by name; throws like at() (no default-insert semantics).
std::uint64_t &stream_stats::operator[](const std::string &name)
{
    return at(name);
}
173

174
/// Const overload; same throwing behavior as at().
const std::uint64_t &stream_stats::operator[](const std::string &name) const
{
    return at(name);
}
178

179
/// Look up a statistic by name. @throw std::out_of_range if not present.
std::uint64_t &stream_stats::at(const std::string &name)
{
    return values[get_stat_index(*config, name)];
}
183

184
/// Const overload of at(). @throw std::out_of_range if not present.
const std::uint64_t &stream_stats::at(const std::string &name) const
{
    return values[get_stat_index(*config, name)];
}
188

189
/// Map-like find: returns end() (index == size) when @a name is unknown.
stream_stats::iterator stream_stats::find(const std::string &name)
{
    return iterator(*this, get_stat_index_nothrow(*config, name));
}
193

194
/// Const overload of find().
stream_stats::const_iterator stream_stats::find(const std::string &name) const
{
    return const_iterator(*this, get_stat_index_nothrow(*config, name));
}
198

199
std::size_t stream_stats::count(const std::string &name) const
2✔
200
{
201
    return get_stat_index_nothrow(*config, name) != values.size() ? 1 : 0;
2✔
202
}
203

204
/// Combine two sets of statistics into a new object (see operator+= for rules).
stream_stats stream_stats::operator+(const stream_stats &other) const
{
    stream_stats result(*this);
    result += other;
    return result;
}
210

211
/* Accumulate other's statistics into this one, combining each value according
 * to its mode (sum for counters, max for maxima).
 *
 * @throw std::invalid_argument if the two configs differ
 */
stream_stats &stream_stats::operator+=(const stream_stats &other)
{
    // Fast path: shared config pointer; otherwise compare contents.
    if (config != other.config && *config != *other.config)
        throw std::invalid_argument("config must match to add stats together");
    for (std::size_t i = 0; i < values.size(); i++)
        values[i] = (*config)[i].combine(values[i], other.values[i]);
    return *this;
}
219

220

221
/* Choose the hash-table size for the live-heap table: the smallest power of
 * two (at least 4) that is >= total_max_heaps, then multiplied by 4 so the
 * table keeps a low load factor.
 */
static std::size_t compute_bucket_count(std::size_t total_max_heaps)
{
    std::size_t capacity = 4;
    for (; capacity < total_max_heaps; capacity <<= 1)
    {
    }
    return capacity << 2;    // x4 => low load factor
}
229

230
/* Compute shift such that (x >> shift) < bucket_count for all 64-bit x
 * and bucket_count a power of 2, i.e. shift = 64 - log2(bucket_count).
 */
static int compute_bucket_shift(std::size_t bucket_count)
{
    int shift = 64;
    for (; bucket_count > 1; bucket_count >>= 1)
        --shift;
    return shift;
}
243

244
/// Default packet payload copy: plain std::memcpy into the heap allocation.
static void packet_memcpy_std(const spead2::memory_allocator::pointer &allocation, const packet_header &packet)
{
    std::memcpy(allocation.get() + packet.payload_offset, packet.payload, packet.payload_length);
}
248

249
/// Payload copy using spead2's non-temporal (cache-bypassing) memcpy variant.
static void packet_memcpy_nontemporal(const spead2::memory_allocator::pointer &allocation, const packet_header &packet)
{
    spead2::memcpy_nontemporal(allocation.get() + packet.payload_offset, packet.payload, packet.payload_length);
}
253

254
/// Default configuration: standard memcpy, default allocator, shared default stats.
stream_config::stream_config()
    : memcpy(packet_memcpy_std),
    allocator(std::make_shared<memory_allocator>()),
    stats(default_stats)  // Initially point to shared defaults; make a copy on write
{
}
260

261
/**
 * Set the number of partial heaps retained per substream.
 *
 * @throw std::invalid_argument if @a max_heaps is 0
 * @return *this, for chaining
 */
stream_config &stream_config::set_max_heaps(std::size_t max_heaps)
{
    if (max_heaps == 0)
        throw std::invalid_argument("max_heaps cannot be 0");
    this->max_heaps = max_heaps;
    return *this;
}
268

269
/**
 * Set the number of substreams (heaps are partitioned by heap cnt).
 *
 * @throw std::invalid_argument if @a substreams is 0
 * @return *this, for chaining
 */
stream_config &stream_config::set_substreams(std::size_t substreams)
{
    if (substreams == 0)
        throw std::invalid_argument("substreams cannot be 0");
    this->substreams = substreams;
    return *this;
}
276

277
/**
 * Set PySPEAD bug-compatibility flags.
 *
 * @throw std::invalid_argument if bits other than BUG_COMPAT_PYSPEAD_0_5_2 are set
 * @return *this, for chaining
 */
stream_config &stream_config::set_bug_compat(bug_compat_mask bug_compat)
{
    if (bug_compat & ~BUG_COMPAT_PYSPEAD_0_5_2)
        throw std::invalid_argument("unknown compatibility bits in bug_compat");
    this->bug_compat = bug_compat;
    return *this;
}
284

285
/// Set the allocator used for heap payload memory. @return *this, for chaining.
stream_config &stream_config::set_memory_allocator(std::shared_ptr<memory_allocator> allocator)
{
    this->allocator = std::move(allocator);
    return *this;
}
290

291
/**
 * Set the per-packet copy function used to place payload into heap memory.
 *
 * @return *this, for chaining
 */
stream_config &stream_config::set_memcpy(packet_memcpy_function memcpy)
{
    // Sink parameter taken by value: move it into place rather than copying
    // (a std::function copy may allocate). Matches set_memory_allocator.
    this->memcpy = std::move(memcpy);
    return *this;
}
296

297
/**
 * Set a plain (dest, src, length) copy function, adapted to the
 * packet_memcpy signature. @return *this, for chaining.
 */
stream_config &stream_config::set_memcpy(memcpy_function memcpy)
{
    return set_memcpy(
        packet_memcpy_function([memcpy](
            const spead2::memory_allocator::pointer &allocation, const packet_header &packet)
        {
            memcpy(allocation.get() + packet.payload_offset, packet.payload, packet.payload_length);
        })
    );
}
307

308
/**
 * Select one of the built-in copy implementations by enum id.
 *
 * @throw std::invalid_argument for an unknown id
 * @return *this, for chaining
 */
stream_config &stream_config::set_memcpy(memcpy_function_id id)
{
    /* We adapt each case to the packet_memcpy signature rather than using the
     * generic wrapping in the memcpy_function overload. This ensures that
     * there is only one level of indirect function call instead of two. It
     * also makes it possible to reverse the mapping by comparing function
     * pointers.
     */
    switch (id)
    {
    case MEMCPY_STD:
        set_memcpy(packet_memcpy_std);
        break;
    case MEMCPY_NONTEMPORAL:
        set_memcpy(packet_memcpy_nontemporal);
        break;
    default:
        throw std::invalid_argument("Unknown memcpy function");
    }
    return *this;
}
329

330
/// Control whether a stream stop item terminates the stream. @return *this.
stream_config &stream_config::set_stop_on_stop_item(bool stop)
{
    stop_on_stop_item = stop;
    return *this;
}
335

336
/// Control whether packets without a HEAP_LEN item are accepted. @return *this.
stream_config &stream_config::set_allow_unsized_heaps(bool allow)
{
    allow_unsized_heaps = allow;
    return *this;
}
341

342
/// Control whether packets of a heap may arrive out of order. @return *this.
stream_config &stream_config::set_allow_out_of_order(bool allow)
{
    allow_out_of_order = allow;
    return *this;
}
347

348
/// Set an opaque user-chosen stream identifier. @return *this, for chaining.
stream_config &stream_config::set_stream_id(std::uintptr_t id)
{
    stream_id = id;
    return *this;
}
353

354
/**
 * Control whether readers wait for an explicit stream::start() call before
 * receiving data. @return *this, for chaining.
 */
stream_config &stream_config::set_explicit_start(bool explicit_start)
{
    this->explicit_start = explicit_start;
    return *this;
}
359

360
/**
 * Register a custom statistic and return its index.
 *
 * @throw std::invalid_argument if a statistic with @a name already exists
 */
std::size_t stream_config::add_stat(std::string name, stream_stat_config::mode mode)
{
    if (spead2::recv::get_stat_index_nothrow(*stats, name) != stats->size())
        throw std::invalid_argument("A statistic called " + name + " already exists");
    // Make a copy so that we don't modify any shared copies
    auto new_stats = std::make_shared<std::vector<stream_stat_config>>(*stats);
    std::size_t index = new_stats->size();
    new_stats->emplace_back(std::move(name), mode);
    stats = std::move(new_stats);
    return index;
}
371

372
/// Look up a statistic index by name. @throw std::out_of_range if unknown.
std::size_t stream_config::get_stat_index(const std::string &name) const
{
    return spead2::recv::get_stat_index(*stats, name);
}
376

377

378
/* Construct the receive machinery: a flat queue of max_heaps-per-substream
 * entries, a power-of-two hash table over heap cnts, and per-substream
 * circular cursors. Member initialization order matters: bucket_shift and
 * buckets depend on bucket_count.
 */
stream_base::stream_base(const stream_config &config)
    : queue_storage(new queue_entry[config.get_max_heaps() * config.get_substreams()]),
    bucket_count(compute_bucket_count(config.get_max_heaps() * config.get_substreams())),
    bucket_shift(compute_bucket_shift(bucket_count)),
    buckets(new queue_entry *[bucket_count]),
    substreams(new substream[config.get_substreams() + 1]),
    substream_div(config.get_substreams()),
    config(config),
    shared(std::make_shared<shared_state>(this)),
    stats(config.get_stats().size()),
    batch_stats(config.get_stats().size())
{
    // INVALID_ENTRY marks a queue slot that holds no live heap.
    for (std::size_t i = 0; i < config.get_max_heaps() * config.get_substreams(); i++)
        queue_storage[i].next = INVALID_ENTRY;
    for (std::size_t i = 0; i < bucket_count; i++)
        buckets[i] = NULL;
    // One extra sentinel substream marks the end of the last slot range.
    for (std::size_t i = 0; i <= config.get_substreams(); i++)
    {
        substreams[i].start = i * config.get_max_heaps();
        substreams[i].head = substreams[i].start;
    }
}
400

401
/// Tear down any live heaps still in the queue without delivering them.
stream_base::~stream_base()
{
    for (std::size_t i = 0; i < get_config().get_max_heaps() * get_config().get_substreams(); i++)
    {
        queue_entry *entry = &queue_storage[i];
        if (entry->next != INVALID_ENTRY)
        {
            // Slot holds a live heap: remove from the hash table and destroy it.
            unlink_entry(entry);
            entry->heap.destroy();
        }
    }
}
413

414
/// Hash a heap cnt to a bucket index.
std::size_t stream_base::get_bucket(item_pointer_t heap_cnt) const
{
    // Look up Fibonacci hashing for an explanation of the magic number
    return (heap_cnt * 11400714819323198485ULL) >> bucket_shift;
}
419

420
/// Map a heap cnt to its substream: heap_cnt mod substreams, via libdivide.
std::size_t stream_base::get_substream(item_pointer_t heap_cnt) const
{
    // libdivide doesn't provide operator %
    return heap_cnt - (heap_cnt / substream_div * config.get_substreams());
}
425

426
/* Remove @a entry from its hash bucket's singly-linked chain and mark the
 * queue slot empty. Precondition: the entry is currently linked.
 */
void stream_base::unlink_entry(queue_entry *entry)
{
    assert(entry->next != INVALID_ENTRY);
    std::size_t bucket_id = get_bucket(entry->heap->get_cnt());
    // Walk the chain to find the pointer that links to this entry.
    queue_entry **prev = &buckets[bucket_id];
    while (*prev != entry)
    {
        assert(*prev != NULL && *prev != INVALID_ENTRY);
        prev = &(*prev)->next;
    }
    *prev = entry->next;
    entry->next = INVALID_ENTRY;
}
439

440
/* Take the queue lock for a batch of packets. If the stream has already been
 * destroyed (owner.self is null) the state starts out stopped.
 */
stream_base::add_packet_state::add_packet_state(shared_state &owner)
    : lock(owner.queue_mutex), owner(owner.self)
{
    if (this->owner)
    {
        stopped = this->owner->stopped;
        // Reset per-batch custom counters; they are merged in the destructor.
        std::fill(this->owner->batch_stats.begin(), this->owner->batch_stats.end(), 0);
    }
    else
    {
        stopped = true;
    }
}
453

454
/* End of a batch: propagate a stop to the owner if one was seen, then fold
 * this batch's counters into the stream statistics under stats_mutex.
 */
stream_base::add_packet_state::~add_packet_state()
{
    if (owner && stopped)
        owner->stop_received();
    if (!owner || (!packets && is_stopped()))
        return;   // Stream was stopped before we could do anything - don't count as a batch
    std::lock_guard<std::mutex> stats_lock(owner->stats_mutex);
    // The built-in stats are updated directly; batch_stats is not used
    owner->stats[stream_stat_indices::packets] += packets;
    owner->stats[stream_stat_indices::batches]++;
    owner->stats[stream_stat_indices::heaps] += complete_heaps + incomplete_heaps_evicted;
    owner->stats[stream_stat_indices::incomplete_heaps_evicted] += incomplete_heaps_evicted;
    owner->stats[stream_stat_indices::single_packet_heaps] += single_packet_heaps;
    owner->stats[stream_stat_indices::search_dist] += search_dist;
    auto &owner_max_batch = owner->stats[stream_stat_indices::max_batch];
    owner_max_batch = std::max(owner_max_batch, packets);
    // Update custom statistics
    const auto &stats_config = owner->get_config().get_stats();
    for (std::size_t i = stream_stat_indices::custom; i < stats_config.size(); i++)
        owner->stats[i] = stats_config[i].combine(owner->stats[i], owner->batch_stats[i]);
}
475

476
/* Route one decoded packet into its (possibly new) live heap.
 *
 * Returns true if the packet was consumed, false if it was rejected.
 * Complete heaps are delivered via heap_ready(); a stop item (when
 * stop_on_stop_item is set) marks the batch state stopped.
 */
bool stream_base::add_packet(add_packet_state &state, const packet_header &packet)
{
    const stream_config &config = state.owner->get_config();
    // NOTE(review): checks this->stopped; assumes state.owner == this — confirm.
    assert(!stopped);
    state.packets++;
    if (packet.heap_length < 0 && !config.get_allow_unsized_heaps())
    {
        log_info("packet rejected because it has no HEAP_LEN");
        return false;
    }

    // Look for matching heap.
    queue_entry *entry = NULL;
    s_item_pointer_t heap_cnt = packet.heap_cnt;
    std::size_t bucket_id = get_bucket(heap_cnt);
    assert(bucket_id < bucket_count);
    if (packet.heap_length >= 0 && packet.payload_length == packet.heap_length)
    {
        // Packet is a complete heap, so it shouldn't match any partial heap.
        entry = NULL;
        state.single_packet_heaps++;
    }
    else
    {
        // Chase the bucket chain; search_dist tracks probe length for stats.
        int search_dist = 1;
        for (entry = buckets[bucket_id]; entry != NULL; entry = entry->next, search_dist++)
        {
            assert(entry != INVALID_ENTRY);
            if (entry->heap->get_cnt() == heap_cnt)
                break;
        }
        state.search_dist += search_dist;
    }

    if (!entry)
    {
        /* Never seen this heap before. Evict the old one in its slot,
         * if any. However, if we're in in-order mode, only accept the
         * packet if it is supposed to be at the start of the heap.
         *
         * Note: not safe to dereference h just anywhere here!
         */
        if (!config.get_allow_out_of_order() && packet.payload_offset != 0)
        {
            log_debug("packet rejected because there is a gap in the heap and "
                      "allow_out_of_order is false");
            return false;
        }

        // Advance the substream's circular cursor; evict whatever lives there.
        std::size_t substream_id = get_substream(heap_cnt);
        substream &ss = substreams[substream_id];
        if (++ss.head == substreams[substream_id + 1].start)
            ss.head = ss.start;
        entry = &queue_storage[ss.head];
        if (entry->next != INVALID_ENTRY)
        {
            // Evicted heap is delivered incomplete.
            state.incomplete_heaps_evicted++;
            unlink_entry(entry);
            heap_ready(std::move(*entry->heap));
            entry->heap.destroy();
        }
        // Link the slot into the bucket chain and start a fresh live heap.
        entry->next = buckets[bucket_id];
        buckets[bucket_id] = entry;
        entry->heap.construct(packet, config.get_bug_compat());
    }

    live_heap *h = entry->heap.get();
    bool result = false;
    bool end_of_stream = false;
    if (h->add_packet(packet, config.get_memcpy(), *config.get_memory_allocator(),
                      config.get_allow_out_of_order()))
    {
        result = true;
        end_of_stream = config.get_stop_on_stop_item() && h->is_end_of_stream();
        if (h->is_complete())
        {
            unlink_entry(entry);
            if (!end_of_stream)
            {
                state.complete_heaps++;
                heap_ready(std::move(*h));
            }
            entry->heap.destroy();
        }
    }

    if (end_of_stream)
        state.stop();
    return result;
}
566

567
/* Deliver every remaining live heap (as incomplete) in substream order,
 * then account for them under stats_mutex. Caller must hold queue_mutex.
 */
void stream_base::flush_unlocked()
{
    const std::size_t num_substreams = get_config().get_substreams();
    std::size_t n_flushed = 0;
    for (std::size_t i = 0; i < num_substreams; i++)
    {
        substream &ss = substreams[i];
        const std::size_t end = substreams[i + 1].start;
        // Walk the full circular range once, starting after the current head.
        for (std::size_t j = ss.start; j < end; j++)
        {
            if (++ss.head == substreams[i + 1].start)
                ss.head = ss.start;
            queue_entry *entry = &queue_storage[ss.head];
            if (entry->next != INVALID_ENTRY)
            {
                n_flushed++;
                unlink_entry(entry);
                heap_ready(std::move(*entry->heap));
                entry->heap.destroy();
            }
        }
    }
    std::lock_guard<std::mutex> stats_lock(stats_mutex);
    stats[stream_stat_indices::heaps] += n_flushed;
    stats[stream_stat_indices::incomplete_heaps_flushed] += n_flushed;
}
593

594
/// Public flush: take the queue lock and deliver all partial heaps.
void stream_base::flush()
{
    std::lock_guard<std::mutex> lock(shared->queue_mutex);
    flush_unlocked();
}
599

600
/// Stop the stream if not already stopped. Caller must hold queue_mutex.
void stream_base::stop_unlocked()
{
    if (!stopped)
        stop_received();
}
605

606
/// Public stop: take the queue lock and stop the stream.
void stream_base::stop()
{
    std::lock_guard<std::mutex> lock(shared->queue_mutex);
    stop_unlocked();
}
611

612
/* Handle a stop: mark stopped, detach from shared_state so in-flight
 * add_packet_state objects see the stream as gone, and flush partial heaps.
 * Caller must hold queue_mutex; must not already be stopped.
 */
void stream_base::stop_received()
{
    assert(!stopped);
    stopped = true;
    shared->self = nullptr;
    flush_unlocked();
}
619

620
/// Snapshot the current statistics under stats_mutex.
stream_stats stream_base::get_stats() const
{
    std::lock_guard<std::mutex> stats_lock(stats_mutex);
    stream_stats ret(get_config().stats, stats);
    return ret;
}
626

627

628
/// Bind a reader to its owning stream's io_service and shared state.
reader::reader(stream &owner)
    : io_service(owner.get_io_service()), owner(owner.shared)
{
}
632

633
/// Default: readers are assumed lossy; lossless transports override this.
bool reader::lossy() const
{
    return true;
}
637

638

639
/* Construct a stream. With explicit_start configured, readers are added in
 * a paused state and only begin receiving once start() is called.
 */
stream::stream(io_service_ref io_service, const stream_config &config)
    : stream_base(config),
    thread_pool_holder(std::move(io_service).get_shared_thread_pool()),
    io_service(*io_service),
    readers_started(!config.get_explicit_start())
{
}
646

647
/// Start all readers (explicit-start mode). Idempotent: a second call is a no-op.
void stream::start()
{
    std::lock_guard<std::mutex> lock(reader_mutex);
    if (!readers_started)
    {
        for (const auto &r : readers)
            r->start();
        readers_started = true;
    }
}
657

658
/// Stop the base stream, then stop every reader.
void stream::stop_received()
{
    stream_base::stop_received();
    std::lock_guard<std::mutex> lock(reader_mutex);
    for (const auto &r : readers)
        r->stop();
    /* This ensures that once we stop the readers, any future call to
     * emplace_reader will silently be ignored. This avoids issues if there
     * is a race between the user calling emplace_reader and a stop packet
     * in the stream.
     */
    stop_readers = true;
}
671

672
/// The actual stop work; invoked exactly once via stop()'s call_once.
void stream::stop_impl()
{
    stream_base::stop();
}
676

677
/// Stop the stream; safe to call multiple times (only the first takes effect).
void stream::stop()
{
    std::call_once(stop_once, [this] { stop_impl(); });
}
681

682
/// True if any attached reader may drop packets (guarded by reader_mutex).
bool stream::is_lossy() const
{
    std::lock_guard<std::mutex> lock(reader_mutex);
    return lossy;
}
687

688
/// Ensure the stream is stopped before destruction.
stream::~stream()
{
    stop();
}
692

693

694
/**
 * Decode packets from a contiguous memory region and feed them into @a state
 * until the region is exhausted, the stream stops, or a packet fails to
 * decode. Returns a pointer just past the last byte consumed.
 */
const std::uint8_t *mem_to_stream(stream_base::add_packet_state &state, const std::uint8_t *ptr, std::size_t length)
{
    for (;;)
    {
        if (length == 0 || state.is_stopped())
            break;
        packet_header packet;
        const std::size_t size = decode_packet(packet, ptr, length);
        if (size == 0)
            break;              // undecodable data: stop processing
        state.add_packet(packet);
        ptr += size;
        length -= size;
    }
    return ptr;
}
711

712
/// Convenience overload: wrap @a s in a batch state and forward.
const std::uint8_t *mem_to_stream(stream_base &s, const std::uint8_t *ptr, std::size_t length)
{
    stream_base::add_packet_state state(s);
    return mem_to_stream(state, ptr, length);
}
717

718
} // namespace spead2::recv
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc