Coveralls coverage report: ska-sa / spead2, build 5736039794 (push, github)
Status: pending completion

Commit (bmerry): "Run isort over the code. Also add it to pre-commit config."

5404 of 7212 relevant lines covered (74.93%)
52841.3 hits per line

Source file: /src/send_stream.cpp (88.73% covered)

In the listing below, each line's hit count from the coverage run appears as a
trailing comment; "not covered" marks relevant lines that the run never executed.

/* Copyright 2015, 2017, 2019-2020 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

/**
 * @file
 */

#include <algorithm>
#include <limits>
#include <cmath>
#include <thread>
#include <stdexcept>
#include <spead2/common_logging.h>
#include <spead2/send_stream.h>
#include <spead2/send_writer.h>

namespace spead2
{
namespace send
{
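
/* The queue of in-flight heaps is a fixed-size circular buffer whose capacity
 * is rounded up to a power of two, so get_queue() can wrap a monotonically
 * increasing index with a cheap bitwise AND against queue_mask instead of a
 * modulo. compute_queue_mask() derives that mask from the configured
 * max_heaps: for example, max_heaps = 5 rounds up to a capacity of 8 and a
 * mask of 7.
 */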

detail::queue_item *stream::get_queue(std::size_t idx)  // 79,271 hits
{
    return reinterpret_cast<detail::queue_item *>(queue.get() + (idx & queue_mask));  // 79,271 hits
}

static std::size_t compute_queue_mask(std::size_t size)  // 298 hits
{
    if (size == 0)  // 298 hits
        throw std::invalid_argument("max_heaps must be at least 1");  // not covered
    if (size > std::numeric_limits<std::size_t>::max() / 2 + 1)  // 298 hits
        throw std::invalid_argument("max_heaps is too large");  // not covered
    std::size_t p2 = 1;  // 298 hits
    while (p2 < size)  // 885 hits
        p2 <<= 1;  // 587 hits
    return p2 - 1;  // 298 hits
}
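
/* unwinder is a small rollback helper used when enqueueing a batch of heaps:
 * it remembers the queue tail at construction, set_tail() records how far new
 * queue_items have been constructed, abort() destroys everything built since
 * the last commit, and commit() makes those items permanent. The batch code
 * that drives it (async_send_heaps_impl) is defined in a header rather than
 * in this file.
 */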

stream::unwinder::unwinder(stream &s, std::size_t tail)  // 112 hits
    : s(s), orig_tail(tail), tail(tail)  // 112 hits
{
}  // 112 hits

void stream::unwinder::set_tail(std::size_t tail)  // 440 hits
{
    this->tail = tail;  // 440 hits
}  // 440 hits

void stream::unwinder::abort()  // 113 hits
{
    for (std::size_t i = orig_tail; i != tail; i++)  // 115 hits
        s.get_queue(i)->~queue_item();  // 2 hits
}  // 113 hits

void stream::unwinder::commit()  // 111 hits
{
    orig_tail = tail;  // 111 hits
}  // 111 hits
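
/* The constructor sizes the circular queue from the writer's configured
 * max_heaps (rounded up to a power of two via compute_queue_mask), takes
 * ownership of the writer, points it back at this stream with set_owner(),
 * and starts it. The destructor flushes all queued heaps and then waits for
 * the writer to set need_wakeup before tearing anything down; see the
 * comment inside its body.
 */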

stream::stream(std::unique_ptr<writer> &&w)  // 298 hits
    : queue_size(w->config.get_max_heaps()),  // 298 hits
    queue_mask(compute_queue_mask(queue_size)),  // 298 hits
    num_substreams(w->get_num_substreams()),  // 298 hits
    max_packet_size(w->config.get_max_packet_size()),  // 298 hits
    w(std::move(w)),  // 298 hits
    queue(new queue_item_storage[queue_mask + 1])  // 894 hits
{
    this->w->set_owner(this);  // 298 hits
    this->w->start();  // 298 hits
}  // 298 hits

stream::~stream()  // 298 hits
{
    flush();  // 298 hits
    /* The writer might still have a pending wakeup to check for new work.
     * Before we can safely delete it, we need it to have set need_wakeup.
     * A spin loop is not normally great style, but we take a hit on shutdown
     * to keep worker::request_wakeup fast when we're not shutting down.
     */
    std::unique_lock<std::mutex> lock(tail_mutex);  // 298 hits
    while (!need_wakeup)  // 298 hits
    {
        lock.unlock();  // not covered
        std::this_thread::yield();  // not covered
        lock.lock();  // not covered
    }
}  // 298 hits
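
/* The next three members are thin public entry points. get_io_service()
 * exposes the writer's io_service. set_cnt_sequence() controls how heap cnts
 * are generated when the caller does not supply one: next is the cnt for the
 * next heap and step is the increment between heaps (a step of 0 is
 * rejected). async_send_heap() wraps a single heap in a heap_reference and
 * forwards to the grouped implementation, async_send_heaps_impl, in SERIAL
 * mode.
 */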

boost::asio::io_service &stream::get_io_service() const  // 5,194 hits
{
    return w->get_io_service();  // 5,194 hits
}

void stream::set_cnt_sequence(item_pointer_t next, item_pointer_t step)  // 2 hits
{
    if (step == 0)  // 2 hits
        throw std::invalid_argument("step cannot be 0");  // not covered
    std::unique_lock<std::mutex> lock(tail_mutex);  // 2 hits
    next_cnt = next;  // 2 hits
    step_cnt = step;  // 2 hits
}  // 2 hits

bool stream::async_send_heap(const heap &h, completion_handler handler,  // 5,109 hits
                             s_item_pointer_t cnt,
                             std::size_t substream_index)
{
    heap_reference ref(h, cnt, substream_index);  // 5,109 hits
    return async_send_heaps_impl<null_unwinder>(  // 5,109 hits
        &ref, &ref + 1, std::move(handler), group_mode::SERIAL);  // 15,327 hits
}
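
/* flush() blocks until every heap queued so far has completed. It attaches a
 * promise to the most recently queued item (at index tail - 1) while holding
 * both queue locks, then waits on the matching future outside the locks. The
 * waiters on an item are presumably fulfilled by the writer when it retires
 * the item; that side is not in this file.
 */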

void stream::flush()  // 300 hits
{
    std::future<void> future;  // 300 hits
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);  // 300 hits
        std::lock_guard<std::mutex> head_lock(head_mutex);  // 300 hits
        // These could probably be read with relaxed consistency because the locks
        // ensure ordering, but this is not performance-critical.
        std::size_t tail = queue_tail.load();  // 300 hits
        std::size_t head = queue_head.load();  // 300 hits
        if (head == tail)  // 300 hits
            return;  // 298 hits
        detail::queue_item *item = get_queue(tail - 1);  // 2 hits
        item->waiters.emplace_front();  // 2 hits
        future = item->waiters.front().get_future();  // 2 hits
    }  // 598 hits

    future.wait();  // 2 hits
}  // 300 hits
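
/* get_num_substreams() simply reports the cached substream count; it was not
 * hit in this coverage run. The explicit instantiation at the end of the file
 * instantiates async_send_heaps_impl for the heap_reference* / null_unwinder
 * combination that async_send_heap() above relies on.
 */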

std::size_t stream::get_num_substreams() const  // not covered
{
    return num_substreams;  // not covered
}

// Explicit instantiation
template bool stream::async_send_heaps_impl<stream::null_unwinder, heap_reference *>(
    heap_reference *first, heap_reference *last,
    completion_handler &&handler, group_mode mode);

} // namespace send
} // namespace spead2
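
To make the flush() mechanism above concrete, here is a minimal, self-contained
sketch of the same promise/future handshake. All names in it are invented for
illustration; it is not spead2 code, and it glosses over the real queue,
locking, and writer machinery:

#include <chrono>
#include <future>
#include <thread>
#include <vector>

// Stand-in for detail::queue_item: the only part that matters here is the
// list of promises parked on it by flush()-style callers.
struct fake_queue_item
{
    std::vector<std::promise<void>> waiters;
};

int main()
{
    fake_queue_item tail_item;

    // "flush": park a promise on the newest queued item and keep its future.
    tail_item.waiters.emplace_back();
    std::future<void> done = tail_item.waiters.back().get_future();

    // Stand-in for the writer thread: finish the item later and wake waiters.
    std::thread writer([&tail_item] {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        for (std::promise<void> &p : tail_item.waiters)
            p.set_value();
    });

    done.wait();    // blocks, like stream::flush(), until the item completes
    writer.join();
    return 0;
}

In the real stream the promise is attached under tail_mutex and head_mutex, and
only when the queue is non-empty; the sketch keeps only the rendezvous between
the flushing thread and the thread that completes the tail item.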