• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 2264857152

15 Jan 2026 12:07PM UTC coverage: 79.64%. First build
2264857152

push

gitlab-ci

danielskinstad
fix: use boost beast's tcp_stream object

Scheduled async read handlers from boost beast takes a stream
object which we close and destroy in error handlers. The issue is that on
cases where we call the error handler with our custom timeout, we
destroy the stream object which some async read operations still need
access to. By leveraging boost beast's own tcp_stream object, we can get rid
of our custom timeout and set the timeout directly in the object which
will allow the async error handler to do its job.

Ticket: MEN-9104
Changelog: Fixed use-after-free errors when freeing tcp stream socket
which pending async read handlers needed access to.

Signed-off-by: Daniel Skinstad Drabitzius <daniel.drabitzius@northern.tech>

11 of 14 new or added lines in 1 file covered. (78.57%)

7874 of 9887 relevant lines covered (79.64%)

13908.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.39
/src/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio.hpp>
20
#include <boost/asio/ip/tcp.hpp>
21
#include <boost/asio/ssl/host_name_verification.hpp>
22
#include <boost/asio/ssl/verify_mode.hpp>
23

24
#include <common/common.hpp>
25
#include <common/crypto.hpp>
26

27
#include <mender-version.h>
28

29
namespace mender {
30
namespace common {
31
namespace http {
32

33
namespace common = mender::common;
34
namespace crypto = mender::common::crypto;
35

36
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
37
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
38
const unsigned int BeastHttpVersion = 11;
39

40
namespace asio = boost::asio;
41
namespace http = boost::beast::http;
42

43
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
44

45
static http::verb MethodToBeastVerb(Method method) {
306✔
46
        switch (method) {
306✔
47
        case Method::GET:
48
                return http::verb::get;
49
        case Method::HEAD:
50
                return http::verb::head;
51
        case Method::POST:
52
                return http::verb::post;
53
        case Method::PUT:
54
                return http::verb::put;
55
        case Method::PATCH:
56
                return http::verb::patch;
57
        case Method::CONNECT:
58
                return http::verb::connect;
59
        case Method::Invalid:
60
                // Fallthrough to end (no-op).
61
                break;
62
        }
63
        // Don't use "default" case. This should generate a warning if we ever add any methods. But
64
        // still assert here for safety.
65
        assert(false);
66
        return http::verb::get;
67
}
68

69
static expected::expected<Method, error::Error> BeastVerbToMethod(
285✔
70
        http::verb verb, const string &verb_string) {
71
        switch (verb) {
285✔
72
        case http::verb::get:
245✔
73
                return Method::GET;
74
        case http::verb::head:
×
75
                return Method::HEAD;
76
        case http::verb::post:
16✔
77
                return Method::POST;
78
        case http::verb::put:
24✔
79
                return Method::PUT;
80
        case http::verb::patch:
×
81
                return Method::PATCH;
82
        case http::verb::connect:
×
83
                return Method::CONNECT;
84
        default:
×
85
                return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
×
86
        }
87
}
88

89
template <typename StreamType>
90
class BodyAsyncReader : virtual public io::AsyncReader {
91
public:
92
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
212✔
93
                stream_ {stream},
94
                cancelled_ {cancelled} {
424✔
95
        }
212✔
96
        ~BodyAsyncReader() {
43✔
97
                Cancel();
43✔
98
        }
86✔
99

100
        error::Error AsyncRead(
2,195✔
101
                vector<uint8_t>::iterator start,
102
                vector<uint8_t>::iterator end,
103
                io::AsyncIoHandler handler) override {
104
                if (eof_) {
2,195✔
105
                        handler(0);
×
106
                        return error::NoError;
×
107
                }
108

109
                if (*cancelled_) {
2,195✔
110
                        return error::MakeError(
×
111
                                error::ProgrammingError,
112
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
113
                }
114
                stream_.AsyncReadNextBodyPart(start, end, [this, handler](io::ExpectedSize size) {
14,495✔
115
                        if (size && size.value() == 0) {
7,007✔
116
                                eof_ = true;
137✔
117
                        }
118
                        handler(size);
14,014✔
119
                });
120
                return error::NoError;
2,195✔
121
        }
122

123
        void Cancel() override {
43✔
124
                if (!*cancelled_) {
43✔
125
                        stream_.Cancel();
4✔
126
                }
127
        }
43✔
128

129
private:
130
        StreamType &stream_;
131
        shared_ptr<bool> cancelled_;
132
        bool eof_ {false};
133

134
        friend class Client;
135
        friend class Server;
136
};
137

138
template <typename StreamType>
139
class RawSocket : virtual public io::AsyncReadWriter {
140
public:
141
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
142
                destroying_ {make_shared<bool>(false)},
×
143
                stream_ {stream},
144
                buffered_ {buffered} {
×
145
                // If there are no buffered bytes, then we don't need it.
146
                if (buffered_ && buffered_->size() == 0) {
×
147
                        buffered_.reset();
×
148
                }
149
        }
×
150

151
        ~RawSocket() {
8✔
152
                *destroying_ = true;
8✔
153
                Cancel();
8✔
154
        }
16✔
155

156
        error::Error AsyncRead(
159✔
157
                vector<uint8_t>::iterator start,
158
                vector<uint8_t>::iterator end,
159
                io::AsyncIoHandler handler) override {
160
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
161
                // header and parts of the body in one block, return those first.
162
                if (buffered_) {
159✔
163
                        return DrainPrebufferedData(start, end, handler);
×
164
                }
165

166
                read_buffer_ = asio::buffer(&*start, end - start);
159✔
167
                auto &destroying = destroying_;
168
                stream_->async_read_some(
318✔
169
                        read_buffer_,
159✔
170
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
157✔
171
                                if (*destroying) {
313✔
172
                                        return;
173
                                }
174

175
                                if (ec == asio::error::operation_aborted) {
313✔
176
                                        handler(expected::unexpected(error::Error(
12✔
177
                                                make_error_condition(errc::operation_canceled),
6✔
178
                                                "Could not read from socket")));
179
                                } else if (ec) {
310✔
180
                                        handler(expected::unexpected(
12✔
181
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
182
                                } else {
183
                                        handler(num_read);
608✔
184
                                }
185
                        });
186
                return error::NoError;
159✔
187
        }
188

189
        error::Error AsyncWrite(
156✔
190
                vector<uint8_t>::const_iterator start,
191
                vector<uint8_t>::const_iterator end,
192
                io::AsyncIoHandler handler) override {
193
                write_buffer_ = asio::buffer(&*start, end - start);
156✔
194
                auto &destroying = destroying_;
195
                stream_->async_write_some(
312✔
196
                        write_buffer_,
156✔
197
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
154✔
198
                                if (*destroying) {
306✔
199
                                        return;
200
                                }
201

202
                                if (ec == asio::error::operation_aborted) {
306✔
203
                                        handler(expected::unexpected(error::Error(
×
204
                                                make_error_condition(errc::operation_canceled),
×
205
                                                "Could not write to socket")));
206
                                } else if (ec) {
306✔
207
                                        handler(expected::unexpected(
×
208
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
209
                                } else {
210
                                        handler(num_written);
612✔
211
                                }
212
                        });
213
                return error::NoError;
156✔
214
        }
215

216
        void Cancel() override {
15✔
217
                // close() is sufficient:
218
                // https://www.boost.org/doc/libs/latest/doc/html/boost_asio/reference/basic_stream_socket/close/overload1.html
219
                beast::close_socket(beast::get_lowest_layer(*stream_));
220
        }
15✔
221

222
private:
223
        error::Error DrainPrebufferedData(
×
224
                vector<uint8_t>::iterator start,
225
                vector<uint8_t>::iterator end,
226
                io::AsyncIoHandler handler) {
227
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
×
228

229
                // These two lines are equivalent to:
230
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
231
                // but compatible with Boost 1.67.
232
                const beast::flat_buffer &cbuffered = *buffered_;
233
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
×
234
                buffered_->consume(to_copy);
×
235
                if (buffered_->size() == 0) {
×
236
                        // We don't need it anymore.
237
                        buffered_.reset();
×
238
                }
239
                handler(to_copy);
×
240
                return error::NoError;
×
241
        }
242

243
        shared_ptr<bool> destroying_;
244
        shared_ptr<StreamType> stream_;
245
        shared_ptr<beast::flat_buffer> buffered_;
246
        asio::mutable_buffer read_buffer_;
247
        asio::const_buffer write_buffer_;
248
};
249

250
template <typename PARSER>
251
int64_t GetContentLength(const PARSER &parser) {
518✔
252
        auto content_length = parser.content_length();
518✔
253
        if (content_length) {
518✔
254
                return content_length.value();
467✔
255
        } else {
256
                return 0;
257
        }
258
}
259

260
expected::ExpectedBool HasBody(
567✔
261
        const expected::ExpectedString &content_length,
262
        const expected::ExpectedString &transfer_encoding) {
263
        if (transfer_encoding) {
567✔
264
                if (transfer_encoding.value() != "chunked") {
4✔
265
                        return expected::unexpected(error::Error(
×
266
                                make_error_condition(errc::not_supported),
×
267
                                "Unsupported Transfer-Encoding: " + transfer_encoding.value()));
×
268
                }
269
                return true;
270
        }
271

272
        if (content_length) {
563✔
273
                auto length = common::StringToLongLong(content_length.value());
281✔
274
                if (!length || length.value() < 0) {
281✔
275
                        return expected::unexpected(error::Error(
×
276
                                length.error().code,
277
                                "Content-Length contains invalid number: " + content_length.value()));
×
278
                }
279
                return length.value() > 0;
281✔
280
        }
281

282
        return false;
283
}
284

285
Client::Client(
366✔
286
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
287
        event_loop_ {event_loop},
288
        logger_name_ {logger_name},
289
        client_config_ {client},
290
        http_proxy_ {client.http_proxy},
366✔
291
        https_proxy_ {client.https_proxy},
366✔
292
        no_proxy_ {client.no_proxy},
366✔
293
        cancelled_ {make_shared<bool>(true)},
×
294
        resolver_(GetAsioIoContext(event_loop)),
295
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,464✔
296
}
366✔
297

298
Client::~Client() {
2,196✔
299
        if (!*cancelled_) {
366✔
300
                logger_.Warning("Client destroyed while request is still active!");
30✔
301
        }
302
        DoCancel();
366✔
303
}
366✔
304

305
error::Error Client::Initialize() {
342✔
306
        if (initialized_) {
342✔
307
                return error::NoError;
119✔
308
        }
309

310
        for (auto i = 0; i < MENDER_BOOST_BEAST_SSL_CTX_COUNT; i++) {
661✔
311
                ssl_ctx_[i].set_verify_mode(
882✔
312
                        client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer);
313

314
                beast::error_code ec {};
442✔
315
                if (client_config_.client_cert_path != "" and client_config_.client_cert_key_path != "") {
442✔
316
                        ssl_ctx_[i].set_options(boost::asio::ssl::context::default_workarounds);
4✔
317
                        ssl_ctx_[i].use_certificate_file(
318
                                client_config_.client_cert_path, boost::asio::ssl::context_base::pem, ec);
4✔
319
                        if (ec) {
4✔
320
                                return error::Error(
321
                                        ec.default_error_condition(), "Could not load client certificate");
2✔
322
                        }
323
                        auto exp_key = crypto::PrivateKey::Load(
324
                                {client_config_.client_cert_key_path, "", client_config_.ssl_engine});
6✔
325
                        if (!exp_key) {
3✔
326
                                return exp_key.error().WithContext(
327
                                        "Error loading private key from " + client_config_.client_cert_key_path);
2✔
328
                        }
329

330
                        const int ret =
331
                                SSL_CTX_use_PrivateKey(ssl_ctx_[i].native_handle(), exp_key.value()->Get());
2✔
332
                        if (ret != 1) {
2✔
333
                                return MakeError(
334
                                        HTTPInitError,
335
                                        "Failed to add the PrivateKey: " + client_config_.client_cert_key_path
×
336
                                                + " to the SSL CTX");
×
337
                        }
338
                } else if (
339
                        client_config_.client_cert_path != "" or client_config_.client_cert_key_path != "") {
438✔
340
                        return error::Error(
341
                                make_error_condition(errc::invalid_argument),
4✔
342
                                "Cannot set only one of client certificate, and client certificate private key");
4✔
343
                }
344

345
                bool cert_loaded = true;
346
                ssl_ctx_[i].set_default_verify_paths(ec); // Load the default CAs
438✔
347
                if (ec) {
438✔
348
                        auto err = error::Error(
349
                                ec.default_error_condition(), "Failed to load the SSL default directory");
×
350
                        if (client_config_.server_cert_path == "") {
×
351
                                // We aren't going to have any valid certificates then.
352
                                return err;
×
353
                        } else {
354
                                // We have a dedicated certificate, so this is not fatal.
355
                                log::Info(err.String());
×
356
                                cert_loaded = false;
357
                        }
358
                }
359
                if (client_config_.server_cert_path != "") {
438✔
360
                        ssl_ctx_[i].load_verify_file(client_config_.server_cert_path, ec);
52✔
361
                        if (ec) {
52✔
362
                                log::Warning("Failed to load the server certificate! Falling back to the CA store");
4✔
363
                                if (!cert_loaded) {
2✔
364
                                        return error::Error(
365
                                                ec.default_error_condition(),
×
366
                                                "Failed to load SSL default directory and server certificate");
×
367
                                }
368
                        }
369
                }
370
        }
371

372
        initialized_ = true;
219✔
373

374
        return error::NoError;
219✔
375
}
376

377
// Create the HOST header according to:
378
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.23
379
// In short: Add the port-number if it is non-standard HTTP
380
static string CreateHOSTAddress(OutgoingRequestPtr req) {
330✔
381
        if (req->GetPort() == 80 || req->GetPort() == 443) {
330✔
382
                return req->GetHost();
4✔
383
        }
384
        return req->GetHost() + ":" + to_string(req->GetPort());
652✔
385
}
386

387
error::Error Client::AsyncCall(
342✔
388
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
389
        auto err = Initialize();
342✔
390
        if (err != error::NoError) {
342✔
391
                return err;
4✔
392
        }
393

394
        if (!*cancelled_ && status_ != TransactionStatus::Done) {
338✔
395
                return error::Error(
396
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
×
397
        }
398

399
        if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
338✔
400
                return error::MakeError(error::ProgrammingError, "Request is not ready");
4✔
401
        }
402

403
        if (!header_handler || !body_handler) {
336✔
404
                return error::MakeError(
405
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
2✔
406
        }
407

408
        if (req->address_.protocol != "http" && req->address_.protocol != "https") {
335✔
409
                return error::Error(
410
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
2✔
411
        }
412

413
        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));
334✔
414

415
        request_ = req;
416

417
        err = HandleProxySetup();
334✔
418
        if (err != error::NoError) {
334✔
419
                return err;
4✔
420
        }
421

422
        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
423
        // request to route to our k8s cluster. Set this in all cases.
424
        const string header_url = CreateHOSTAddress(req);
660✔
425
        req->SetHeader("HOST", header_url);
660✔
426

427
        log::Trace("Setting HOST address: " + header_url);
660✔
428

429
        // Add User-Agent header for all requests
430
        req->SetHeader("User-Agent", "Mender/" MENDER_VERSION);
660✔
431

432
        header_handler_ = header_handler;
330✔
433
        body_handler_ = body_handler;
330✔
434
        status_ = TransactionStatus::None;
330✔
435

436
        cancelled_ = make_shared<bool>(false);
330✔
437

438
        auto &cancelled = cancelled_;
439

440
        resolver_.async_resolve(
660✔
441
                request_->address_.host,
442
                to_string(request_->address_.port),
660✔
443
                [this, cancelled](
656✔
444
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
325✔
445
                        if (!*cancelled) {
326✔
446
                                ResolveHandler(ec, results);
325✔
447
                        }
448
                });
326✔
449

450
        return error::NoError;
330✔
451
}
452

453
static inline error::Error AddProxyAuthHeader(OutgoingRequest &req, BrokenDownUrl &proxy_address) {
22✔
454
        if (proxy_address.username == "") {
22✔
455
                // nothing to do
456
                return error::NoError;
19✔
457
        }
458
        auto ex_dec_username = URLDecode(proxy_address.username);
3✔
459
        auto ex_dec_password = URLDecode(proxy_address.password);
3✔
460
        if (!ex_dec_username) {
3✔
461
                return ex_dec_username.error();
×
462
        }
463
        if (!ex_dec_password) {
3✔
464
                return ex_dec_password.error();
×
465
        }
466
        auto creds = ex_dec_username.value() + ":" + ex_dec_password.value();
3✔
467
        auto ex_encoded_creds = crypto::EncodeBase64(common::ByteVectorFromString(creds));
6✔
468
        if (!ex_encoded_creds) {
3✔
469
                return ex_encoded_creds.error();
×
470
        }
471
        req.SetHeader("Proxy-Authorization", "Basic " + ex_encoded_creds.value());
6✔
472
        log::Warning(
3✔
473
                "Avoid using basic authentication if possible, and make sure if it's used, it's through HTTPS");
6✔
474

475
        return error::NoError;
3✔
476
}
477

478
error::Error Client::HandleProxySetup() {
334✔
479
        secondary_req_.reset();
334✔
480

481
        if (request_->address_.protocol == "http") {
334✔
482
                socket_mode_ = SocketMode::Plain;
308✔
483

484
                if (http_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
308✔
485
                        // Make a modified proxy request.
486
                        BrokenDownUrl proxy_address;
20✔
487
                        auto err = BreakDownUrl(http_proxy_, proxy_address, true);
11✔
488
                        if (err != error::NoError) {
11✔
489
                                return err.WithContext("HTTP proxy URL is invalid");
2✔
490
                        }
491
                        if (proxy_address.path != "" && proxy_address.path != "/") {
10✔
492
                                return MakeError(
493
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
494
                        }
495

496
                        request_->address_.path = request_->address_.protocol + "://" + request_->address_.host
18✔
497
                                                                          + ":" + to_string(request_->address_.port)
27✔
498
                                                                          + request_->address_.path;
27✔
499
                        request_->address_.host = proxy_address.host;
9✔
500
                        request_->address_.port = proxy_address.port;
9✔
501
                        request_->address_.protocol = proxy_address.protocol;
9✔
502

503
                        err = AddProxyAuthHeader(*request_, proxy_address);
9✔
504
                        if (err != error::NoError) {
9✔
505
                                return err;
×
506
                        }
507

508
                        if (proxy_address.protocol == "https") {
9✔
509
                                socket_mode_ = SocketMode::Tls;
5✔
510
                        } else if (proxy_address.protocol == "http") {
4✔
511
                                socket_mode_ = SocketMode::Plain;
4✔
512
                        } else {
513
                                // Should never get here.
514
                                assert(false);
515
                        }
516
                }
517
        } else if (request_->address_.protocol == "https") {
26✔
518
                socket_mode_ = SocketMode::Tls;
26✔
519

520
                if (https_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
26✔
521
                        // Save the original request for later, so that we can make a new request
522
                        // over the channel established by CONNECT.
523
                        secondary_req_ = std::move(request_);
524

525
                        request_ = make_shared<OutgoingRequest>();
30✔
526
                        request_->SetMethod(Method::CONNECT);
15✔
527
                        BrokenDownUrl proxy_address;
28✔
528
                        auto err = BreakDownUrl(https_proxy_, proxy_address, true);
15✔
529
                        if (err != error::NoError) {
15✔
530
                                return err.WithContext("HTTPS proxy URL is invalid");
2✔
531
                        }
532
                        if (proxy_address.path != "" && proxy_address.path != "/") {
14✔
533
                                return MakeError(
534
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
535
                        }
536

537
                        request_->address_.path =
538
                                secondary_req_->address_.host + ":" + to_string(secondary_req_->address_.port);
26✔
539
                        request_->address_.host = proxy_address.host;
13✔
540
                        request_->address_.port = proxy_address.port;
13✔
541
                        request_->address_.protocol = proxy_address.protocol;
13✔
542

543
                        err = AddProxyAuthHeader(*request_, proxy_address);
13✔
544
                        if (err != error::NoError) {
13✔
545
                                return err;
×
546
                        }
547

548
                        if (proxy_address.protocol == "https") {
13✔
549
                                socket_mode_ = SocketMode::Tls;
7✔
550
                        } else if (proxy_address.protocol == "http") {
6✔
551
                                socket_mode_ = SocketMode::Plain;
6✔
552
                        } else {
553
                                // Should never get here.
554
                                assert(false);
555
                        }
556
                }
557
        } else {
558
                // Should never get here
559
                assert(false);
560
        }
561

562
        return error::NoError;
330✔
563
}
564

565
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
232✔
566
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
232✔
567
                return expected::unexpected(error::Error(
2✔
568
                        make_error_condition(errc::operation_in_progress),
4✔
569
                        "MakeBodyAsyncReader called while reading is in progress"));
6✔
570
        }
571

572
        if (GetContentLength(*response_data_.http_response_parser_) == 0
230✔
573
                && !response_data_.http_response_parser_->chunked()) {
230✔
574
                return expected::unexpected(
18✔
575
                        MakeError(BodyMissingError, "Response does not contain a body"));
54✔
576
        }
577

578
        status_ = TransactionStatus::ReaderCreated;
212✔
579
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
424✔
580
}
581

582
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
7✔
583
        if (*cancelled_) {
7✔
584
                return expected::unexpected(error::Error(
×
585
                        make_error_condition(errc::not_connected),
×
586
                        "Cannot switch protocols if endpoint is not connected"));
×
587
        }
588

589
        // Rest of the connection is done directly on the socket, we are done here.
590
        status_ = TransactionStatus::Done;
7✔
591
        *cancelled_ = true;
7✔
592
        cancelled_ = make_shared<bool>(false);
14✔
593

594
        auto stream = stream_;
595
        // This no longer belongs to us.
596
        stream_.reset();
7✔
597

598
        switch (socket_mode_) {
7✔
599
        case SocketMode::TlsTls:
×
NEW
600
                return make_shared<RawSocket<ssl::stream<ssl::stream<beast::tcp_stream>>>>(
×
601
                        stream, response_data_.response_buffer_);
×
602
        case SocketMode::Tls:
×
NEW
603
                return make_shared<RawSocket<ssl::stream<beast::tcp_stream>>>(
×
NEW
604
                        make_shared<ssl::stream<beast::tcp_stream>>(std::move(stream->next_layer())),
×
605
                        response_data_.response_buffer_);
×
606
        case SocketMode::Plain:
7✔
607
                return make_shared<RawSocket<beast::tcp_stream>>(
7✔
608
                        make_shared<beast::tcp_stream>(std::move(stream->next_layer().next_layer())),
14✔
609
                        response_data_.response_buffer_);
7✔
610
        }
611
        return expected::unexpected(MakeError(error::ProgrammingError, "Invalid socket mode"));
×
612
}
613

614
void Client::CallHandler(ResponseHandler handler) {
425✔
615
        // This function exists to make sure we have a copy of the handler we're calling (in the
616
        // argument list). This is important in case the handler owns the client instance through a
617
        // capture, and it replaces the handler with a different one (using `AsyncCall`). If it
618
        // does, then it destroys the final copy of the handler, and therefore also the client,
619
        // which is why we need to make a copy here, before calling it.
620
        handler(response_);
425✔
621
}
425✔
622

623
void Client::CallErrorHandler(
132✔
624
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
625
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
396✔
626
}
132✔
627

628
void Client::CallErrorHandler(
174✔
629
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
630
        status_ = TransactionStatus::Done;
174✔
631
        DoCancel();
174✔
632
        handler(expected::unexpected(
348✔
633
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
696✔
634
}
174✔
635

636
void Client::ResolveHandler(
325✔
637
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
638
        if (ec) {
325✔
639
                CallErrorHandler(ec, request_, header_handler_);
×
640
                return;
×
641
        }
642

643
        if (logger_.Level() >= log::LogLevel::Debug) {
325✔
644
                string ips = "[";
300✔
645
                string sep;
646
                for (auto r : results) {
1,248✔
647
                        ips += sep;
324✔
648
                        ips += r.endpoint().address().to_string();
324✔
649
                        sep = ", ";
324✔
650
                }
651
                ips += "]";
300✔
652
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
600✔
653
        }
654

655
        resolver_results_ = results;
656

657
        stream_ = make_shared<ssl::stream<ssl::stream<beast::tcp_stream>>>(
325✔
658
                ssl::stream<beast::tcp_stream>(
325✔
659
                        beast::tcp_stream(GetAsioIoContext(event_loop_)), ssl_ctx_[0]),
650✔
660
                ssl_ctx_[1]);
650✔
661

662
        if (response_data_.response_buffer_) {
325✔
663
                // We can reuse this if preexisting, just make sure we start with a
664
                // clean state (while avoiding shrinking/discarding the buffer, see
665
                // https://www.boost.org/doc/libs/1_70_0/libs/beast/doc/html/beast/ref/boost__beast__basic_flat_buffer/clear.html
666
                // for details).
667
                // Since there should be no leftover bytes from previous responses, we
668
                // log if there are some, but let's not bother all users with a warning,
669
                // there is nothing they could do about it. However, for
670
                // testing/debugging/CI, it can be useful to have this information.
671
                if (response_data_.response_buffer_->size() > 0) {
117✔
672
                        logger_.Debug(
1✔
673
                                "Leftover data from the previous response! ("
674
                                + to_string(response_data_.response_buffer_->size()) + " bytes)");
2✔
675
                }
676
                response_data_.response_buffer_->clear();
677
        } else {
678
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
416✔
679

680
                // This is equivalent to:
681
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
682
                // but compatible with Boost 1.67.
683
                response_data_.response_buffer_->prepare(
684
                        body_buffer_.size() - response_data_.response_buffer_->size());
208✔
685
        }
686

687
        auto &cancelled = cancelled_;
688

689
        asio::async_connect(
325✔
690
                stream_->lowest_layer(),
691
                resolver_results_,
325✔
692
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
650✔
693
                        if (!*cancelled) {
325✔
694
                                switch (socket_mode_) {
325✔
695
                                case SocketMode::TlsTls:
×
696
                                        // Should never happen because we always need to handshake
697
                                        // the innermost Tls first, then the outermost, but the
698
                                        // latter doesn't happen here.
699
                                        assert(false);
700
                                        CallErrorHandler(
×
701
                                                error::MakeError(
×
702
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
×
703
                                                request_,
×
704
                                                header_handler_);
×
705
                                case SocketMode::Tls:
21✔
706
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
21✔
707
                                case SocketMode::Plain:
304✔
708
                                        return ConnectHandler(ec, endpoint);
304✔
709
                                }
710
                        }
711
                });
712
}
713

714
template <typename StreamType>
715
void Client::HandshakeHandler(
25✔
716
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
717
        if (ec) {
25✔
718
                CallErrorHandler(ec, request_, header_handler_);
2✔
719
                return;
2✔
720
        }
721

722
        // Enable TCP keepalive
723
        boost::asio::socket_base::keep_alive option(true);
724
        stream_->lowest_layer().set_option(option);
23✔
725

726
        // We can't avoid a C style cast on this next line. The usual method by which system headers
727
        // are excluded from warnings doesn't work, because `SSL_set_tlsext_host_name` is a macro,
728
        // containing a cast, which expands here, not in the original file. So just disable the
729
        // warning here.
730
#ifdef __clang__
731
#pragma clang diagnostic push
732
#pragma clang diagnostic ignored "-Wold-style-cast"
733
#else
734
#pragma GCC diagnostic push
735
#pragma GCC diagnostic ignored "-Wold-style-cast"
736
#endif
737
        // Set SNI Hostname (many hosts need this to handshake successfully)
738
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
23✔
739
#ifdef __clang__
740
#pragma clang diagnostic pop
741
#else
742
#pragma GCC diagnostic pop
743
#endif
744
                beast::error_code ec2 {
×
745
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
746
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
747
        }
748

749
        // Enable host name verification (not done automatically and we don't have
750
        // enough access to the TLS internals to use X509_VERIFY_PARAM_set1_host(),
751
        // hence the callback that boost provides).
752
        boost::system::error_code b_ec;
23✔
753
        stream.set_verify_callback(ssl::host_name_verification(request_->address_.host), b_ec);
46✔
754
        if (b_ec) {
23✔
755
                logger_.Error("Failed to enable host name verification: " + b_ec.message());
×
756
                CallErrorHandler(b_ec, request_, header_handler_);
×
757
                return;
×
758
        }
759

760
        auto &cancelled = cancelled_;
761

762
        stream.async_handshake(
46✔
763
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
23✔
764
                        if (*cancelled) {
26✔
765
                                return;
766
                        }
767
                        if (ec) {
26✔
768
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
20✔
769
                                CallErrorHandler(ec, request_, header_handler_);
10✔
770
                                return;
10✔
771
                        }
772
                        logger_.Debug("https: Successful SSL handshake");
32✔
773
                        ConnectHandler(ec, endpoint);
16✔
774
                });
775
}
776

777

778
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
320✔
779
        if (ec) {
320✔
780
                CallErrorHandler(ec, request_, header_handler_);
14✔
781
                return;
14✔
782
        }
783

784
        // Enable TCP keepalive
785
        boost::asio::socket_base::keep_alive option(true);
786
        stream_->lowest_layer().set_option(option);
306✔
787

788
        logger_.Debug("Connected to " + endpoint.address().to_string());
612✔
789

790
        request_data_.http_request_ = make_shared<http::request<http::buffer_body>>(
306✔
791
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
612✔
792

793
        for (const auto &header : request_->headers_) {
1,130✔
794
                request_data_.http_request_->set(header.first, header.second);
824✔
795
        }
796

797
        request_data_.http_request_serializer_ =
798
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);
306✔
799

800
        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
612✔
801

802
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
803
        // if they do, they should be handled higher up in the application logic.
804
        //
805
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
806
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
807
        // `has_value()` in their code, causing their subsequent comparison operation to
808
        // misbehave. So pass highest possible value instead.
809
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
810

811
        auto &cancelled = cancelled_;
812
        auto &request_data = request_data_;
306✔
813

814
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
306✔
815
                if (!*cancelled) {
306✔
816
                        WriteHeaderHandler(ec, num_written);
306✔
817
                }
818
        };
612✔
819

820
        switch (socket_mode_) {
306✔
821
        case SocketMode::TlsTls:
2✔
822
                http::async_write_header(*stream_, *request_data_.http_request_serializer_, handler);
2✔
823
                break;
824
        case SocketMode::Tls:
14✔
825
                http::async_write_header(
14✔
826
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
827
                break;
828
        case SocketMode::Plain:
290✔
829
                http::async_write_header(
290✔
830
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
831
                break;
832
        }
833
}
834

835
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
306✔
836
        if (num_written > 0) {
306✔
837
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
612✔
838
        }
839

840
        if (ec) {
306✔
841
                CallErrorHandler(ec, request_, header_handler_);
×
842
                return;
262✔
843
        }
844

845
        auto exp_has_body =
846
                HasBody(request_->GetHeader("Content-Length"), request_->GetHeader("Transfer-Encoding"));
612✔
847
        if (!exp_has_body) {
306✔
848
                CallErrorHandler(exp_has_body.error(), request_, header_handler_);
×
849
                return;
×
850
        }
851
        if (!exp_has_body.value()) {
306✔
852
                ReadHeader();
261✔
853
                return;
854
        }
855

856
        if (!request_->body_gen_ && !request_->async_body_gen_) {
45✔
857
                auto err = MakeError(BodyMissingError, "No body generator");
2✔
858
                CallErrorHandler(err, request_, header_handler_);
2✔
859
                return;
860
        }
861

862
        assert(!(request_->body_gen_ && request_->async_body_gen_));
863

864
        if (request_->body_gen_) {
44✔
865
                auto body_reader = request_->body_gen_();
38✔
866
                if (!body_reader) {
38✔
867
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
868
                        return;
869
                }
870
                request_->body_reader_ = body_reader.value();
38✔
871
        } else {
872
                auto body_reader = request_->async_body_gen_();
6✔
873
                if (!body_reader) {
6✔
874
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
875
                        return;
876
                }
877
                request_->async_body_reader_ = body_reader.value();
6✔
878
        }
879

880
        PrepareAndWriteNewBodyBuffer();
44✔
881
}
882

883
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,290✔
884
        if (num_written > 0) {
2,290✔
885
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,250✔
886
        }
887

888
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,290✔
889
                // Write next block of the body.
890
                PrepareAndWriteNewBodyBuffer();
1,124✔
891
        } else if (ec) {
1,166✔
892
                CallErrorHandler(ec, request_, header_handler_);
8✔
893
        } else if (num_written > 0) {
1,162✔
894
                // We are still writing the body.
895
                WriteBody();
1,125✔
896
        } else {
897
                // We are ready to receive the response.
898
                ReadHeader();
37✔
899
        }
900
}
2,290✔
901

902
void Client::PrepareAndWriteNewBodyBuffer() {
1,168✔
903
        // request_->body_reader_ XOR request_->async_body_reader_
904
        assert(
905
                (request_->body_reader_ || request_->async_body_reader_)
906
                && !(request_->body_reader_ && request_->async_body_reader_));
907

908
        auto cancelled = cancelled_;
909
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,608✔
910
                if (!*cancelled) {
1,168✔
911
                        if (!read) {
1,167✔
912
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
913
                                return;
2✔
914
                        }
915
                        WriteNewBodyBuffer(read.value());
1,165✔
916
                }
917
        };
1,168✔
918

919

920
        if (request_->body_reader_) {
1,168✔
921
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,486✔
922
        } else {
923
                auto err = request_->async_body_reader_->AsyncRead(
924
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
925
                if (err != error::NoError) {
425✔
926
                        CallErrorHandler(err, request_, header_handler_);
×
927
                }
928
        }
929
}
1,168✔
930

931
void Client::WriteNewBodyBuffer(size_t size) {
1,165✔
932
        request_data_.http_request_->body().data = body_buffer_.data();
1,165✔
933
        request_data_.http_request_->body().size = size;
1,165✔
934

935
        if (size > 0) {
1,165✔
936
                request_data_.http_request_->body().more = true;
1,128✔
937
        } else {
938
                // Release ownership of Body reader.
939
                request_->body_reader_.reset();
37✔
940
                request_->async_body_reader_.reset();
37✔
941
                request_data_.http_request_->body().more = false;
37✔
942
        }
943

944
        WriteBody();
1,165✔
945
}
1,165✔
946

947
void Client::WriteBody() {
2,290✔
948
        auto &cancelled = cancelled_;
949
        auto &request_data = request_data_;
2,290✔
950

951
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,290✔
952
                if (!*cancelled) {
2,290✔
953
                        WriteBodyHandler(ec, num_written);
2,290✔
954
                }
955
        };
4,580✔
956

957
        switch (socket_mode_) {
2,290✔
958
        case SocketMode::TlsTls:
×
959
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
960
                break;
961
        case SocketMode::Tls:
×
962
                http::async_write_some(
963
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
964
                break;
965
        case SocketMode::Plain:
2,290✔
966
                http::async_write_some(
967
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
968
                break;
969
        }
970
}
2,290✔
971

972
void Client::ReadHeader() {
298✔
973
        auto &cancelled = cancelled_;
974
        auto &response_data = response_data_;
298✔
975

976
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
295✔
977
                if (!*cancelled) {
295✔
978
                        ReadHeaderHandler(ec, num_read);
295✔
979
                }
980
        };
596✔
981

982
        switch (socket_mode_) {
298✔
983
        case SocketMode::TlsTls:
2✔
984
                http::async_read_some(
2✔
985
                        *stream_,
986
                        *response_data_.response_buffer_,
987
                        *response_data_.http_response_parser_,
988
                        handler);
989
                break;
990
        case SocketMode::Tls:
14✔
991
                http::async_read_some(
14✔
992
                        stream_->next_layer(),
993
                        *response_data_.response_buffer_,
994
                        *response_data_.http_response_parser_,
995
                        handler);
996
                break;
997
        case SocketMode::Plain:
282✔
998
                http::async_read_some(
282✔
999
                        stream_->next_layer().next_layer(),
1000
                        *response_data_.response_buffer_,
1001
                        *response_data_.http_response_parser_,
1002
                        handler);
1003
                break;
1004
        }
1005
}
298✔
1006

1007
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
295✔
1008
        if (num_read > 0) {
295✔
1009
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
580✔
1010
        }
1011

1012
        if (ec) {
295✔
1013
                CallErrorHandler(ec, request_, header_handler_);
5✔
1014
                return;
66✔
1015
        }
1016

1017
        if (!response_data_.http_response_parser_->is_header_done()) {
290✔
1018
                ReadHeader();
×
1019
                return;
×
1020
        }
1021

1022
        if (secondary_req_) {
290✔
1023
                HandleSecondaryRequest();
9✔
1024
                return;
9✔
1025
        }
1026

1027
        response_.reset(new IncomingResponse(*this, cancelled_));
562✔
1028
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
281✔
1029
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
281✔
1030

1031
        logger_.Debug(
562✔
1032
                "Received response: " + to_string(response_->status_code_) + " "
562✔
1033
                + response_->status_message_);
843✔
1034

1035
        string debug_str;
1036
        for (auto header = response_data_.http_response_parser_->get().cbegin();
364✔
1037
                 header != response_data_.http_response_parser_->get().cend();
645✔
1038
                 header++) {
1039
                response_->headers_[string {header->name_string()}] = string {header->value()};
1,092✔
1040
                if (logger_.Level() >= log::LogLevel::Debug) {
364✔
1041
                        debug_str += string {header->name_string()};
346✔
1042
                        debug_str += ": ";
346✔
1043
                        debug_str += string {header->value()};
346✔
1044
                        debug_str += "\n";
346✔
1045
                }
1046
        }
1047

1048
        logger_.Debug("Received headers:\n" + debug_str);
562✔
1049
        debug_str.clear();
1050

1051
        if (GetContentLength(*response_data_.http_response_parser_) == 0
281✔
1052
                && !response_data_.http_response_parser_->chunked()) {
281✔
1053
                auto cancelled = cancelled_;
1054
                status_ = TransactionStatus::HeaderHandlerCalled;
49✔
1055
                CallHandler(header_handler_);
98✔
1056
                if (!*cancelled) {
49✔
1057
                        status_ = TransactionStatus::Done;
44✔
1058
                        if (response_->status_code_ != StatusCode::StatusSwitchingProtocols) {
44✔
1059
                                // Make an exception for 101 Switching Protocols response, where the TCP connection
1060
                                // is meant to be reused.
1061
                                DoCancel();
40✔
1062
                        }
1063
                        CallHandler(body_handler_);
88✔
1064
                }
1065
                return;
1066
        }
1067

1068
        auto cancelled = cancelled_;
1069
        status_ = TransactionStatus::HeaderHandlerCalled;
232✔
1070
        CallHandler(header_handler_);
464✔
1071
        if (*cancelled) {
232✔
1072
                return;
1073
        }
1074

1075
        // We know that a body reader is required here, because of the check for body above.
1076
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
229✔
1077
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
36✔
1078
        }
1079
}
1080

1081
void Client::HandleSecondaryRequest() {
9✔
1082
        logger_.Debug(
18✔
1083
                "Received proxy response: "
1084
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
18✔
1085
                + string {response_data_.http_response_parser_->get().reason()});
36✔
1086

1087
        request_ = std::move(secondary_req_);
1088

1089
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
9✔
1090
                auto err = MakeError(
1091
                        ProxyError,
1092
                        "Proxy returned unexpected response: "
1093
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
1094
                                + string {response_data_.http_response_parser_->get().reason()});
6✔
1095
                CallErrorHandler(err, request_, header_handler_);
4✔
1096
                return;
1097
        }
1098

1099
        if (GetContentLength(*response_data_.http_response_parser_) != 0
7✔
1100
                || response_data_.http_response_parser_->chunked()) {
7✔
1101
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
1102
                CallErrorHandler(err, request_, header_handler_);
×
1103
                return;
1104
        }
1105

1106
        // We are connected. Now repeat the request cycle with the original request. Pretend
1107
        // we were just connected.
1108

1109
        assert(request_->GetProtocol() == "https");
1110

1111
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
1112
        // a different layer, this will get out of sync.
1113
        assert(response_data_.response_buffer_->size() == 0);
1114

1115
        switch (socket_mode_) {
7✔
1116
        case SocketMode::TlsTls:
×
1117
                // Should never get here, because this is the only place where TlsTls mode
1118
                // is supposed to be turned on.
1119
                assert(false);
1120
                CallErrorHandler(
×
1121
                        error::MakeError(
×
1122
                                error::ProgrammingError,
1123
                                "Any other mode than Tls is not valid when handling secondary request"),
×
1124
                        request_,
×
1125
                        header_handler_);
×
1126
                break;
×
1127
        case SocketMode::Tls:
3✔
1128
                // Upgrade to TLS inside TLS.
1129
                socket_mode_ = SocketMode::TlsTls;
3✔
1130
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
3✔
1131
                break;
3✔
1132
        case SocketMode::Plain:
4✔
1133
                // Upgrade to TLS.
1134
                socket_mode_ = SocketMode::Tls;
4✔
1135
                HandshakeHandler(
4✔
1136
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
4✔
1137
                break;
4✔
1138
        }
1139
}
1140

1141
void Client::AsyncReadNextBodyPart(
5,044✔
1142
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1143
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1144

1145
        if (status_ == TransactionStatus::ReaderCreated) {
5,044✔
1146
                status_ = TransactionStatus::BodyReadingInProgress;
210✔
1147
        }
1148

1149
        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
5,044✔
1150
                auto cancelled = cancelled_;
1151
                handler(0);
200✔
1152
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
100✔
1153
                        status_ = TransactionStatus::Done;
100✔
1154
                        DoCancel();
100✔
1155
                        CallHandler(body_handler_);
200✔
1156
                }
1157
                return;
1158
        }
1159

1160
        reader_buf_start_ = start;
4,944✔
1161
        reader_buf_end_ = end;
4,944✔
1162
        reader_handler_ = handler;
4,944✔
1163
        size_t read_size = end - start;
4,944✔
1164
        size_t smallest = min(body_buffer_.size(), read_size);
7,057✔
1165

1166
        response_data_.http_response_parser_->get().body().data = body_buffer_.data();
4,944✔
1167
        response_data_.http_response_parser_->get().body().size = smallest;
4,944✔
1168
        response_data_.last_buffer_size_ = smallest;
4,944✔
1169

1170
        auto &cancelled = cancelled_;
1171
        auto &response_data = response_data_;
4,944✔
1172

1173
        // Set timeout to 5 minutes to ensure we don't hang during async read
1174
        stream_->next_layer().next_layer().expires_after(chrono::minutes(5));
4,944✔
1175

1176
        auto async_handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
4,943✔
1177
                if (!*cancelled) {
4,943✔
1178
                        ReadBodyHandler(ec, num_read);
4,943✔
1179
                }
1180
        };
9,888✔
1181

1182
        switch (socket_mode_) {
4,944✔
1183
        case SocketMode::TlsTls:
2✔
1184
                http::async_read_some(
2✔
1185
                        *stream_,
1186
                        *response_data_.response_buffer_,
1187
                        *response_data_.http_response_parser_,
1188
                        async_handler);
1189
                break;
1190
        case SocketMode::Tls:
4✔
1191
                http::async_read_some(
4✔
1192
                        stream_->next_layer(),
1193
                        *response_data_.response_buffer_,
1194
                        *response_data_.http_response_parser_,
1195
                        async_handler);
1196
                break;
1197
        case SocketMode::Plain:
4,938✔
1198
                http::async_read_some(
4,938✔
1199
                        stream_->next_layer().next_layer(),
1200
                        *response_data_.response_buffer_,
1201
                        *response_data_.http_response_parser_,
1202
                        async_handler);
1203
                break;
1204
        }
1205
}
1206

1207
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
4,943✔
1208
        if (num_read > 0) {
4,943✔
1209
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
9,684✔
1210
        }
1211

1212
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,943✔
1213
                // This can be ignored. We always reset the buffer between reads anyway.
1214
                ec = error_code();
1,958✔
1215
        }
1216

1217
        assert(reader_handler_);
1218

1219
        if (response_data_.http_response_parser_->is_done()) {
4,943✔
1220
                status_ = TransactionStatus::BodyReadingFinished;
106✔
1221
        }
1222

1223
        auto cancelled = cancelled_;
1224

1225
        if (ec) {
4,943✔
1226
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
202✔
1227
                reader_handler_(expected::unexpected(err));
303✔
1228
                if (!*cancelled) {
101✔
1229
                        CallErrorHandler(ec, request_, body_handler_);
194✔
1230
                }
1231
                return;
1232
        }
1233

1234
        // The num_read from above includes out of band payload data, such as chunk headers, which
1235
        // we are not interested in. So we need to calculate the payload size from the remaining
1236
        // buffer space.
1237
        size_t payload_read =
1238
                response_data_.last_buffer_size_ - response_data_.http_response_parser_->get().body().size;
4,842✔
1239

1240
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
4,842✔
1241
        size_t smallest = min(payload_read, buf_size);
4,842✔
1242

1243
        if (smallest == 0) {
4,842✔
1244
                // We read nothing, which can happen if all we read was a chunk header. We cannot
1245
                // return 0 to the handler however, because in `io::Reader` context this means
1246
                // EOF. So just repeat the request instead, until we get actual payload data.
1247
                AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
462✔
1248
        } else {
1249
                copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
4,611✔
1250
                reader_handler_(smallest);
9,222✔
1251
        }
1252
}
1253

1254
void Client::Cancel() {
258✔
1255
        auto cancelled = cancelled_;
1256

1257
        if (!*cancelled) {
258✔
1258
                auto err =
1259
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
40✔
1260
                switch (status_) {
20✔
1261
                case TransactionStatus::None:
3✔
1262
                        CallErrorHandler(err, request_, header_handler_);
3✔
1263
                        break;
3✔
1264
                case TransactionStatus::HeaderHandlerCalled:
16✔
1265
                case TransactionStatus::ReaderCreated:
1266
                case TransactionStatus::BodyReadingInProgress:
1267
                case TransactionStatus::BodyReadingFinished:
1268
                        CallErrorHandler(err, request_, body_handler_);
16✔
1269
                        break;
16✔
1270
                case TransactionStatus::Replying:
1271
                case TransactionStatus::SwitchingProtocol:
1272
                        // Not used by client.
1273
                        assert(false);
1274
                        break;
1275
                case TransactionStatus::BodyHandlerCalled:
1276
                case TransactionStatus::Done:
1277
                        break;
1278
                }
1279
        }
1280

1281
        if (!*cancelled) {
258✔
1282
                DoCancel();
1✔
1283
        }
1284
}
258✔
1285

1286
void Client::DoCancel() {
681✔
1287
        resolver_.cancel();
681✔
1288
        if (stream_) {
681✔
1289
                beast::error_code ec;
318✔
1290
                stream_->lowest_layer().cancel(ec);
318✔
1291
                stream_->lowest_layer().close(ec);
318✔
1292
                stream_.reset();
318✔
1293
        }
1294

1295
        // Reset logger to no connection.
1296
        logger_ = log::Logger(logger_name_);
681✔
1297

1298
        // Set cancel state and then make a new one. Those who are interested should have their own
1299
        // pointer to the old one.
1300
        *cancelled_ = true;
681✔
1301
        cancelled_ = make_shared<bool>(true);
681✔
1302
}
681✔
1303

1304
Stream::Stream(Server &server) :
553✔
1305
        server_ {server},
1306
        logger_ {"http"},
1307
        cancelled_(make_shared<bool>(true)),
553✔
1308
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
553✔
1309
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,659✔
1310
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
1,106✔
1311

1312
        // This is equivalent to:
1313
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1314
        // but compatible with Boost 1.67.
1315
        request_data_.request_buffer_->prepare(
1316
                body_buffer_.size() - request_data_.request_buffer_->size());
553✔
1317

1318
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
1,106✔
1319

1320
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1321
        // they do, they should be handled higher up in the application logic.
1322
        //
1323
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1324
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1325
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1326
        // possible value instead.
1327
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1328
}
553✔
1329

1330
Stream::~Stream() {
1,659✔
1331
        DoCancel();
553✔
1332
}
553✔
1333

1334
void Stream::Cancel() {
7✔
1335
        auto cancelled = cancelled_;
1336

1337
        if (!*cancelled) {
7✔
1338
                auto err =
1339
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1340
                switch (status_) {
7✔
1341
                case TransactionStatus::None:
×
1342
                        CallErrorHandler(err, request_, server_.header_handler_);
×
1343
                        break;
×
1344
                case TransactionStatus::HeaderHandlerCalled:
5✔
1345
                case TransactionStatus::ReaderCreated:
1346
                case TransactionStatus::BodyReadingInProgress:
1347
                case TransactionStatus::BodyReadingFinished:
1348
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1349
                        break;
5✔
1350
                case TransactionStatus::BodyHandlerCalled:
×
1351
                        // In between body handler and reply finished. No one to handle the status
1352
                        // here.
1353
                        server_.RemoveStream(shared_from_this());
×
1354
                        break;
×
1355
                case TransactionStatus::Replying:
1✔
1356
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1357
                        break;
1✔
1358
                case TransactionStatus::SwitchingProtocol:
1✔
1359
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1360
                        break;
1✔
1361
                case TransactionStatus::Done:
1362
                        break;
1363
                }
1364
        }
1365

1366
        if (!*cancelled) {
7✔
1367
                DoCancel();
×
1368
        }
1369
}
7✔
1370

1371
void Stream::DoCancel() {
997✔
1372
        if (socket_.is_open()) {
997✔
1373
                socket_.cancel();
277✔
1374
                socket_.close();
277✔
1375
        }
1376

1377
        // Set cancel state and then make a new one. Those who are interested should have their own
1378
        // pointer to the old one.
1379
        *cancelled_ = true;
997✔
1380
        cancelled_ = make_shared<bool>(true);
997✔
1381
}
997✔
1382

1383
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1384
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1385
}
×
1386

1387
void Stream::CallErrorHandler(
×
1388
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1389
        status_ = TransactionStatus::Done;
×
1390
        DoCancel();
×
1391
        handler(expected::unexpected(err.WithContext(
×
1392
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1393

1394
        server_.RemoveStream(shared_from_this());
×
1395
}
×
1396

1397
void Stream::CallErrorHandler(
2✔
1398
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1399
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1400
}
2✔
1401

1402
void Stream::CallErrorHandler(
8✔
1403
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1404
        status_ = TransactionStatus::Done;
8✔
1405
        DoCancel();
8✔
1406
        handler(
8✔
1407
                req,
1408
                err.WithContext(
8✔
1409
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
24✔
1410

1411
        server_.RemoveStream(shared_from_this());
8✔
1412
}
8✔
1413

1414
void Stream::CallErrorHandler(
4✔
1415
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1416
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1417
}
4✔
1418

1419
void Stream::CallErrorHandler(
7✔
1420
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1421
        status_ = TransactionStatus::Done;
7✔
1422
        DoCancel();
7✔
1423
        handler(err.WithContext(
14✔
1424
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1425

1426
        server_.RemoveStream(shared_from_this());
7✔
1427
}
7✔
1428

1429
void Stream::CallErrorHandler(
×
1430
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1431
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1432
}
×
1433

1434
void Stream::CallErrorHandler(
1✔
1435
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1436
        status_ = TransactionStatus::Done;
1✔
1437
        DoCancel();
1✔
1438
        handler(expected::unexpected(err.WithContext(
2✔
1439
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1440

1441
        server_.RemoveStream(shared_from_this());
1✔
1442
}
1✔
1443

1444
void Stream::AcceptHandler(const error_code &ec) {
285✔
1445
        if (ec) {
285✔
1446
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1447
                return;
×
1448
        }
1449

1450
        auto ip = socket_.remote_endpoint().address().to_string();
570✔
1451

1452
        // Use IP as context for logging.
1453
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
285✔
1454

1455
        logger_.Debug("Accepted connection.");
570✔
1456

1457
        request_.reset(new IncomingRequest(*this, cancelled_));
570✔
1458

1459
        request_->address_.host = ip;
285✔
1460

1461
        *cancelled_ = false;
285✔
1462

1463
        ReadHeader();
285✔
1464
}
1465

1466
void Stream::ReadHeader() {
285✔
1467
        auto &cancelled = cancelled_;
1468
        auto &request_data = request_data_;
285✔
1469

1470
        http::async_read_some(
570✔
1471
                socket_,
285✔
1472
                *request_data_.request_buffer_,
1473
                *request_data_.http_request_parser_,
1474
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
285✔
1475
                        if (!*cancelled) {
285✔
1476
                                ReadHeaderHandler(ec, num_read);
285✔
1477
                        }
1478
                });
285✔
1479
}
285✔
1480

1481
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
285✔
1482
        if (num_read > 0) {
285✔
1483
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
570✔
1484
        }
1485

1486
        if (ec) {
285✔
1487
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1488
                return;
241✔
1489
        }
1490

1491
        if (!request_data_.http_request_parser_->is_header_done()) {
285✔
1492
                ReadHeader();
×
1493
                return;
×
1494
        }
1495

1496
        auto method_result = BeastVerbToMethod(
1497
                request_data_.http_request_parser_->get().base().method(),
1498
                string {request_data_.http_request_parser_->get().base().method_string()});
570✔
1499
        if (!method_result) {
285✔
1500
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1501
                return;
×
1502
        }
1503
        request_->method_ = method_result.value();
285✔
1504
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
285✔
1505

1506
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
285✔
1507

1508
        string debug_str;
1509
        for (auto header = request_data_.http_request_parser_->get().cbegin();
803✔
1510
                 header != request_data_.http_request_parser_->get().cend();
1,088✔
1511
                 header++) {
1512
                request_->headers_[string {header->name_string()}] = string {header->value()};
2,409✔
1513
                if (logger_.Level() >= log::LogLevel::Debug) {
803✔
1514
                        debug_str += string {header->name_string()};
697✔
1515
                        debug_str += ": ";
697✔
1516
                        debug_str += string {header->value()};
697✔
1517
                        debug_str += "\n";
697✔
1518
                }
1519
        }
1520

1521
        logger_.Debug("Received headers:\n" + debug_str);
570✔
1522
        debug_str.clear();
1523

1524
        if (GetContentLength(*request_data_.http_request_parser_) == 0
285✔
1525
                && !request_data_.http_request_parser_->chunked()) {
285✔
1526
                auto cancelled = cancelled_;
1527
                status_ = TransactionStatus::HeaderHandlerCalled;
240✔
1528
                server_.header_handler_(request_);
480✔
1529
                if (!*cancelled) {
240✔
1530
                        status_ = TransactionStatus::BodyHandlerCalled;
240✔
1531
                        CallBodyHandler();
240✔
1532
                }
1533
                return;
1534
        }
1535

1536
        assert(!request_data_.http_request_parser_->is_done());
1537

1538
        auto cancelled = cancelled_;
1539
        status_ = TransactionStatus::HeaderHandlerCalled;
45✔
1540
        server_.header_handler_(request_);
90✔
1541
        if (*cancelled) {
45✔
1542
                return;
1543
        }
1544

1545
        // We know that a body reader is required here, because of the check for body above.
1546
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
44✔
1547
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1548
        }
1549
}
1550

1551
void Stream::AsyncReadNextBodyPart(
2,272✔
1552
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1553
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1554

1555
        if (status_ == TransactionStatus::ReaderCreated) {
2,272✔
1556
                status_ = TransactionStatus::BodyReadingInProgress;
43✔
1557
        }
1558

1559
        if (status_ != TransactionStatus::BodyReadingInProgress) {
2,272✔
1560
                auto cancelled = cancelled_;
1561
                handler(0);
74✔
1562
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
37✔
1563
                        status_ = TransactionStatus::BodyHandlerCalled;
37✔
1564
                        CallBodyHandler();
37✔
1565
                }
1566
                return;
1567
        }
1568

1569
        reader_buf_start_ = start;
2,235✔
1570
        reader_buf_end_ = end;
2,235✔
1571
        reader_handler_ = handler;
2,235✔
1572
        size_t read_size = end - start;
2,235✔
1573
        size_t smallest = min(body_buffer_.size(), read_size);
3,291✔
1574

1575
        request_data_.http_request_parser_->get().body().data = body_buffer_.data();
2,235✔
1576
        request_data_.http_request_parser_->get().body().size = smallest;
2,235✔
1577
        request_data_.last_buffer_size_ = smallest;
2,235✔
1578

1579
        auto &cancelled = cancelled_;
1580
        auto &request_data = request_data_;
2,235✔
1581

1582
        http::async_read_some(
4,470✔
1583
                socket_,
2,235✔
1584
                *request_data_.request_buffer_,
1585
                *request_data_.http_request_parser_,
1586
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
2,235✔
1587
                        if (!*cancelled) {
2,235✔
1588
                                ReadBodyHandler(ec, num_read);
2,235✔
1589
                        }
1590
                });
2,235✔
1591
}
1592

1593
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
2,235✔
1594
        if (num_read > 0) {
2,235✔
1595
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
4,462✔
1596
        }
1597

1598
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,235✔
1599
                // This can be ignored. We always reset the buffer between reads anyway.
1600
                ec = error_code();
979✔
1601
        }
1602

1603
        assert(reader_handler_);
1604

1605
        if (request_data_.http_request_parser_->is_done()) {
2,235✔
1606
                status_ = TransactionStatus::BodyReadingFinished;
37✔
1607
        }
1608

1609
        auto cancelled = cancelled_;
1610

1611
        if (ec) {
2,235✔
1612
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
8✔
1613
                reader_handler_(expected::unexpected(err));
12✔
1614
                if (!*cancelled) {
4✔
1615
                        CallErrorHandler(ec, request_, server_.body_handler_);
4✔
1616
                }
1617
                return;
1618
        }
1619

1620
        // The num_read from above includes out of band payload data, such as chunk headers, which
1621
        // we are not interested in. So we need to calculate the payload size from the remaining
1622
        // buffer space.
1623
        size_t payload_read =
1624
                request_data_.last_buffer_size_ - request_data_.http_request_parser_->get().body().size;
2,231✔
1625

1626
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
2,231✔
1627
        size_t smallest = min(payload_read, buf_size);
2,231✔
1628

1629
        if (smallest == 0) {
2,231✔
1630
                // We read nothing, which can happen if all we read was a chunk header. We cannot
1631
                // return 0 to the handler however, because in `io::Reader` context this means
1632
                // EOF. So just repeat the request instead, until we get actual payload data.
1633
                AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
154✔
1634
        } else {
1635
                copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
2,154✔
1636
                reader_handler_(smallest);
4,308✔
1637
        }
1638
}
1639

1640
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
262✔
1641
        SetupResponse();
262✔
1642

1643
        reply_finished_handler_ = reply_finished_handler;
262✔
1644

1645
        auto &cancelled = cancelled_;
1646
        auto &response_data = response_data_;
262✔
1647

1648
        http::async_write_header(
524✔
1649
                socket_,
262✔
1650
                *response_data_.http_response_serializer_,
1651
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
262✔
1652
                        if (!*cancelled) {
262✔
1653
                                WriteHeaderHandler(ec, num_written);
261✔
1654
                        }
1655
                });
262✔
1656
}
262✔
1657

1658
void Stream::SetupResponse() {
271✔
1659
        auto response = maybe_response_.lock();
271✔
1660
        // Only called from existing responses, so this should always be true.
1661
        assert(response);
1662

1663
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1664
        status_ = TransactionStatus::Replying;
271✔
1665

1666
        // From here on we take shared ownership.
1667
        response_ = response;
1668

1669
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
542✔
1670

1671
        for (const auto &header : response->headers_) {
607✔
1672
                response_data_.http_response_->base().set(header.first, header.second);
336✔
1673
        }
1674

1675
        response_data_.http_response_->result(response->GetStatusCode());
271✔
1676
        response_data_.http_response_->reason(response->GetStatusMessage());
542✔
1677

1678
        response_data_.http_response_serializer_ =
1679
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
542✔
1680
}
271✔
1681

1682
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
261✔
1683
        if (num_written > 0) {
261✔
1684
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
522✔
1685
        }
1686

1687
        if (ec) {
261✔
1688
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1689
                return;
38✔
1690
        }
1691

1692
        auto exp_has_body =
1693
                HasBody(response_->GetHeader("Content-Length"), response_->GetHeader("Transfer-Encoding"));
522✔
1694
        if (!exp_has_body) {
261✔
1695
                CallErrorHandler(exp_has_body.error(), request_, reply_finished_handler_);
×
1696
                return;
×
1697
        }
1698
        if (!exp_has_body.value()) {
261✔
1699
                FinishReply();
37✔
1700
                return;
1701
        }
1702

1703
        if (!response_->body_reader_ && !response_->async_body_reader_) {
224✔
1704
                auto err = MakeError(BodyMissingError, "No body reader");
2✔
1705
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1706
                return;
1707
        }
1708

1709
        PrepareAndWriteNewBodyBuffer();
223✔
1710
}
1711

1712
void Stream::PrepareAndWriteNewBodyBuffer() {
2,544✔
1713
        // response_->body_reader_ XOR response_->async_body_reader_
1714
        assert(
1715
                (response_->body_reader_ || response_->async_body_reader_)
1716
                && !(response_->body_reader_ && response_->async_body_reader_));
1717

1718
        auto read_handler = [this](io::ExpectedSize read) {
2,545✔
1719
                if (!read) {
2,544✔
1720
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1721
                        return;
1✔
1722
                }
1723
                WriteNewBodyBuffer(read.value());
2,543✔
1724
        };
2,544✔
1725

1726
        if (response_->body_reader_) {
2,544✔
1727
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
4,540✔
1728
        } else {
1729
                auto err = response_->async_body_reader_->AsyncRead(
1730
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1731
                if (err != error::NoError) {
274✔
1732
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1733
                }
1734
        }
1735
}
2,544✔
1736

1737
void Stream::WriteNewBodyBuffer(size_t size) {
2,543✔
1738
        response_data_.http_response_->body().data = body_buffer_.data();
2,543✔
1739
        response_data_.http_response_->body().size = size;
2,543✔
1740

1741
        if (size > 0) {
2,543✔
1742
                response_data_.http_response_->body().more = true;
2,369✔
1743
        } else {
1744
                response_data_.http_response_->body().more = false;
174✔
1745
        }
1746

1747
        WriteBody();
2,543✔
1748
}
2,543✔
1749

1750
void Stream::WriteBody() {
4,884✔
1751
        auto &cancelled = cancelled_;
1752
        auto &response_data = response_data_;
4,884✔
1753

1754
        http::async_write_some(
9,768✔
1755
                socket_,
4,884✔
1756
                *response_data_.http_response_serializer_,
1757
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
4,837✔
1758
                        if (!*cancelled) {
4,837✔
1759
                                WriteBodyHandler(ec, num_written);
4,827✔
1760
                        }
1761
                });
4,837✔
1762
}
4,884✔
1763

1764
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
4,827✔
1765
        if (num_written > 0) {
4,827✔
1766
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
4,682✔
1767
        }
1768

1769
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,827✔
1770
                // Write next body block.
1771
                PrepareAndWriteNewBodyBuffer();
2,321✔
1772
        } else if (ec) {
2,506✔
1773
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1774
        } else if (num_written > 0) {
2,502✔
1775
                // We are still writing the body.
1776
                WriteBody();
2,341✔
1777
        } else {
1778
                // We are finished.
1779
                FinishReply();
161✔
1780
        }
1781
}
4,827✔
1782

1783
void Stream::FinishReply() {
198✔
1784
        // We are done.
1785
        status_ = TransactionStatus::Done;
198✔
1786
        DoCancel();
198✔
1787
        // Release ownership of Body reader.
1788
        response_->body_reader_.reset();
198✔
1789
        response_->async_body_reader_.reset();
198✔
1790
        reply_finished_handler_(error::NoError);
198✔
1791
        server_.RemoveStream(shared_from_this());
198✔
1792
}
198✔
1793

1794
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
9✔
1795
        SetupResponse();
9✔
1796

1797
        switch_protocol_handler_ = handler;
9✔
1798
        status_ = TransactionStatus::SwitchingProtocol;
9✔
1799

1800
        auto &cancelled = cancelled_;
1801
        auto &response_data = response_data_;
9✔
1802

1803
        http::async_write_header(
18✔
1804
                socket_,
9✔
1805
                *response_data_.http_response_serializer_,
1806
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
9✔
1807
                        if (!*cancelled) {
9✔
1808
                                SwitchingProtocolHandler(ec, num_written);
8✔
1809
                        }
1810
                });
9✔
1811

1812
        return error::NoError;
9✔
1813
}
1814

1815
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
8✔
1816
        if (num_written > 0) {
8✔
1817
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
16✔
1818
        }
1819

1820
        if (ec) {
8✔
1821
                CallErrorHandler(ec, request_, switch_protocol_handler_);
×
1822
                return;
×
1823
        }
1824

1825
        auto socket = make_shared<RawSocket<tcp::socket>>(
1826
                make_shared<tcp::socket>(std::move(socket_)), request_data_.request_buffer_);
8✔
1827

1828
        auto switch_protocol_handler = switch_protocol_handler_;
8✔
1829

1830
        // Rest of the connection is done directly on the socket, set cancelled_ but don't close it.
1831
        *cancelled_ = true;
8✔
1832
        cancelled_ = make_shared<bool>(true);
8✔
1833
        server_.RemoveStream(shared_from_this());
16✔
1834

1835
        switch_protocol_handler(socket);
16✔
1836
}
1837

1838
void Stream::CallBodyHandler() {
277✔
1839
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1840
        // it immediately destroys, which would destroy this stream as well. At the end of this
1841
        // function, it's ok to destroy it.
1842
        auto stream_ref = shared_from_this();
1843

1844
        server_.body_handler_(request_, error::NoError);
831✔
1845

1846
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1847
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1848
        // request has not been handled correctly.
1849
        auto response = maybe_response_.lock();
277✔
1850
        if (!response) {
277✔
1851
                logger_.Error("Handler produced no response. Closing stream prematurely.");
6✔
1852
                *cancelled_ = true;
3✔
1853
                cancelled_ = make_shared<bool>(true);
3✔
1854
                server_.RemoveStream(shared_from_this());
9✔
1855
        }
1856
}
277✔
1857

1858
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
252✔
1859
        event_loop_ {event_loop},
1860
        acceptor_(GetAsioIoContext(event_loop_)) {
428✔
1861
}
252✔
1862

1863
Server::~Server() {
504✔
1864
        Cancel();
252✔
1865
}
252✔
1866

1867
error::Error Server::AsyncServeUrl(
263✔
1868
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
1869
        return AsyncServeUrl(
1870
                url, header_handler, [body_handler](IncomingRequestPtr req, error::Error err) {
1,048✔
1871
                        if (err != error::NoError) {
272✔
1872
                                body_handler(expected::unexpected(err));
12✔
1873
                        } else {
1874
                                body_handler(req);
532✔
1875
                        }
1876
                });
798✔
1877
}
1878

1879
error::Error Server::AsyncServeUrl(
278✔
1880
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
1881
        auto err = BreakDownUrl(url, address_);
278✔
1882
        if (error::NoError != err) {
278✔
1883
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
×
1884
        }
1885

1886
        if (address_.protocol != "http") {
278✔
1887
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
×
1888
        }
1889

1890
        if (address_.path.size() > 0 && address_.path != "/") {
278✔
1891
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
2✔
1892
        }
1893

1894
        boost::system::error_code ec;
277✔
1895
        auto address = asio::ip::make_address(address_.host, ec);
277✔
1896
        if (ec) {
277✔
1897
                return error::Error(
1898
                        ec.default_error_condition(),
×
1899
                        "Could not construct endpoint from address " + address_.host);
×
1900
        }
1901

1902
        asio::ip::tcp::endpoint endpoint(address, address_.port);
277✔
1903

1904
        ec.clear();
1905
        acceptor_.open(endpoint.protocol(), ec);
277✔
1906
        if (ec) {
277✔
1907
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
18✔
1908
        }
1909

1910
        // Allow address reuse, otherwise we can't re-bind later.
1911
        ec.clear();
1912
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
268✔
1913
        if (ec) {
268✔
1914
                return error::Error(ec.default_error_condition(), "Could not set socket options");
×
1915
        }
1916

1917
        ec.clear();
1918
        acceptor_.bind(endpoint, ec);
268✔
1919
        if (ec) {
268✔
1920
                return error::Error(ec.default_error_condition(), "Could not bind socket");
×
1921
        }
1922

1923
        ec.clear();
1924
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
268✔
1925
        if (ec) {
268✔
1926
                return error::Error(ec.default_error_condition(), "Could not start listening");
×
1927
        }
1928

1929
        header_handler_ = header_handler;
268✔
1930
        body_handler_ = body_handler;
268✔
1931

1932
        PrepareNewStream();
268✔
1933

1934
        return error::NoError;
268✔
1935
}
1936

1937
void Server::Cancel() {
323✔
1938
        if (acceptor_.is_open()) {
323✔
1939
                acceptor_.cancel();
268✔
1940
                acceptor_.close();
268✔
1941
        }
1942
        streams_.clear();
1943
}
323✔
1944

1945
uint16_t Server::GetPort() const {
17✔
1946
        return acceptor_.local_endpoint().port();
17✔
1947
}
1948

1949
string Server::GetUrl() const {
16✔
1950
        return "http://127.0.0.1:" + to_string(GetPort());
32✔
1951
}
1952

1953
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
276✔
1954
        if (*req->cancelled_) {
276✔
1955
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
×
1956
        }
1957
        OutgoingResponsePtr response {new OutgoingResponse(req->stream_, req->cancelled_)};
552✔
1958
        req->stream_.maybe_response_ = response;
276✔
1959
        return response;
276✔
1960
}
1961

1962
error::Error Server::AsyncReply(
262✔
1963
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
1964
        if (*resp->cancelled_) {
262✔
1965
                return MakeError(StreamCancelledError, "Cannot send response");
×
1966
        }
1967

1968
        resp->stream_.AsyncReply(reply_finished_handler);
262✔
1969
        return error::NoError;
262✔
1970
}
1971

1972
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
62✔
1973
        if (*req->cancelled_) {
62✔
1974
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
×
1975
        }
1976

1977
        auto &stream = req->stream_;
62✔
1978
        if (stream.status_ != TransactionStatus::HeaderHandlerCalled) {
62✔
1979
                return expected::unexpected(error::Error(
1✔
1980
                        make_error_condition(errc::operation_in_progress),
2✔
1981
                        "MakeBodyAsyncReader called while reading is in progress"));
3✔
1982
        }
1983

1984
        if (GetContentLength(*stream.request_data_.http_request_parser_) == 0
61✔
1985
                && !stream.request_data_.http_request_parser_->chunked()) {
61✔
1986
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
54✔
1987
        }
1988

1989
        stream.status_ = TransactionStatus::ReaderCreated;
43✔
1990
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
86✔
1991
}
1992

1993
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
9✔
1994
        return resp->stream_.AsyncSwitchProtocol(handler);
18✔
1995
}
1996

1997
void Server::PrepareNewStream() {
553✔
1998
        StreamPtr new_stream {new Stream(*this)};
553✔
1999
        streams_.insert(new_stream);
2000
        AsyncAccept(new_stream);
1,106✔
2001
}
553✔
2002

2003
void Server::AsyncAccept(StreamPtr stream) {
553✔
2004
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
882✔
2005
                if (ec) {
329✔
2006
                        if (ec != errc::operation_canceled) {
44✔
2007
                                log::Error("Could not accept connection: " + ec.message());
×
2008
                        }
2009
                        return;
44✔
2010
                }
2011

2012
                stream->AcceptHandler(ec);
285✔
2013

2014
                this->PrepareNewStream();
285✔
2015
        });
2016
}
553✔
2017

2018
void Server::RemoveStream(StreamPtr stream) {
230✔
2019
        streams_.erase(stream);
230✔
2020

2021
        stream->DoCancel();
230✔
2022
}
230✔
2023

2024
} // namespace http
2025
} // namespace common
2026
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc