• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 6022999521

30 Aug 2023 09:19AM UTC coverage: 78.38% (-0.04%) from 78.424%
6022999521

push

github

bmerry
MacOS wheels: eliminate before_build_macos.sh again

Just handle it in before_all, since we only support one ARCH.

5311 of 6776 relevant lines covered (78.38%)

56369.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.04
/src/send_stream.cpp
1
/* Copyright 2015, 2017, 2019-2020, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <algorithm>
22
#include <limits>
23
#include <cmath>
24
#include <thread>
25
#include <stdexcept>
26
#include <new>
27
#include <spead2/common_logging.h>
28
#include <spead2/send_stream.h>
29
#include <spead2/send_writer.h>
30

31
namespace spead2::send
32
{
33

34
stream::queue_item_storage &stream::get_queue_storage(std::size_t idx)
{
    // The queue is a power-of-two ring buffer, so a monotonically increasing
    // index wraps into a slot with a single bitwise AND against the mask.
    const std::size_t slot = idx & queue_mask;
    return queue[slot];
}
38

39
detail::queue_item *stream::get_queue(std::size_t idx)
{
    // Convenience wrapper: resolve the ring-buffer slot for idx, then expose
    // the item held in that storage as a raw pointer.
    queue_item_storage &storage = get_queue_storage(idx);
    return storage.get();
}
43

44
/* Return (p - 1) where p is the smallest power of two with p >= size.
 * The result is used as a bit mask for ring-buffer index wrapping.
 *
 * Throws std::invalid_argument if size is 0 or so large that rounding up
 * to a power of two would overflow std::size_t.
 */
static std::size_t compute_queue_mask(std::size_t size)
{
    if (size == 0)
        throw std::invalid_argument("max_heaps must be at least 1");
    // The largest representable power of two is max/2 + 1; anything bigger
    // cannot be rounded up without overflow.
    if (size > std::numeric_limits<std::size_t>::max() / 2 + 1)
        throw std::invalid_argument("max_heaps is too large");
    std::size_t rounded = 1;
    while (rounded < size)
        rounded *= 2;
    return rounded - 1;
}
55

56
// Remember the starting tail position so that a later abort() can destroy
// exactly the items appended after this point.
stream::unwinder::unwinder(stream &owner, std::size_t start)
    : s(owner), orig_tail(start), tail(start)
{
}
60

61
void stream::unwinder::set_tail(std::size_t tail)
440✔
62
{
63
    this->tail = tail;
440✔
64
}
440✔
65

66
void stream::unwinder::abort()
113✔
67
{
68
    for (std::size_t i = orig_tail; i != tail; i++)
115✔
69
        s.get_queue_storage(i).destroy();
2✔
70
}
113✔
71

72
void stream::unwinder::commit()
111✔
73
{
74
    orig_tail = tail;
111✔
75
}
111✔
76

77
/* Take ownership of the writer and size the ring buffer from its config.
 *
 * NOTE(review): the initializer list order is load-bearing — members are
 * initialized in declaration order, and queue's initializer reads queue_mask,
 * while the writer's config must be read before the unique_ptr is moved into
 * the member.  The order below is kept identical to the original.
 */
stream::stream(std::unique_ptr<writer> &&wr)
    : queue_size(wr->config.get_max_heaps()),
    queue_mask(compute_queue_mask(queue_size)),
    num_substreams(wr->get_num_substreams()),
    max_packet_size(wr->config.get_max_packet_size()),
    w(std::move(wr)),
    queue(new queue_item_storage[queue_mask + 1])
{
    // Hook the writer back to this stream, then start it running.
    w->set_owner(this);
    w->start();
}
88

89
stream::~stream()
{
    // Drain anything still queued before tearing down the writer.
    flush();
    /* The writer may still have a wakeup pending to look for new work.  It is
     * only safe to destroy it once it has set need_wakeup.  The busy-wait is
     * deliberate: paying a spin here at shutdown keeps worker::request_wakeup
     * cheap on the fast path.
     */
    std::unique_lock<std::mutex> guard(tail_mutex);
    while (!need_wakeup)
    {
        guard.unlock();
        std::this_thread::yield();
        guard.lock();
    }
}
105

106
// The stream has no io_service of its own; expose the writer's.
boost::asio::io_service &stream::get_io_service() const
{
    return w->get_io_service();
}
110

111
/* Set the starting heap cnt and the increment applied per heap.
 *
 * Throws std::invalid_argument if step is 0 (the sequence would not advance).
 */
void stream::set_cnt_sequence(item_pointer_t next, item_pointer_t step)
{
    if (step == 0)
        throw std::invalid_argument("step cannot be 0");
    // tail_mutex serialises these updates against concurrent heap submission.
    std::lock_guard<std::mutex> lock(tail_mutex);
    next_cnt = next;
    step_cnt = step;
}
119

120
bool stream::async_send_heap(const heap &h, completion_handler handler,
5,109✔
121
                             s_item_pointer_t cnt,
122
                             std::size_t substream_index)
123
{
124
    heap_reference ref(h, cnt, substream_index);
5,109✔
125
    return async_send_heaps_impl<null_unwinder>(
5,109✔
126
        &ref, &ref + 1, std::move(handler), group_mode::SERIAL);
15,327✔
127
}
128

129
void stream::flush()
299✔
130
{
131
    std::future<void> future;
299✔
132
    {
133
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
299✔
134
        std::lock_guard<std::mutex> head_lock(head_mutex);
299✔
135
        // These could probably be read with relaxed consistency because the locks
136
        // ensure ordering, but this is not performance-critical.
137
        std::size_t tail = queue_tail.load();
299✔
138
        std::size_t head = queue_head.load();
299✔
139
        if (head == tail)
299✔
140
            return;
297✔
141
        detail::queue_item *item = get_queue(tail - 1);
2✔
142
        item->waiters.emplace_front();
2✔
143
        future = item->waiters.front().get_future();
2✔
144
    }
596✔
145

146
    future.wait();
2✔
147
}
299✔
148

149
std::size_t stream::get_num_substreams() const
×
150
{
151
    return num_substreams;
×
152
}
153

154
// Explicit instantiation
155
template bool stream::async_send_heaps_impl<stream::null_unwinder, heap_reference *>(
156
    heap_reference *first, heap_reference *last,
157
    completion_handler &&handler, group_mode mode);
158

159
} // namespace spead2::send
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc