• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

CrowCpp / Crow / 512

06 Jan 2025 07:25PM UTC coverage: 87.519% (-0.07%) from 87.586%
512

push

gh-actions

web-flow
Merge pull request #971 from bugdea1er/io-context

Remove usage of deprecated Asio API

36 of 38 new or added lines in 5 files covered. (94.74%)

3 existing lines in 1 file now uncovered.

3955 of 4519 relevant lines covered (87.52%)

260.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.05
/include/crow/http_server.h
1
#pragma once
2

3
#ifdef CROW_USE_BOOST
4
#include <boost/asio.hpp>
5
#ifdef CROW_ENABLE_SSL
6
#include <boost/asio/ssl.hpp>
7
#endif
8
#else
9
#ifndef ASIO_STANDALONE
10
#define ASIO_STANDALONE
11
#endif
12
#include <asio.hpp>
13
#ifdef CROW_ENABLE_SSL
14
#include <asio/ssl.hpp>
15
#endif
16
#endif
17

18
#include <atomic>
19
#include <chrono>
20
#include <cstdint>
21
#include <future>
22
#include <memory>
23
#include <vector>
24

25
#include "crow/version.h"
26
#include "crow/http_connection.h"
27
#include "crow/logging.h"
28
#include "crow/task_timer.h"
29

30

31
namespace crow // NOTE: Already documented in "crow/app.h"
32
{
33
#ifdef CROW_USE_BOOST
34
    // Boost build: make Boost.Asio reachable under the short name `asio` inside
    // namespace crow, so the rest of this header can spell `asio::...` for both builds.
    namespace asio = boost::asio;
35
    // Boost.Asio reports errors through Boost.System's error_code.
    using error_code = boost::system::error_code;
36
#else
37
    // Standalone Asio build: the library provides its own error_code type.
    using error_code = asio::error_code;
38
#endif
39
    // Shorthand for the TCP protocol type (acceptor and endpoint are taken from it below).
    using tcp = asio::ip::tcp;
40

41
    template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
42
    class Server
43
    {
44
    public:
45
        Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, uint8_t timeout = 5, typename Adaptor::context* adaptor_ctx = nullptr):
64✔
46
          acceptor_(io_context_, tcp::endpoint(asio::ip::make_address(bindaddr), port)),
64✔
47
          signals_(io_context_),
64✔
48
          tick_timer_(io_context_),
64✔
49
          handler_(handler),
64✔
50
          concurrency_(concurrency),
64✔
51
          timeout_(timeout),
64✔
52
          server_name_(server_name),
64✔
53
          port_(port),
64✔
54
          bindaddr_(bindaddr),
64✔
55
          task_queue_length_pool_(concurrency_ - 1),
128✔
56
          middlewares_(middlewares),
64✔
57
          adaptor_ctx_(adaptor_ctx)
192✔
58
        {}
64✔
59

60
        void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
62✔
61
        {
62
            tick_interval_ = d;
62✔
63
            tick_function_ = f;
62✔
64
        }
62✔
65

66
        void on_tick()
×
67
        {
68
            tick_function_();
×
69
            tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
×
70
            tick_timer_.async_wait([this](const error_code& ec) {
×
71
                if (ec)
×
72
                    return;
×
73
                on_tick();
×
74
            });
75
        }
76

77
        void run()
62✔
78
        {
79
            uint16_t worker_thread_count = concurrency_ - 1;
62✔
80
            for (int i = 0; i < worker_thread_count; i++)
124✔
81
                io_context_pool_.emplace_back(new asio::io_context());
62✔
82
            get_cached_date_str_pool_.resize(worker_thread_count);
62✔
83
            task_timer_pool_.resize(worker_thread_count);
62✔
84

85
            std::vector<std::future<void>> v;
62✔
86
            std::atomic<int> init_count(0);
62✔
87
            for (uint16_t i = 0; i < worker_thread_count; i++)
124✔
88
                v.push_back(
62✔
89
                  std::async(
90
                    std::launch::async, [this, i, &init_count] {
186✔
91
                        // thread local date string get function
92
                        auto last = std::chrono::steady_clock::now();
62✔
93

94
                        std::string date_str;
62✔
95
                        auto update_date_str = [&] {
128✔
96
                            auto last_time_t = time(0);
66✔
97
                            tm my_tm;
98

99
#if defined(_MSC_VER) || defined(__MINGW32__)
100
                            gmtime_s(&my_tm, &last_time_t);
101
#else
102
                            gmtime_r(&last_time_t, &my_tm);
66✔
103
#endif
104
                            date_str.resize(100);
66✔
105
                            size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
66✔
106
                            date_str.resize(date_str_sz);
66✔
107
                        };
108
                        update_date_str();
62✔
109
                        get_cached_date_str_pool_[i] = [&]() -> std::string {
206✔
110
                            if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
144✔
111
                            {
112
                                last = std::chrono::steady_clock::now();
4✔
113
                                update_date_str();
4✔
114
                            }
115
                            return date_str;
144✔
116
                        };
117

118
                        // initializing task timers
119
                        detail::task_timer task_timer(*io_context_pool_[i]);
62✔
120
                        task_timer.set_default_timeout(timeout_);
62✔
121
                        task_timer_pool_[i] = &task_timer;
62✔
122
                        task_queue_length_pool_[i] = 0;
62✔
123

124
                        init_count++;
62✔
125
                        while (1)
62✔
126
                        {
127
                            try
128
                            {
129
                                if (io_context_pool_[i]->run() == 0)
124✔
130
                                {
131
                                    // when io_service.run returns 0, there are no more works to do.
132
                                    break;
62✔
133
                                }
134
                            }
135
                            catch (std::exception& e)
12✔
136
                            {
137
                                CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
6✔
138
                            }
139
                        }
140
                    }));
62✔
141

142
            if (tick_function_ && tick_interval_.count() > 0)
62✔
143
            {
144
                tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
×
145
                tick_timer_.async_wait(
×
146
                  [this](const error_code& ec) {
×
147
                      if (ec)
×
148
                          return;
×
149
                      on_tick();
×
150
                  });
151
            }
152

153
            port_ = acceptor_.local_endpoint().port();
62✔
154
            handler_->port(port_);
60✔
155

156

157
            CROW_LOG_INFO << server_name_ << " server is running at " << (handler_->ssl_used() ? "https://" : "http://") << bindaddr_ << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads";
60✔
158
            CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
62✔
159

160
            signals_.async_wait(
62✔
161
              [&](const error_code& /*error*/, int /*signal_number*/) {
62✔
162
                  stop();
×
163
              });
164

165
            while (worker_thread_count != init_count)
1,040✔
166
                std::this_thread::yield();
978✔
167

168
            do_accept();
62✔
169

170
            std::thread(
62✔
171
              [this] {
124✔
172
                  notify_start();
62✔
173
                  io_context_.run();
62✔
174
                  CROW_LOG_INFO << "Exiting.";
62✔
175
              })
176
              .join();
62✔
177
        }
62✔
178

179
        void stop()
62✔
180
        {
181
            shutting_down_ = true; // Prevent the acceptor from taking new connections
62✔
182
            for (auto& io_context : io_context_pool_)
124✔
183
            {
184
                if (io_context != nullptr)
62✔
185
                {
186
                    CROW_LOG_INFO << "Closing IO service " << &io_context;
62✔
187
                    io_context->stop(); // Close all io_services (and HTTP connections)
62✔
188
                }
189
            }
190

191
            CROW_LOG_INFO << "Closing main IO service (" << &io_context_ << ')';
62✔
192
            io_context_.stop(); // Close main io_service
62✔
193
        }
62✔
194

195
        uint16_t port(){
8✔
196
            return acceptor_.local_endpoint().port();
8✔
197
        }
198

199
        /// Wait until the server has properly started
200
        void wait_for_start()
62✔
201
        {
202
            std::unique_lock<std::mutex> lock(start_mutex_);
62✔
203
            while (!server_started_)
124✔
204
                cv_started_.wait(lock);
62✔
205
        }
62✔
206

207
        void signal_clear()
2✔
208
        {
209
            signals_.clear();
2✔
210
        }
2✔
211

212
        void signal_add(int signal_number)
124✔
213
        {
214
            signals_.add(signal_number);
124✔
215
        }
124✔
216

217
    private:
218
        uint16_t pick_io_context_idx()
184✔
219
        {
220
            uint16_t min_queue_idx = 0;
184✔
221

222
            // TODO improve load balancing
223
            // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
224
            // even though the max value of this can be only uint16_t as concurrency is uint16_t.
225
            for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
184✔
226
            // No need to check other io_services if the current one has no tasks
227
            {
228
                if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
×
229
                    min_queue_idx = i;
×
230
            }
231
            return min_queue_idx;
184✔
232
        }
233

234
        void do_accept()
186✔
235
        {
236
            if (!shutting_down_)
186✔
237
            {
238
                uint16_t context_idx = pick_io_context_idx();
184✔
239
                asio::io_context& ic = *io_context_pool_[context_idx];
184✔
240
                task_queue_length_pool_[context_idx]++;
184✔
241
                CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
184✔
242

243
                auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
368✔
244
                  ic, handler_, server_name_, middlewares_,
184✔
245
                  get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);
184✔
246

247
                acceptor_.async_accept(
188✔
248
                  p->socket(),
184✔
249
                  [this, p, &ic, context_idx](error_code ec) {
492✔
250
                      if (!ec)
124✔
251
                      {
252
                          asio::post(ic,
124✔
253
                            [p] {
496✔
254
                                p->start();
124✔
255
                            });
256
                      }
257
                      else
258
                      {
NEW
259
                          task_queue_length_pool_[context_idx]--;
×
NEW
260
                          CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
×
261
                      }
262
                      do_accept();
122✔
263
                  });
264
            }
184✔
265
        }
186✔
266

267
        /// Notify anything using `wait_for_start()` to proceed
268
        void notify_start()
62✔
269
        {
270
            std::unique_lock<std::mutex> lock(start_mutex_);
62✔
271
            server_started_ = true;
62✔
272
            cv_started_.notify_all();
62✔
273
        }
62✔
274

275
    private:
276
        std::vector<std::unique_ptr<asio::io_context>> io_context_pool_;
277
        asio::io_context io_context_;
278
        std::vector<detail::task_timer*> task_timer_pool_;
279
        std::vector<std::function<std::string()>> get_cached_date_str_pool_;
280
        tcp::acceptor acceptor_;
281
        bool shutting_down_ = false;
282
        bool server_started_{false};
283
        std::condition_variable cv_started_;
284
        std::mutex start_mutex_;
285
        asio::signal_set signals_;
286

287
        asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
288

289
        Handler* handler_;
290
        uint16_t concurrency_{2};
291
        std::uint8_t timeout_;
292
        std::string server_name_;
293
        uint16_t port_;
294
        std::string bindaddr_;
295
        std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
296

297
        std::chrono::milliseconds tick_interval_;
298
        std::function<void()> tick_function_;
299

300
        std::tuple<Middlewares...>* middlewares_;
301

302
        typename Adaptor::context* adaptor_ctx_;
303
    };
304
} // namespace crow
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc