• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

CrowCpp / Crow / 619

20 Feb 2025 05:47PM UTC coverage: 87.905% (-0.2%) from 88.07%
619

push

gh-actions

gittiver
cherry picked "synchronous Write Stream handling fix"

7 of 10 new or added lines in 1 file covered. (70.0%)

5 existing lines in 2 files now uncovered.

4012 of 4564 relevant lines covered (87.91%)

270.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/include/crow/http_server.h
1
#pragma once
2

3
#ifdef CROW_USE_BOOST
4
#include <boost/asio.hpp>
5
#ifdef CROW_ENABLE_SSL
6
#include <boost/asio/ssl.hpp>
7
#endif
8
#else
9
#ifndef ASIO_STANDALONE
10
#define ASIO_STANDALONE
11
#endif
12
#include <asio.hpp>
13
#ifdef CROW_ENABLE_SSL
14
#include <asio/ssl.hpp>
15
#endif
16
#endif
17

18
#include <atomic>
19
#include <chrono>
20
#include <cstdint>
21
#include <future>
22
#include <memory>
23
#include <vector>
24

25
#include "crow/version.h"
26
#include "crow/http_connection.h"
27
#include "crow/logging.h"
28
#include "crow/task_timer.h"
29

30

31
namespace crow // NOTE: Already documented in "crow/app.h"
32
{
33
#ifdef CROW_USE_BOOST
34
    namespace asio = boost::asio;
35
    using error_code = boost::system::error_code;
36
#else
37
    using error_code = asio::error_code;
38
#endif
39
    using tcp = asio::ip::tcp;
40

41
    template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
42
    class Server
43
    {
44
    public:
45
      Server(Handler* handler,
68✔
46
             const tcp::endpoint& endpoint,
47
             std::string server_name = std::string("Crow/") + VERSION,
48
             std::tuple<Middlewares...>* middlewares = nullptr,
49
             uint16_t concurrency = 1,
50
             uint8_t timeout = 5,
51
             typename Adaptor::context* adaptor_ctx = nullptr):
52
          acceptor_(io_context_,endpoint),
68✔
53
          signals_(io_context_),
68✔
54
          tick_timer_(io_context_),
68✔
55
          handler_(handler),
68✔
56
          concurrency_(concurrency),
68✔
57
          timeout_(timeout),
68✔
58
          server_name_(server_name),
68✔
59
          task_queue_length_pool_(concurrency_ - 1),
136✔
60
          middlewares_(middlewares),
68✔
61
          adaptor_ctx_(adaptor_ctx)
204✔
62
        {}
68✔
63

64
        void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
66✔
65
        {
66
            tick_interval_ = d;
66✔
67
            tick_function_ = f;
66✔
68
        }
66✔
69

70
        void on_tick()
×
71
        {
72
            tick_function_();
×
73
            tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
×
74
            tick_timer_.async_wait([this](const error_code& ec) {
×
75
                if (ec)
×
76
                    return;
×
77
                on_tick();
×
78
            });
79
        }
80

81
        void run()
66✔
82
        {
83
            uint16_t worker_thread_count = concurrency_ - 1;
66✔
84
            for (int i = 0; i < worker_thread_count; i++)
132✔
85
                io_context_pool_.emplace_back(new asio::io_context());
66✔
86
            get_cached_date_str_pool_.resize(worker_thread_count);
66✔
87
            task_timer_pool_.resize(worker_thread_count);
66✔
88

89
            std::vector<std::future<void>> v;
66✔
90
            std::atomic<int> init_count(0);
66✔
91
            for (uint16_t i = 0; i < worker_thread_count; i++)
132✔
92
                v.push_back(
66✔
93
                  std::async(
94
                    std::launch::async, [this, i, &init_count] {
198✔
95
                        // thread local date string get function
96
                        auto last = std::chrono::steady_clock::now();
66✔
97

98
                        std::string date_str;
66✔
99
                        auto update_date_str = [&] {
186✔
100
                            auto last_time_t = time(0);
120✔
101
                            tm my_tm;
102

103
#if defined(_MSC_VER) || defined(__MINGW32__)
104
                            gmtime_s(&my_tm, &last_time_t);
105
#else
106
                            gmtime_r(&last_time_t, &my_tm);
120✔
107
#endif
108
                            date_str.resize(100);
120✔
109
                            size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
120✔
110
                            date_str.resize(date_str_sz);
120✔
111
                        };
112
                        update_date_str();
66✔
113
                        get_cached_date_str_pool_[i] = [&]() -> std::string {
212✔
114
                            if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
146✔
115
                            {
116
                                last = std::chrono::steady_clock::now();
54✔
117
                                update_date_str();
54✔
118
                            }
119
                            return date_str;
146✔
120
                        };
121

122
                        // initializing task timers
123
                        detail::task_timer task_timer(*io_context_pool_[i]);
66✔
124
                        task_timer.set_default_timeout(timeout_);
66✔
125
                        task_timer_pool_[i] = &task_timer;
66✔
126
                        task_queue_length_pool_[i] = 0;
66✔
127

128
                        init_count++;
66✔
129
                        while (1)
62✔
130
                        {
131
                            try
132
                            {
133
                                if (io_context_pool_[i]->run() == 0)
128✔
134
                                {
135
                                    // when io_service.run returns 0, there are no more works to do.
136
                                    break;
66✔
137
                                }
138
                            }
UNCOV
139
                            catch (std::exception& e)
×
140
                            {
UNCOV
141
                                CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
×
142
                            }
143
                        }
144
                    }));
66✔
145

146
            if (tick_function_ && tick_interval_.count() > 0)
66✔
147
            {
148
                tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
×
149
                tick_timer_.async_wait(
×
150
                  [this](const error_code& ec) {
×
151
                      if (ec)
×
152
                          return;
×
153
                      on_tick();
×
154
                  });
155
            }
156

157
            handler_->port(acceptor_.local_endpoint().port());
66✔
158

159

160
            CROW_LOG_INFO << server_name_
132✔
161
                          << " server is running at " << (handler_->ssl_used() ? "https://" : "http://")
66✔
162
                          << acceptor_.local_endpoint().address() << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads";
66✔
163
            CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
66✔
164

165
            signals_.async_wait(
66✔
166
              [&](const error_code& /*error*/, int /*signal_number*/) {
66✔
167
                  stop();
×
168
              });
169

170
            while (worker_thread_count != init_count)
1,188✔
171
                std::this_thread::yield();
1,122✔
172

173
            do_accept();
66✔
174

175
            std::thread(
66✔
176
              [this] {
132✔
177
                  notify_start();
66✔
178
                  io_context_.run();
66✔
179
                  CROW_LOG_INFO << "Exiting.";
66✔
180
              })
181
              .join();
66✔
182
        }
66✔
183

184
        void stop()
66✔
185
        {
186
            shutting_down_ = true; // Prevent the acceptor from taking new connections
66✔
187
            for (auto& io_context : io_context_pool_)
132✔
188
            {
189
                if (io_context != nullptr)
66✔
190
                {
191
                    CROW_LOG_INFO << "Closing IO service " << &io_context;
66✔
192
                    io_context->stop(); // Close all io_services (and HTTP connections)
66✔
193
                }
194
            }
195

196
            CROW_LOG_INFO << "Closing main IO service (" << &io_context_ << ')';
66✔
197
            io_context_.stop(); // Close main io_service
66✔
198
        }
66✔
199

200
        uint16_t port() const {
8✔
201
            return acceptor_.local_endpoint().port();
8✔
202
        }
203

204
        /// Wait until the server has properly started or until timeout
205
        std::cv_status wait_for_start(std::chrono::steady_clock::time_point wait_until)
66✔
206
        {
207
            std::unique_lock<std::mutex> lock(start_mutex_);
66✔
208
            
209
            std::cv_status status = std::cv_status::no_timeout;
66✔
210
            while (!server_started_ && ( status==std::cv_status::no_timeout ))
128✔
211
                status = cv_started_.wait_until(lock,wait_until);
62✔
212
            return status;
66✔
213
        }
66✔
214

215
        void signal_clear()
2✔
216
        {
217
            signals_.clear();
2✔
218
        }
2✔
219

220
        void signal_add(int signal_number)
132✔
221
        {
222
            signals_.add(signal_number);
132✔
223
        }
132✔
224

225
    private:
226
        uint16_t pick_io_context_idx()
198✔
227
        {
228
            uint16_t min_queue_idx = 0;
198✔
229

230
            // TODO improve load balancing
231
            // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
232
            // even though the max value of this can be only uint16_t as concurrency is uint16_t.
233
            for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
198✔
234
            // No need to check other io_services if the current one has no tasks
235
            {
236
                if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
×
237
                    min_queue_idx = i;
×
238
            }
239
            return min_queue_idx;
198✔
240
        }
241

242
        void do_accept()
200✔
243
        {
244
            if (!shutting_down_)
200✔
245
            {
246
                uint16_t context_idx = pick_io_context_idx();
198✔
247
                asio::io_context& ic = *io_context_pool_[context_idx];
198✔
248
                task_queue_length_pool_[context_idx]++;
198✔
249
                CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
198✔
250

251
                auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
396✔
252
                  ic, handler_, server_name_, middlewares_,
198✔
253
                  get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);
198✔
254

255
                acceptor_.async_accept(
202✔
256
                  p->socket(),
198✔
257
                  [this, p, &ic, context_idx](error_code ec) {
530✔
258
                      if (!ec)
134✔
259
                      {
260
                          asio::post(ic,
134✔
261
                            [p] {
532✔
262
                                p->start();
132✔
263
                            });
264
                      }
265
                      else
266
                      {
267
                          task_queue_length_pool_[context_idx]--;
×
268
                          CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
×
269
                      }
270
                      do_accept();
134✔
271
                  });
272
            }
198✔
273
        }
200✔
274

275
        /// Notify anything using `wait_for_start()` to proceed
276
        void notify_start()
66✔
277
        {
278
            std::unique_lock<std::mutex> lock(start_mutex_);
66✔
279
            server_started_ = true;
66✔
280
            cv_started_.notify_all();
66✔
281
        }
66✔
282

283
    private:
284
        std::vector<std::unique_ptr<asio::io_context>> io_context_pool_;
285
        asio::io_context io_context_;
286
        std::vector<detail::task_timer*> task_timer_pool_;
287
        std::vector<std::function<std::string()>> get_cached_date_str_pool_;
288
        tcp::acceptor acceptor_;
289
        bool shutting_down_ = false;
290
        bool server_started_{false};
291
        std::condition_variable cv_started_;
292
        std::mutex start_mutex_;
293
        asio::signal_set signals_;
294

295
        asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
296

297
        Handler* handler_;
298
        uint16_t concurrency_{2};
299
        std::uint8_t timeout_;
300
        std::string server_name_;
301
        std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
302

303
        std::chrono::milliseconds tick_interval_;
304
        std::function<void()> tick_function_;
305

306
        std::tuple<Middlewares...>* middlewares_;
307

308
        typename Adaptor::context* adaptor_ctx_;
309
    };
310
} // namespace crow
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc