• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

CrowCpp / Crow / 240

19 Apr 2024 03:06PM UTC coverage: 87.782%. Remained the same
240

push

gh-actions

gittiver
Define GET_IO_SERVICE once

3815 of 4346 relevant lines covered (87.78%)

109.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.13
/include/crow/http_server.h
1
#pragma once
2

3
#ifdef CROW_USE_BOOST
4
#include <boost/asio.hpp>
5
#ifdef CROW_ENABLE_SSL
6
#include <boost/asio/ssl.hpp>
7
#endif
8
#else
9
#ifndef ASIO_STANDALONE
10
#define ASIO_STANDALONE
11
#endif
12
#include <asio.hpp>
13
#ifdef CROW_ENABLE_SSL
14
#include <asio/ssl.hpp>
15
#endif
16
#endif
17

18
#include <atomic>
19
#include <chrono>
20
#include <cstdint>
21
#include <future>
22
#include <memory>
23
#include <vector>
24

25
#include "crow/version.h"
26
#include "crow/http_connection.h"
27
#include "crow/logging.h"
28
#include "crow/task_timer.h"
29

30
namespace crow
31
{
32
// Select the Asio flavour used by the rest of this header. With CROW_USE_BOOST
// defined, Boost.Asio is aliased to `asio` so later code can spell `asio::`
// regardless of which library was included above.
#ifdef CROW_USE_BOOST
33
    namespace asio = boost::asio;
34
    // With Boost, the error type comes from boost::system.
    using error_code = boost::system::error_code;
35
#else
36
    // Without Boost, use the error_code provided by the standalone <asio.hpp>.
    using error_code = asio::error_code;
37
#endif
38
    // Shorthand for the TCP protocol types (e.g. tcp::endpoint, tcp::acceptor).
    using tcp = asio::ip::tcp;
39

40
    template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
41
    class Server
42
    {
43
    public:
44
        Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, uint8_t timeout = 5, typename Adaptor::context* adaptor_ctx = nullptr):
25✔
45
          acceptor_(io_service_, tcp::endpoint(asio::ip::address::from_string(bindaddr), port)),
25✔
46
          signals_(io_service_),
25✔
47
          tick_timer_(io_service_),
25✔
48
          handler_(handler),
25✔
49
          concurrency_(concurrency),
25✔
50
          timeout_(timeout),
25✔
51
          server_name_(server_name),
25✔
52
          port_(port),
25✔
53
          bindaddr_(bindaddr),
25✔
54
          task_queue_length_pool_(concurrency_ - 1),
25✔
55
          middlewares_(middlewares),
25✔
56
          adaptor_ctx_(adaptor_ctx)
75✔
57
        {}
25✔
58

59
        void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
24✔
60
        {
61
            tick_interval_ = d;
24✔
62
            tick_function_ = f;
24✔
63
        }
24✔
64

65
        void on_tick()
×
66
        {
67
            tick_function_();
×
68
            tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
×
69
            tick_timer_.async_wait([this](const error_code& ec) {
×
70
                if (ec)
×
71
                    return;
×
72
                on_tick();
×
73
            });
74
        }
75

76
        void run()
24✔
77
        {
78
            uint16_t worker_thread_count = concurrency_ - 1;
24✔
79
            for (int i = 0; i < worker_thread_count; i++)
48✔
80
                io_service_pool_.emplace_back(new asio::io_service());
24✔
81
            get_cached_date_str_pool_.resize(worker_thread_count);
24✔
82
            task_timer_pool_.resize(worker_thread_count);
24✔
83

84
            std::vector<std::future<void>> v;
24✔
85
            std::atomic<int> init_count(0);
24✔
86
            for (uint16_t i = 0; i < worker_thread_count; i++)
48✔
87
                v.push_back(
24✔
88
                  std::async(
89
                    std::launch::async, [this, i, &init_count] {
24✔
90
                        // thread local date string get function
91
                        auto last = std::chrono::steady_clock::now();
24✔
92

93
                        std::string date_str;
24✔
94
                        auto update_date_str = [&] {
50✔
95
                            auto last_time_t = time(0);
26✔
96
                            tm my_tm;
97

98
#if defined(_MSC_VER) || defined(__MINGW32__)
99
                            gmtime_s(&my_tm, &last_time_t);
100
#else
101
                            gmtime_r(&last_time_t, &my_tm);
26✔
102
#endif
103
                            date_str.resize(100);
26✔
104
                            size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
26✔
105
                            date_str.resize(date_str_sz);
26✔
106
                        };
107
                        update_date_str();
24✔
108
                        get_cached_date_str_pool_[i] = [&]() -> std::string {
244✔
109
                            if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
72✔
110
                            {
111
                                last = std::chrono::steady_clock::now();
2✔
112
                                update_date_str();
2✔
113
                            }
114
                            return date_str;
72✔
115
                        };
116

117
                        // initializing task timers
118
                        detail::task_timer task_timer(*io_service_pool_[i]);
24✔
119
                        task_timer.set_default_timeout(timeout_);
24✔
120
                        task_timer_pool_[i] = &task_timer;
24✔
121
                        task_queue_length_pool_[i] = 0;
24✔
122

123
                        init_count++;
24✔
124
                        while (1)
24✔
125
                        {
126
                            try
127
                            {
128
                                if (io_service_pool_[i]->run() == 0)
48✔
129
                                {
130
                                    // when io_service.run returns 0, there are no more works to do.
131
                                    break;
24✔
132
                                }
133
                            }
134
                            catch (std::exception& e)
6✔
135
                            {
136
                                CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
3✔
137
                            }
138
                        }
139
                    }));
24✔
140

141
            if (tick_function_ && tick_interval_.count() > 0)
24✔
142
            {
143
                tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
×
144
                tick_timer_.async_wait(
×
145
                  [this](const error_code& ec) {
×
146
                      if (ec)
×
147
                          return;
×
148
                      on_tick();
×
149
                  });
150
            }
151

152
            port_ = acceptor_.local_endpoint().port();
24✔
153
            handler_->port(port_);
24✔
154

155

156
            CROW_LOG_INFO << server_name_ << " server is running at " << (handler_->ssl_used() ? "https://" : "http://") << bindaddr_ << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads";
24✔
157
            CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
24✔
158

159
            signals_.async_wait(
24✔
160
              [&](const error_code& /*error*/, int /*signal_number*/) {
×
161
                  stop();
×
162
              });
163

164
            while (worker_thread_count != init_count)
556✔
165
                std::this_thread::yield();
532✔
166

167
            do_accept();
24✔
168

169
            std::thread(
48✔
170
              [this] {
24✔
171
                  notify_start();
24✔
172
                  io_service_.run();
24✔
173
                  CROW_LOG_INFO << "Exiting.";
24✔
174
              })
175
              .join();
24✔
176
        }
24✔
177

178
        void stop()
24✔
179
        {
180
            shutting_down_ = true; // Prevent the acceptor from taking new connections
24✔
181
            for (auto& io_service : io_service_pool_)
48✔
182
            {
183
                if (io_service != nullptr)
24✔
184
                {
185
                    CROW_LOG_INFO << "Closing IO service " << &io_service;
24✔
186
                    io_service->stop(); // Close all io_services (and HTTP connections)
24✔
187
                }
188
            }
189

190
            CROW_LOG_INFO << "Closing main IO service (" << &io_service_ << ')';
24✔
191
            io_service_.stop(); // Close main io_service
24✔
192
        }
24✔
193

194
        /// Wait until the server has properly started
195
        void wait_for_start()
24✔
196
        {
197
            std::unique_lock<std::mutex> lock(start_mutex_);
24✔
198
            if (!server_started_)
24✔
199
                cv_started_.wait(lock);
24✔
200
        }
24✔
201

202
        void signal_clear()
1✔
203
        {
204
            signals_.clear();
1✔
205
        }
1✔
206

207
        void signal_add(int signal_number)
48✔
208
        {
209
            signals_.add(signal_number);
48✔
210
        }
48✔
211

212
    private:
213
        uint16_t pick_io_service_idx()
79✔
214
        {
215
            uint16_t min_queue_idx = 0;
79✔
216

217
            // TODO improve load balancing
218
            // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
219
            // even though the max value of this can be only uint16_t as concurrency is uint16_t.
220
            for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
79✔
221
            // No need to check other io_services if the current one has no tasks
222
            {
223
                if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
×
224
                    min_queue_idx = i;
×
225
            }
226
            return min_queue_idx;
79✔
227
        }
228

229
        void do_accept()
79✔
230
        {
231
            if (!shutting_down_)
79✔
232
            {
233
                uint16_t service_idx = pick_io_service_idx();
79✔
234
                asio::io_service& is = *io_service_pool_[service_idx];
79✔
235
                task_queue_length_pool_[service_idx]++;
79✔
236
                CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
79✔
237

238
                auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
158✔
239
                  is, handler_, server_name_, middlewares_,
79✔
240
                  get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]);
79✔
241

242
                acceptor_.async_accept(
158✔
243
                  p->socket(),
79✔
244
                  [this, p, &is, service_idx](error_code ec) {
55✔
245
                      if (!ec)
55✔
246
                      {
247
                          is.post(
55✔
248
                            [p] {
165✔
249
                                p->start();
55✔
250
                            });
251
                      }
252
                      else
253
                      {
254
                          task_queue_length_pool_[service_idx]--;
×
255
                          CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx];
×
256
                      }
257
                      do_accept();
55✔
258
                  });
259
            }
79✔
260
        }
79✔
261

262
        /// Notify anything using `wait_for_start()` to proceed
263
        void notify_start()
24✔
264
        {
265
            std::unique_lock<std::mutex> lock(start_mutex_);
24✔
266
            server_started_ = true;
24✔
267
            cv_started_.notify_all();
24✔
268
        }
24✔
269

270
    private:
271
        std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
272
        asio::io_service io_service_;
273
        std::vector<detail::task_timer*> task_timer_pool_;
274
        std::vector<std::function<std::string()>> get_cached_date_str_pool_;
275
        tcp::acceptor acceptor_;
276
        bool shutting_down_ = false;
277
        bool server_started_{false};
278
        std::condition_variable cv_started_;
279
        std::mutex start_mutex_;
280
        asio::signal_set signals_;
281

282
        asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
283

284
        Handler* handler_;
285
        uint16_t concurrency_{2};
286
        std::uint8_t timeout_;
287
        std::string server_name_;
288
        uint16_t port_;
289
        std::string bindaddr_;
290
        std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
291

292
        std::chrono::milliseconds tick_interval_;
293
        std::function<void()> tick_function_;
294

295
        std::tuple<Middlewares...>* middlewares_;
296

297
        typename Adaptor::context* adaptor_ctx_;
298
    };
299
} // namespace crow
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc