• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

CrowCpp / Crow / 618

20 Feb 2025 05:42PM UTC coverage: 87.862% (-0.08%) from 87.939%
618

push

gh-actions

web-flow
Merge pull request #1000 from softcom-su/synchronious_write_stream_handling_fix

Synchronious write stream handling fix

7 of 10 new or added lines in 1 file covered. (70.0%)

4 existing lines in 2 files now uncovered.

4010 of 4564 relevant lines covered (87.86%)

135.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/include/crow/http_server.h
1
#pragma once
2

3
#ifdef CROW_USE_BOOST
4
#include <boost/asio.hpp>
5
#ifdef CROW_ENABLE_SSL
6
#include <boost/asio/ssl.hpp>
7
#endif
8
#else
9
#ifndef ASIO_STANDALONE
10
#define ASIO_STANDALONE
11
#endif
12
#include <asio.hpp>
13
#ifdef CROW_ENABLE_SSL
14
#include <asio/ssl.hpp>
15
#endif
16
#endif
17

18
#include <atomic>
19
#include <chrono>
20
#include <cstdint>
21
#include <future>
22
#include <memory>
23
#include <vector>
24

25
#include "crow/version.h"
26
#include "crow/http_connection.h"
27
#include "crow/logging.h"
28
#include "crow/task_timer.h"
29

30

31
namespace crow // NOTE: Already documented in "crow/app.h"
32
{
33
#ifdef CROW_USE_BOOST
34
    namespace asio = boost::asio;
35
    using error_code = boost::system::error_code;
36
#else
37
    using error_code = asio::error_code;
38
#endif
39
    using tcp = asio::ip::tcp;
40

41
    template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
42
    class Server
43
    {
44
    public:
45
      /// Construct a server bound to `endpoint`.
      ///
      /// \param handler      Handler passed to every Connection; also told the bound port in run().
      /// \param endpoint     Local endpoint the acceptor is opened on.
      /// \param server_name  Name passed to every Connection and printed in the startup log line.
      /// \param middlewares  Middleware tuple passed to every Connection (may be nullptr).
      /// \param concurrency  Total thread count. One thread accepts connections and the other
      ///                     `concurrency - 1` threads each own a worker io_context, so values
      ///                     below 2 are raised to 2.
      /// \param timeout      Default timeout handed to each worker's task timer.
      /// \param adaptor_ctx  Adaptor context passed to every Connection (may be nullptr).
      Server(Handler* handler,
             const tcp::endpoint& endpoint,
             std::string server_name = std::string("Crow/") + VERSION,
             std::tuple<Middlewares...>* middlewares = nullptr,
             uint16_t concurrency = 1,
             uint8_t timeout = 5,
             typename Adaptor::context* adaptor_ctx = nullptr):
          acceptor_(io_context_, endpoint),
          signals_(io_context_),
          tick_timer_(io_context_),
          handler_(handler),
          // With fewer than two threads there would be no worker io_context at all, and
          // do_accept() would index an empty pool (0 would additionally underflow
          // `concurrency_ - 1`). Guarantee at least one worker.
          concurrency_(static_cast<uint16_t>(concurrency < 2 ? 2 : concurrency)),
          timeout_(timeout),
          server_name_(server_name),
          task_queue_length_pool_(concurrency_ - 1),
          middlewares_(middlewares),
          adaptor_ctx_(adaptor_ctx)
        {}
34✔
63

64
        void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
33✔
65
        {
66
            tick_interval_ = d;
33✔
67
            tick_function_ = f;
33✔
68
        }
33✔
69

70
        void on_tick()
×
71
        {
72
            tick_function_();
×
73
            tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
×
74
            tick_timer_.async_wait([this](const error_code& ec) {
×
75
                if (ec)
×
76
                    return;
×
77
                on_tick();
×
78
            });
79
        }
80

81
        void run()
33✔
82
        {
83
            uint16_t worker_thread_count = concurrency_ - 1;
33✔
84
            for (int i = 0; i < worker_thread_count; i++)
66✔
85
                io_context_pool_.emplace_back(new asio::io_context());
33✔
86
            get_cached_date_str_pool_.resize(worker_thread_count);
33✔
87
            task_timer_pool_.resize(worker_thread_count);
33✔
88

89
            std::vector<std::future<void>> v;
33✔
90
            std::atomic<int> init_count(0);
33✔
91
            for (uint16_t i = 0; i < worker_thread_count; i++)
66✔
92
                v.push_back(
33✔
93
                  std::async(
94
                    std::launch::async, [this, i, &init_count] {
99✔
95
                        // thread local date string get function
96
                        auto last = std::chrono::steady_clock::now();
33✔
97

98
                        std::string date_str;
33✔
99
                        auto update_date_str = [&] {
93✔
100
                            auto last_time_t = time(0);
60✔
101
                            tm my_tm;
102

103
#if defined(_MSC_VER) || defined(__MINGW32__)
104
                            gmtime_s(&my_tm, &last_time_t);
105
#else
106
                            gmtime_r(&last_time_t, &my_tm);
60✔
107
#endif
108
                            date_str.resize(100);
60✔
109
                            size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
60✔
110
                            date_str.resize(date_str_sz);
60✔
111
                        };
112
                        update_date_str();
33✔
113
                        get_cached_date_str_pool_[i] = [&]() -> std::string {
106✔
114
                            if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
73✔
115
                            {
116
                                last = std::chrono::steady_clock::now();
27✔
117
                                update_date_str();
27✔
118
                            }
119
                            return date_str;
73✔
120
                        };
121

122
                        // initializing task timers
123
                        detail::task_timer task_timer(*io_context_pool_[i]);
33✔
124
                        task_timer.set_default_timeout(timeout_);
33✔
125
                        task_timer_pool_[i] = &task_timer;
33✔
126
                        task_queue_length_pool_[i] = 0;
33✔
127

128
                        init_count++;
33✔
129
                        while (1)
32✔
130
                        {
131
                            try
132
                            {
133
                                if (io_context_pool_[i]->run() == 0)
65✔
134
                                {
135
                                    // when io_service.run returns 0, there are no more works to do.
136
                                    break;
33✔
137
                                }
138
                            }
UNCOV
139
                            catch (std::exception& e)
×
140
                            {
UNCOV
141
                                CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
×
142
                            }
143
                        }
144
                    }));
33✔
145

146
            if (tick_function_ && tick_interval_.count() > 0)
33✔
147
            {
148
                tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count()));
×
149
                tick_timer_.async_wait(
×
150
                  [this](const error_code& ec) {
×
151
                      if (ec)
×
152
                          return;
×
153
                      on_tick();
×
154
                  });
155
            }
156

157
            handler_->port(acceptor_.local_endpoint().port());
33✔
158

159

160
            CROW_LOG_INFO << server_name_
66✔
161
                          << " server is running at " << (handler_->ssl_used() ? "https://" : "http://")
33✔
162
                          << acceptor_.local_endpoint().address() << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads";
33✔
163
            CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";
33✔
164

165
            signals_.async_wait(
33✔
166
              [&](const error_code& /*error*/, int /*signal_number*/) {
33✔
167
                  stop();
×
168
              });
169

170
            while (worker_thread_count != init_count)
576✔
171
                std::this_thread::yield();
543✔
172

173
            do_accept();
33✔
174

175
            std::thread(
33✔
176
              [this] {
66✔
177
                  notify_start();
33✔
178
                  io_context_.run();
33✔
179
                  CROW_LOG_INFO << "Exiting.";
33✔
180
              })
181
              .join();
33✔
182
        }
33✔
183

184
        void stop()
33✔
185
        {
186
            shutting_down_ = true; // Prevent the acceptor from taking new connections
33✔
187
            for (auto& io_context : io_context_pool_)
66✔
188
            {
189
                if (io_context != nullptr)
33✔
190
                {
191
                    CROW_LOG_INFO << "Closing IO service " << &io_context;
33✔
192
                    io_context->stop(); // Close all io_services (and HTTP connections)
33✔
193
                }
194
            }
195

196
            CROW_LOG_INFO << "Closing main IO service (" << &io_context_ << ')';
33✔
197
            io_context_.stop(); // Close main io_service
33✔
198
        }
33✔
199

200
        uint16_t port() const {
4✔
201
            return acceptor_.local_endpoint().port();
4✔
202
        }
203

204
        /// Wait until the server has properly started or until timeout
205
        std::cv_status wait_for_start(std::chrono::steady_clock::time_point wait_until)
33✔
206
        {
207
            std::unique_lock<std::mutex> lock(start_mutex_);
33✔
208
            
209
            std::cv_status status = std::cv_status::no_timeout;
33✔
210
            while (!server_started_ && ( status==std::cv_status::no_timeout ))
66✔
211
                status = cv_started_.wait_until(lock,wait_until);
33✔
212
            return status;
33✔
213
        }
33✔
214

215
        void signal_clear()
1✔
216
        {
217
            signals_.clear();
1✔
218
        }
1✔
219

220
        void signal_add(int signal_number)
66✔
221
        {
222
            signals_.add(signal_number);
66✔
223
        }
66✔
224

225
    private:
226
        uint16_t pick_io_context_idx()
100✔
227
        {
228
            uint16_t min_queue_idx = 0;
100✔
229

230
            // TODO improve load balancing
231
            // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/
232
            // even though the max value of this can be only uint16_t as concurrency is uint16_t.
233
            for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++)
100✔
234
            // No need to check other io_services if the current one has no tasks
235
            {
236
                if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx])
×
237
                    min_queue_idx = i;
×
238
            }
239
            return min_queue_idx;
100✔
240
        }
241

242
        void do_accept()
100✔
243
        {
244
            if (!shutting_down_)
100✔
245
            {
246
                uint16_t context_idx = pick_io_context_idx();
100✔
247
                asio::io_context& ic = *io_context_pool_[context_idx];
100✔
248
                task_queue_length_pool_[context_idx]++;
100✔
249
                CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
100✔
250

251
                auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
200✔
252
                  ic, handler_, server_name_, middlewares_,
100✔
253
                  get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);
100✔
254

255
                acceptor_.async_accept(
102✔
256
                  p->socket(),
100✔
257
                  [this, p, &ic, context_idx](error_code ec) {
267✔
258
                      if (!ec)
67✔
259
                      {
260
                          asio::post(ic,
67✔
261
                            [p] {
268✔
262
                                p->start();
67✔
263
                            });
264
                      }
265
                      else
266
                      {
267
                          task_queue_length_pool_[context_idx]--;
×
268
                          CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
×
269
                      }
270
                      do_accept();
67✔
271
                  });
272
            }
100✔
273
        }
100✔
274

275
        /// Notify anything using `wait_for_start()` to proceed
276
        void notify_start()
33✔
277
        {
278
            std::unique_lock<std::mutex> lock(start_mutex_);
33✔
279
            server_started_ = true;
33✔
280
            cv_started_.notify_all();
33✔
281
        }
33✔
282

283
    private:
284
        std::vector<std::unique_ptr<asio::io_context>> io_context_pool_;
285
        asio::io_context io_context_;
286
        std::vector<detail::task_timer*> task_timer_pool_;
287
        std::vector<std::function<std::string()>> get_cached_date_str_pool_;
288
        tcp::acceptor acceptor_;
289
        bool shutting_down_ = false;
290
        bool server_started_{false};
291
        std::condition_variable cv_started_;
292
        std::mutex start_mutex_;
293
        asio::signal_set signals_;
294

295
        asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;
296

297
        Handler* handler_;
298
        uint16_t concurrency_{2};
299
        std::uint8_t timeout_;
300
        std::string server_name_;
301
        std::vector<std::atomic<unsigned int>> task_queue_length_pool_;
302

303
        std::chrono::milliseconds tick_interval_;
304
        std::function<void()> tick_function_;
305

306
        std::tuple<Middlewares...>* middlewares_;
307

308
        typename Adaptor::context* adaptor_ctx_;
309
    };
310
} // namespace crow
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc