• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 5486054982

pending completion
5486054982

Pull #219

github

web-flow
Merge 1634bb00d into 99464cfdf
Pull Request #219: Add chunk stream groups

589 of 589 new or added lines in 16 files covered. (100.0%)

5401 of 7206 relevant lines covered (74.95%)

52885.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.33
/src/common_thread_pool.cpp
1
/* Copyright 2015 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <memory>
22
#include <thread>
23
#include <stdexcept>
24
#include <system_error>
25
#include <spead2/common_thread_pool.h>
26
#include <spead2/common_logging.h>
27
#include <spead2/common_features.h>
28
#if SPEAD2_USE_PTHREAD_SETAFFINITY_NP
29
# include <sched.h>
30
# include <pthread.h>
31
#endif
32

33
namespace spead2
34
{
35

36
static void run_io_service(boost::asio::io_service &io_service)
927✔
37
{
38
    try
39
    {
40
        /* Glibc's memory allocator seems to do some initialisation the
41
         * first time a thread does a dynamic allocation. To avoid that
42
         * occurring in a real-time situation, do a dummy allocation now to
43
         * initialise it.
44
         */
45
        int *dummy = new int;
927✔
46
        delete dummy;
927✔
47
        io_service.run();
927✔
48
    }
49
    catch (const std::exception &e)
×
50
    {
51
        log_warning("Worker thread threw exception (expect deadlocks!): %1%", e.what());
×
52
        throw;
×
53
    }
×
54
    catch (...)
×
55
    {
56
        log_warning("Worker thread threw unknown exception (expect deadlocks!)");
×
57
        throw;
×
58
    }
×
59
}
927✔
60

61
thread_pool::thread_pool(int num_threads)
906✔
62
    : work(io_service)
906✔
63
{
64
    if (num_threads < 1)
906✔
65
        throw std::invalid_argument("at least one thread is required");
1✔
66
    workers.reserve(num_threads);
905✔
67
    for (int i = 0; i < num_threads; i++)
1,825✔
68
    {
69
        workers.push_back(std::async(std::launch::async, [this] { run_io_service(io_service); }));
1,840✔
70
    }
71
}
908✔
72

73
thread_pool::thread_pool(int num_threads, const std::vector<int> &affinity)
4✔
74
    : work(io_service)
4✔
75
{
76
    if (num_threads < 1)
4✔
77
        throw std::invalid_argument("at least one thread is required");
1✔
78
    workers.reserve(num_threads);
3✔
79
    for (int i = 0; i < num_threads; i++)
10✔
80
    {
81
        if (affinity.empty())
7✔
82
            workers.push_back(std::async(std::launch::async, [this] { run_io_service(io_service); }));
6✔
83
        else
84
        {
85
            int core = affinity[i % affinity.size()];
4✔
86
            workers.push_back(std::async(std::launch::async, [this, core] {
4✔
87
                set_affinity(core);
4✔
88
                run_io_service(io_service);
4✔
89
            }));
4✔
90
        }
91
    }
92
}
6✔
93

94
void thread_pool::set_affinity(int core)
4✔
95
{
96
#if SPEAD2_USE_PTHREAD_SETAFFINITY_NP
97
    if (core < 0 || core >= CPU_SETSIZE)
4✔
98
        log_warning("Core ID %1% is out of range for a CPU_SET", core);
×
99
    else
100
    {
101
        cpu_set_t set;
102
        CPU_ZERO(&set);
4✔
103
        CPU_SET(core, &set);
4✔
104
        int status = pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
4✔
105
        if (status != 0)
4✔
106
        {
107
            std::error_code code(status, std::system_category());
×
108
            log_warning("Failed to bind to core %1%: %2% (%3%)", core, code.value(), code.message());
×
109
        }
110
    }
111
#else
112
    log_warning("Could not set affinity to core %1%: pthread_setaffinity_np not detected", core);
113
#endif
114
}
4✔
115

116
void thread_pool::stop()
1,836✔
117
{
118
    io_service.stop();
1,836✔
119
    for (auto &worker : workers)
2,763✔
120
    {
121
        try
122
        {
123
            worker.get();
927✔
124
        }
125
        catch (std::exception &e)
×
126
        {
127
            log_warning("worker thread threw an exception: %s", e.what());
×
128
        }
×
129
    }
130
    workers.clear();
1,836✔
131
}
1,836✔
132

133
thread_pool::~thread_pool()
908✔
134
{
135
    stop();
908✔
136
}
908✔
137

138

139
io_service_ref::io_service_ref(boost::asio::io_service &io_service)
39✔
140
    : io_service(io_service)
39✔
141
{
142
}
39✔
143

144
io_service_ref::io_service_ref(thread_pool &tpool)
5✔
145
    : io_service(tpool.get_io_service())
5✔
146
{
147
}
5✔
148

149
void io_service_ref::check_non_null(thread_pool *ptr)
908✔
150
{
151
    if (ptr == nullptr)
908✔
152
        throw std::invalid_argument("io_service_ref cannot be constructed from a null thread pool");
×
153
}
908✔
154

155
boost::asio::io_service &io_service_ref::operator*() const
16,412✔
156
{
157
    return io_service;
16,412✔
158
}
159

160
boost::asio::io_service *io_service_ref::operator->() const
1✔
161
{
162
    return &io_service;
1✔
163
}
164

165
std::shared_ptr<thread_pool> io_service_ref::get_shared_thread_pool() const &
×
166
{
167
    return thread_pool_holder;
×
168
}
169

170
std::shared_ptr<thread_pool> &&io_service_ref::get_shared_thread_pool() &&
609✔
171
{
172
    return std::move(thread_pool_holder);
609✔
173
}
174

175
} // namespace spead2
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc