• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 6981759313

24 Nov 2023 02:10PM UTC coverage: 69.57% (-0.04%) from 69.614%
6981759313

push

github

bmerry
Add dependabot configuration to update packages

4744 of 6819 relevant lines covered (69.57%)

93656.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.04
/src/send_stream.cpp
1
/* Copyright 2015, 2017, 2019-2020, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <algorithm>
22
#include <limits>
23
#include <cmath>
24
#include <thread>
25
#include <stdexcept>
26
#include <new>
27
#include <spead2/common_logging.h>
28
#include <spead2/send_stream.h>
29
#include <spead2/send_writer.h>
30

31
namespace spead2::send
32
{
33

34
/**
 * Map a logical (monotonically increasing) queue index to its storage slot.
 *
 * The queue is a power-of-two ring buffer, so the slot is found by masking
 * the index with @c queue_mask.
 */
stream::queue_item_storage &stream::get_queue_storage(std::size_t idx)
{
    const std::size_t slot = idx & queue_mask;
    return queue[slot];
}
38

39
/// Fetch the constructed queue item living at logical index @a idx.
detail::queue_item *stream::get_queue(std::size_t idx)
{
    queue_item_storage &storage = get_queue_storage(idx);
    return storage.get();
}
43

44
/**
 * Compute an index mask for a ring buffer holding at least @a size slots.
 *
 * The result is one less than the smallest power of two that is >= @a size,
 * suitable for use with bitwise-AND indexing.
 *
 * @throws std::invalid_argument if @a size is zero, or so large that the
 * rounded-up power of two would overflow @c std::size_t.
 */
static std::size_t compute_queue_mask(std::size_t size)
{
    constexpr std::size_t limit = std::numeric_limits<std::size_t>::max() / 2 + 1;
    if (size == 0)
        throw std::invalid_argument("max_heaps must be at least 1");
    if (size > limit)
        throw std::invalid_argument("max_heaps is too large");
    std::size_t rounded = 1;
    while (rounded < size)
        rounded <<= 1;
    return rounded - 1;
}
55

56
/* Construct an unwinder that tracks queue slots appended starting at
 * @a tail. Until commit() is called, abort() destroys any items added in
 * [orig_tail, tail).
 */
stream::unwinder::unwinder(stream &s, std::size_t tail)
    : s(s), orig_tail(tail), tail(tail)
{
}
60

61
void stream::unwinder::set_tail(std::size_t tail)
440✔
62
{
63
    this->tail = tail;
440✔
64
}
440✔
65

66
void stream::unwinder::abort()
113✔
67
{
68
    for (std::size_t i = orig_tail; i != tail; i++)
115✔
69
        s.get_queue_storage(i).destroy();
2✔
70
}
113✔
71

72
/// Accept the items appended so far: a subsequent abort() will no longer
/// destroy them.
void stream::unwinder::commit()
{
    orig_tail = tail;
}
76

77
/**
 * Construct the stream, taking ownership of the writer, then register the
 * stream as the writer's owner and start the writer.
 *
 * NOTE(review): the mem-initializers read through @a w (config values,
 * substream count) before @c w(std::move(w)) takes ownership. Actual
 * initialization order follows the member declaration order in the class
 * definition (not visible here), so the members read from the writer are
 * presumably declared before @c w — confirm against the header before
 * reordering anything.
 */
stream::stream(std::unique_ptr<writer> &&w)
    : queue_size(w->config.get_max_heaps()),
    queue_mask(compute_queue_mask(queue_size)),
    num_substreams(w->get_num_substreams()),
    max_packet_size(w->config.get_max_packet_size()),
    w(std::move(w)),
    queue(new queue_item_storage[queue_mask + 1])
{
    this->w->set_owner(this);
    this->w->start();
}
88

89
stream::~stream()
{
    // Wait for all queued heaps to be handled before tearing anything down.
    flush();
    /* The writer might still have a pending wakeup to check for new work.
     * Before we can safely delete it, we need it to have set need_wakeup.
     * A spin loop is not normally great style, but we take a hit on shutdown
     * to keep worker::request_wakeup fast when we're not shutting down.
     */
    std::unique_lock<std::mutex> lock(tail_mutex);
    while (!need_wakeup)
    {
        // Release the lock while yielding so the writer thread can acquire
        // it and set need_wakeup.
        lock.unlock();
        std::this_thread::yield();
        lock.lock();
    }
}
105

106
/// Return the io_service used by the writer (and hence by this stream).
boost::asio::io_service &stream::get_io_service() const
{
    return w->get_io_service();
}
110

111
/**
 * Set the sequence of automatically-assigned heap cnts: the next cnt to
 * issue and the step between consecutive cnts.
 *
 * @throws std::invalid_argument if @a step is zero
 */
void stream::set_cnt_sequence(item_pointer_t next, item_pointer_t step)
{
    if (step == 0)
        throw std::invalid_argument("step cannot be 0");
    // tail_mutex guards next_cnt/step_cnt; no early unlock needed, so a
    // plain lock_guard suffices.
    std::lock_guard<std::mutex> lock(tail_mutex);
    next_cnt = next;
    step_cnt = step;
}
119

120
bool stream::async_send_heap(const heap &h, completion_handler handler,
511✔
121
                             s_item_pointer_t cnt,
122
                             std::size_t substream_index)
123
{
124
    heap_reference ref(h, cnt, substream_index);
511✔
125
    return async_send_heaps_impl<null_unwinder>(
511✔
126
        &ref, &ref + 1, std::move(handler), group_mode::SERIAL);
1,533✔
127
}
128

129
/**
 * Block until every heap currently in the queue has been handled.
 *
 * A waiter (promise) is attached to the most recently enqueued item and the
 * calling thread waits on the corresponding future. If the queue is empty
 * there is nothing to wait for and the function returns immediately.
 */
void stream::flush()
{
    std::future<void> future;
    {
        // Both locks are taken so that head and tail form a consistent view
        // of the queue while we attach the waiter.
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        std::lock_guard<std::mutex> head_lock(head_mutex);
        // These could probably be read with relaxed consistency because the locks
        // ensure ordering, but this is not performance-critical.
        std::size_t tail = queue_tail.load();
        std::size_t head = queue_head.load();
        if (head == tail)
            return;
        // Attach a waiter to the last item still queued (index tail - 1).
        detail::queue_item *item = get_queue(tail - 1);
        item->waiters.emplace_front();
        future = item->waiters.front().get_future();
    }

    // Wait outside the locks so the writer can make progress.
    future.wait();
}
148

149
/// Return the number of substreams, as cached from the writer at
/// construction time.
std::size_t stream::get_num_substreams() const
{
    return num_substreams;
}
153

154
// Explicit instantiation of the serial, single-contiguous-range form of
// async_send_heaps_impl used by async_send_heap above.
template bool stream::async_send_heaps_impl<stream::null_unwinder, heap_reference *>(
    heap_reference *first, heap_reference *last,
    completion_handler &&handler, group_mode mode);
158

159
} // namespace spead2::send
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc