• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 2271300743

19 Jan 2026 11:42AM UTC coverage: 81.376% (+1.7%) from 79.701%
2271300743

push

gitlab-ci

web-flow
Merge pull request #1879 from lluiscampos/MEN-8687-ci-debian-updates

MEN-8687: Update Debian base images for CI jobs

8791 of 10803 relevant lines covered (81.38%)

20310.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.35
/src/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio.hpp>
20
#include <boost/asio/ip/tcp.hpp>
21
#include <boost/asio/ssl/host_name_verification.hpp>
22
#include <boost/asio/ssl/verify_mode.hpp>
23

24
#include <common/common.hpp>
25
#include <common/crypto.hpp>
26

27
#include <mender-version.h>
28

29
namespace mender {
30
namespace common {
31
namespace http {
32

33
namespace common = mender::common;
34
namespace crypto = mender::common::crypto;
35

36
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
37
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
38
const unsigned int BeastHttpVersion = 11;
39

40
namespace asio = boost::asio;
41
namespace http = boost::beast::http;
42

43
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
44

45
static http::verb MethodToBeastVerb(Method method) {
306✔
46
        switch (method) {
306✔
47
        case Method::GET:
48
                return http::verb::get;
49
        case Method::HEAD:
50
                return http::verb::head;
51
        case Method::POST:
52
                return http::verb::post;
53
        case Method::PUT:
54
                return http::verb::put;
55
        case Method::PATCH:
56
                return http::verb::patch;
57
        case Method::CONNECT:
58
                return http::verb::connect;
59
        case Method::Invalid:
60
                // Fallthrough to end (no-op).
61
                break;
62
        }
63
        // Don't use "default" case. This should generate a warning if we ever add any methods. But
64
        // still assert here for safety.
65
        assert(false);
66
        return http::verb::get;
67
}
68

69
static expected::expected<Method, error::Error> BeastVerbToMethod(
285✔
70
        http::verb verb, const string &verb_string) {
71
        switch (verb) {
285✔
72
        case http::verb::get:
245✔
73
                return Method::GET;
74
        case http::verb::head:
×
75
                return Method::HEAD;
76
        case http::verb::post:
16✔
77
                return Method::POST;
78
        case http::verb::put:
24✔
79
                return Method::PUT;
80
        case http::verb::patch:
×
81
                return Method::PATCH;
82
        case http::verb::connect:
×
83
                return Method::CONNECT;
84
        default:
×
85
                return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
×
86
        }
87
}
88

89
template <typename StreamType>
90
class BodyAsyncReader : virtual public io::AsyncReader {
91
public:
92
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
510✔
93
                stream_ {stream},
255✔
94
                cancelled_ {cancelled} {
255✔
95
        }
510✔
96
        ~BodyAsyncReader() {
510✔
97
                Cancel();
255✔
98
        }
765✔
99

100
        error::Error AsyncRead(
7,004✔
101
                vector<uint8_t>::iterator start,
102
                vector<uint8_t>::iterator end,
103
                io::AsyncIoHandler handler) override {
104
                if (eof_) {
7,004✔
105
                        handler(0);
×
106
                        return error::NoError;
×
107
                }
108

109
                if (*cancelled_) {
7,004✔
110
                        return error::MakeError(
×
111
                                error::ProgrammingError,
112
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
113
                }
114
                stream_.AsyncReadNextBodyPart(start, end, [this, handler](io::ExpectedSize size) {
28,494✔
115
                        if (size && size.value() == 0) {
7,003✔
116
                                eof_ = true;
137✔
117
                        }
118
                        handler(size);
14,006✔
119
                });
120
                return error::NoError;
7,004✔
121
        }
122

123
        void Cancel() override {
256✔
124
                if (!*cancelled_) {
256✔
125
                        stream_.Cancel();
9✔
126
                }
127
        }
256✔
128

129
private:
130
        StreamType &stream_;
131
        shared_ptr<bool> cancelled_;
132
        bool eof_ {false};
133

134
        friend class Client;
135
        friend class Server;
136
};
137

138
template <typename StreamType>
139
class RawSocket : virtual public io::AsyncReadWriter {
140
public:
141
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
30✔
142
                destroying_ {make_shared<bool>(false)},
15✔
143
                stream_ {stream},
144
                buffered_ {buffered} {
15✔
145
                // If there are no buffered bytes, then we don't need it.
146
                if (buffered_ && buffered_->size() == 0) {
15✔
147
                        buffered_.reset();
11✔
148
                }
149
        }
30✔
150

151
        ~RawSocket() {
30✔
152
                *destroying_ = true;
15✔
153
                Cancel();
15✔
154
        }
45✔
155

156
        error::Error AsyncRead(
320✔
157
                vector<uint8_t>::iterator start,
158
                vector<uint8_t>::iterator end,
159
                io::AsyncIoHandler handler) override {
160
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
161
                // header and parts of the body in one block, return those first.
162
                if (buffered_) {
320✔
163
                        return DrainPrebufferedData(start, end, handler);
8✔
164
                }
165

166
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
167
                auto &destroying = destroying_;
168
                stream_->async_read_some(
316✔
169
                        read_buffer_,
316✔
170
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
945✔
171
                                if (*destroying) {
313✔
172
                                        return;
173
                                }
174

175
                                if (ec == asio::error::operation_aborted) {
313✔
176
                                        handler(expected::unexpected(error::Error(
9✔
177
                                                make_error_condition(errc::operation_canceled),
6✔
178
                                                "Could not read from socket")));
179
                                } else if (ec) {
310✔
180
                                        handler(expected::unexpected(
6✔
181
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
182
                                } else {
183
                                        handler(num_read);
608✔
184
                                }
185
                        });
186
                return error::NoError;
316✔
187
        }
188

189
        error::Error AsyncWrite(
309✔
190
                vector<uint8_t>::const_iterator start,
191
                vector<uint8_t>::const_iterator end,
192
                io::AsyncIoHandler handler) override {
193
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
194
                auto &destroying = destroying_;
195
                stream_->async_write_some(
309✔
196
                        write_buffer_,
309✔
197
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
924✔
198
                                if (*destroying) {
306✔
199
                                        return;
200
                                }
201

202
                                if (ec == asio::error::operation_aborted) {
306✔
203
                                        handler(expected::unexpected(error::Error(
×
204
                                                make_error_condition(errc::operation_canceled),
×
205
                                                "Could not write to socket")));
206
                                } else if (ec) {
306✔
207
                                        handler(expected::unexpected(
×
208
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
209
                                } else {
210
                                        handler(num_written);
612✔
211
                                }
212
                        });
213
                return error::NoError;
309✔
214
        }
215

216
        void Cancel() override {
28✔
217
                // close() is sufficient:
218
                // https://www.boost.org/doc/libs/latest/doc/html/boost_asio/reference/basic_stream_socket/close/overload1.html
219
                beast::close_socket(beast::get_lowest_layer(*stream_));
220
        }
28✔
221

222
private:
223
        error::Error DrainPrebufferedData(
4✔
224
                vector<uint8_t>::iterator start,
225
                vector<uint8_t>::iterator end,
226
                io::AsyncIoHandler handler) {
227
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
4✔
228

229
                // These two lines are equivalent to:
230
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
231
                // but compatible with Boost 1.67.
232
                const beast::flat_buffer &cbuffered = *buffered_;
233
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
4✔
234
                buffered_->consume(to_copy);
4✔
235
                if (buffered_->size() == 0) {
4✔
236
                        // We don't need it anymore.
237
                        buffered_.reset();
4✔
238
                }
239
                handler(to_copy);
4✔
240
                return error::NoError;
4✔
241
        }
242

243
        shared_ptr<bool> destroying_;
244
        shared_ptr<StreamType> stream_;
245
        shared_ptr<beast::flat_buffer> buffered_;
246
        asio::mutable_buffer read_buffer_;
247
        asio::const_buffer write_buffer_;
248
};
249

250
template <typename PARSER>
251
int64_t GetContentLength(const PARSER &parser) {
864✔
252
        auto content_length = parser.content_length();
864✔
253
        if (content_length) {
864✔
254
                return content_length.value();
553✔
255
        } else {
256
                return 0;
257
        }
258
}
259

260
expected::ExpectedBool HasBody(
567✔
261
        const expected::ExpectedString &content_length,
262
        const expected::ExpectedString &transfer_encoding) {
263
        if (transfer_encoding) {
567✔
264
                if (transfer_encoding.value() != "chunked") {
4✔
265
                        return expected::unexpected(error::Error(
×
266
                                make_error_condition(errc::not_supported),
×
267
                                "Unsupported Transfer-Encoding: " + transfer_encoding.value()));
×
268
                }
269
                return true;
270
        }
271

272
        if (content_length) {
563✔
273
                auto length = common::StringToLongLong(content_length.value());
281✔
274
                if (!length || length.value() < 0) {
281✔
275
                        return expected::unexpected(error::Error(
×
276
                                length.error().code,
277
                                "Content-Length contains invalid number: " + content_length.value()));
×
278
                }
279
                return length.value() > 0;
281✔
280
        }
281

282
        return false;
283
}
284

285
Client::Client(
732✔
286
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
366✔
287
        event_loop_ {event_loop},
366✔
288
        logger_name_ {logger_name},
366✔
289
        client_config_ {client},
366✔
290
        http_proxy_ {client.http_proxy},
366✔
291
        https_proxy_ {client.https_proxy},
366✔
292
        no_proxy_ {client.no_proxy},
366✔
293
        cancelled_ {make_shared<bool>(true)},
366✔
294
        resolver_(GetAsioIoContext(event_loop)),
366✔
295
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,464✔
296
}
732✔
297

298
Client::~Client() {
1,830✔
299
        if (!*cancelled_) {
366✔
300
                logger_.Warning("Client destroyed while request is still active!");
30✔
301
        }
302
        DoCancel();
366✔
303
}
2,196✔
304

305
error::Error Client::Initialize() {
342✔
306
        if (initialized_) {
342✔
307
                return error::NoError;
119✔
308
        }
309

310
        for (auto i = 0; i < MENDER_BOOST_BEAST_SSL_CTX_COUNT; i++) {
661✔
311
                ssl_ctx_[i].set_verify_mode(
882✔
312
                        client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer);
313

314
                beast::error_code ec {};
442✔
315
                if (client_config_.client_cert_path != "" and client_config_.client_cert_key_path != "") {
442✔
316
                        ssl_ctx_[i].set_options(boost::asio::ssl::context::default_workarounds);
4✔
317
                        ssl_ctx_[i].use_certificate_file(
4✔
318
                                client_config_.client_cert_path, boost::asio::ssl::context_base::pem, ec);
319
                        if (ec) {
4✔
320
                                return error::Error(
321
                                        ec.default_error_condition(), "Could not load client certificate");
2✔
322
                        }
323
                        auto exp_key = crypto::PrivateKey::Load(
324
                                {client_config_.client_cert_key_path, "", client_config_.ssl_engine});
3✔
325
                        if (!exp_key) {
3✔
326
                                return exp_key.error().WithContext(
327
                                        "Error loading private key from " + client_config_.client_cert_key_path);
2✔
328
                        }
329

330
                        const int ret =
331
                                SSL_CTX_use_PrivateKey(ssl_ctx_[i].native_handle(), exp_key.value()->Get());
2✔
332
                        if (ret != 1) {
2✔
333
                                return MakeError(
334
                                        HTTPInitError,
335
                                        "Failed to add the PrivateKey: " + client_config_.client_cert_key_path
×
336
                                                + " to the SSL CTX");
×
337
                        }
338
                } else if (
339
                        client_config_.client_cert_path != "" or client_config_.client_cert_key_path != "") {
438✔
340
                        return error::Error(
341
                                make_error_condition(errc::invalid_argument),
4✔
342
                                "Cannot set only one of client certificate, and client certificate private key");
4✔
343
                }
344

345
                bool cert_loaded = true;
346
                ssl_ctx_[i].set_default_verify_paths(ec); // Load the default CAs
438✔
347
                if (ec) {
438✔
348
                        auto err = error::Error(
349
                                ec.default_error_condition(), "Failed to load the SSL default directory");
×
350
                        if (client_config_.server_cert_path == "") {
×
351
                                // We aren't going to have any valid certificates then.
352
                                return err;
×
353
                        } else {
354
                                // We have a dedicated certificate, so this is not fatal.
355
                                log::Info(err.String());
×
356
                                cert_loaded = false;
357
                        }
358
                }
359
                if (client_config_.server_cert_path != "") {
438✔
360
                        ssl_ctx_[i].load_verify_file(client_config_.server_cert_path, ec);
52✔
361
                        if (ec) {
52✔
362
                                log::Warning("Failed to load the server certificate! Falling back to the CA store");
2✔
363
                                if (!cert_loaded) {
2✔
364
                                        return error::Error(
365
                                                ec.default_error_condition(),
×
366
                                                "Failed to load SSL default directory and server certificate");
×
367
                                }
368
                        }
369
                }
370
        }
371

372
        initialized_ = true;
219✔
373

374
        return error::NoError;
219✔
375
}
3✔
376

377
// Create the HOST header according to:
378
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.23
379
// In short: Add the port-number if it is non-standard HTTP
380
static string CreateHOSTAddress(OutgoingRequestPtr req) {
330✔
381
        if (req->GetPort() == 80 || req->GetPort() == 443) {
330✔
382
                return req->GetHost();
4✔
383
        }
384
        return req->GetHost() + ":" + to_string(req->GetPort());
652✔
385
}
386

387
error::Error Client::AsyncCall(
342✔
388
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
389
        auto err = Initialize();
342✔
390
        if (err != error::NoError) {
342✔
391
                return err;
4✔
392
        }
393

394
        if (!*cancelled_ && status_ != TransactionStatus::Done) {
338✔
395
                return error::Error(
396
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
×
397
        }
398

399
        if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
338✔
400
                return error::MakeError(error::ProgrammingError, "Request is not ready");
4✔
401
        }
402

403
        if (!header_handler || !body_handler) {
336✔
404
                return error::MakeError(
2✔
405
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
1✔
406
        }
407

408
        if (req->address_.protocol != "http" && req->address_.protocol != "https") {
335✔
409
                return error::Error(
410
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
2✔
411
        }
412

413
        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));
668✔
414

415
        request_ = req;
416

417
        err = HandleProxySetup();
334✔
418
        if (err != error::NoError) {
334✔
419
                return err;
4✔
420
        }
421

422
        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
423
        // request to route to our k8s cluster. Set this in all cases.
424
        const string header_url = CreateHOSTAddress(req);
660✔
425
        req->SetHeader("HOST", header_url);
330✔
426

427
        log::Trace("Setting HOST address: " + header_url);
660✔
428

429
        // Add User-Agent header for all requests
430
        req->SetHeader("User-Agent", "Mender/" MENDER_VERSION);
660✔
431

432
        header_handler_ = header_handler;
330✔
433
        body_handler_ = body_handler;
330✔
434
        status_ = TransactionStatus::None;
330✔
435

436
        cancelled_ = make_shared<bool>(false);
330✔
437

438
        auto &cancelled = cancelled_;
439

440
        resolver_.async_resolve(
330✔
441
                request_->address_.host,
442
                to_string(request_->address_.port),
660✔
443
                [this, cancelled](
986✔
444
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
445
                        if (!*cancelled) {
326✔
446
                                ResolveHandler(ec, results);
325✔
447
                        }
448
                });
326✔
449

450
        return error::NoError;
330✔
451
}
452

453
static inline error::Error AddProxyAuthHeader(OutgoingRequest &req, BrokenDownUrl &proxy_address) {
22✔
454
        if (proxy_address.username == "") {
22✔
455
                // nothing to do
456
                return error::NoError;
19✔
457
        }
458
        auto ex_dec_username = URLDecode(proxy_address.username);
3✔
459
        auto ex_dec_password = URLDecode(proxy_address.password);
3✔
460
        if (!ex_dec_username) {
3✔
461
                return ex_dec_username.error();
×
462
        }
463
        if (!ex_dec_password) {
3✔
464
                return ex_dec_password.error();
×
465
        }
466
        auto creds = ex_dec_username.value() + ":" + ex_dec_password.value();
3✔
467
        auto ex_encoded_creds = crypto::EncodeBase64(common::ByteVectorFromString(creds));
6✔
468
        if (!ex_encoded_creds) {
3✔
469
                return ex_encoded_creds.error();
×
470
        }
471
        req.SetHeader("Proxy-Authorization", "Basic " + ex_encoded_creds.value());
6✔
472
        log::Warning(
3✔
473
                "Avoid using basic authentication if possible, and make sure if it's used, it's through HTTPS");
474

475
        return error::NoError;
3✔
476
}
477

478
error::Error Client::HandleProxySetup() {
334✔
479
        secondary_req_.reset();
334✔
480

481
        if (request_->address_.protocol == "http") {
334✔
482
                socket_mode_ = SocketMode::Plain;
308✔
483

484
                if (http_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
308✔
485
                        // Make a modified proxy request.
486
                        BrokenDownUrl proxy_address;
11✔
487
                        auto err = BreakDownUrl(http_proxy_, proxy_address, true);
11✔
488
                        if (err != error::NoError) {
11✔
489
                                return err.WithContext("HTTP proxy URL is invalid");
2✔
490
                        }
491
                        if (proxy_address.path != "" && proxy_address.path != "/") {
10✔
492
                                return MakeError(
2✔
493
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
1✔
494
                        }
495

496
                        request_->address_.path = request_->address_.protocol + "://" + request_->address_.host
9✔
497
                                                                          + ":" + to_string(request_->address_.port)
18✔
498
                                                                          + request_->address_.path;
27✔
499
                        request_->address_.host = proxy_address.host;
9✔
500
                        request_->address_.port = proxy_address.port;
9✔
501
                        request_->address_.protocol = proxy_address.protocol;
9✔
502

503
                        err = AddProxyAuthHeader(*request_, proxy_address);
9✔
504
                        if (err != error::NoError) {
9✔
505
                                return err;
×
506
                        }
507

508
                        if (proxy_address.protocol == "https") {
9✔
509
                                socket_mode_ = SocketMode::Tls;
5✔
510
                        } else if (proxy_address.protocol == "http") {
4✔
511
                                socket_mode_ = SocketMode::Plain;
4✔
512
                        } else {
513
                                // Should never get here.
514
                                assert(false);
515
                        }
516
                }
11✔
517
        } else if (request_->address_.protocol == "https") {
26✔
518
                socket_mode_ = SocketMode::Tls;
26✔
519

520
                if (https_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
26✔
521
                        // Save the original request for later, so that we can make a new request
522
                        // over the channel established by CONNECT.
523
                        secondary_req_ = std::move(request_);
524

525
                        request_ = make_shared<OutgoingRequest>();
30✔
526
                        request_->SetMethod(Method::CONNECT);
15✔
527
                        BrokenDownUrl proxy_address;
15✔
528
                        auto err = BreakDownUrl(https_proxy_, proxy_address, true);
15✔
529
                        if (err != error::NoError) {
15✔
530
                                return err.WithContext("HTTPS proxy URL is invalid");
2✔
531
                        }
532
                        if (proxy_address.path != "" && proxy_address.path != "/") {
14✔
533
                                return MakeError(
2✔
534
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
1✔
535
                        }
536

537
                        request_->address_.path =
538
                                secondary_req_->address_.host + ":" + to_string(secondary_req_->address_.port);
26✔
539
                        request_->address_.host = proxy_address.host;
13✔
540
                        request_->address_.port = proxy_address.port;
13✔
541
                        request_->address_.protocol = proxy_address.protocol;
13✔
542

543
                        err = AddProxyAuthHeader(*request_, proxy_address);
13✔
544
                        if (err != error::NoError) {
13✔
545
                                return err;
×
546
                        }
547

548
                        if (proxy_address.protocol == "https") {
13✔
549
                                socket_mode_ = SocketMode::Tls;
7✔
550
                        } else if (proxy_address.protocol == "http") {
6✔
551
                                socket_mode_ = SocketMode::Plain;
6✔
552
                        } else {
553
                                // Should never get here.
554
                                assert(false);
555
                        }
556
                }
15✔
557
        } else {
558
                // Should never get here
559
                assert(false);
560
        }
561

562
        return error::NoError;
330✔
563
}
564

565
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
232✔
566
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
232✔
567
                return expected::unexpected(error::Error(
2✔
568
                        make_error_condition(errc::operation_in_progress),
4✔
569
                        "MakeBodyAsyncReader called while reading is in progress"));
6✔
570
        }
571

572
        if (GetContentLength(*response_data_.http_response_parser_) == 0
230✔
573
                && !response_data_.http_response_parser_->chunked()) {
230✔
574
                return expected::unexpected(
18✔
575
                        MakeError(BodyMissingError, "Response does not contain a body"));
54✔
576
        }
577

578
        status_ = TransactionStatus::ReaderCreated;
212✔
579
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
424✔
580
}
581

582
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
7✔
583
        if (*cancelled_) {
7✔
584
                return expected::unexpected(error::Error(
×
585
                        make_error_condition(errc::not_connected),
×
586
                        "Cannot switch protocols if endpoint is not connected"));
×
587
        }
588

589
        // Rest of the connection is done directly on the socket, we are done here.
590
        status_ = TransactionStatus::Done;
7✔
591
        *cancelled_ = true;
7✔
592
        cancelled_ = make_shared<bool>(false);
14✔
593

594
        auto stream = stream_;
595
        // This no longer belongs to us.
596
        stream_.reset();
7✔
597

598
        switch (socket_mode_) {
7✔
599
        case SocketMode::TlsTls:
×
600
                return make_shared<RawSocket<ssl::stream<ssl::stream<beast::tcp_stream>>>>(
×
601
                        stream, response_data_.response_buffer_);
×
602
        case SocketMode::Tls:
×
603
                return make_shared<RawSocket<ssl::stream<beast::tcp_stream>>>(
×
604
                        make_shared<ssl::stream<beast::tcp_stream>>(std::move(stream->next_layer())),
×
605
                        response_data_.response_buffer_);
×
606
        case SocketMode::Plain:
7✔
607
                return make_shared<RawSocket<beast::tcp_stream>>(
7✔
608
                        make_shared<beast::tcp_stream>(std::move(stream->next_layer().next_layer())),
14✔
609
                        response_data_.response_buffer_);
14✔
610
        }
611

612
        AssertOrReturnUnexpected(false);
×
613
        // This should not happen. It's here to silence compiler warnings.
614
        return expected::unexpected(MakeError(error::ProgrammingError, "Invalid socket mode"));
615
}
616

617
void Client::CallHandler(ResponseHandler handler) {
425✔
618
        // This function exists to make sure we have a copy of the handler we're calling (in the
619
        // argument list). This is important in case the handler owns the client instance through a
620
        // capture, and it replaces the handler with a different one (using `AsyncCall`). If it
621
        // does, then it destroys the final copy of the handler, and therefore also the client,
622
        // which is why we need to make a copy here, before calling it.
623
        handler(response_);
425✔
624
}
425✔
625

626
void Client::CallErrorHandler(
132✔
627
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
628
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
396✔
629
}
132✔
630

631
void Client::CallErrorHandler(
174✔
632
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
633
        status_ = TransactionStatus::Done;
174✔
634
        DoCancel();
174✔
635
        handler(expected::unexpected(
174✔
636
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
174✔
637
}
174✔
638

639
void Client::ResolveHandler(
325✔
640
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
641
        if (ec) {
325✔
642
                CallErrorHandler(ec, request_, header_handler_);
×
643
                return;
×
644
        }
645

646
        if (logger_.Level() >= log::LogLevel::Debug) {
325✔
647
                string ips = "[";
300✔
648
                string sep;
649
                for (auto r : results) {
924✔
650
                        ips += sep;
324✔
651
                        ips += r.endpoint().address().to_string();
324✔
652
                        sep = ", ";
324✔
653
                }
324✔
654
                ips += "]";
300✔
655
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
600✔
656
        }
657

658
        resolver_results_ = results;
659

660
        stream_ = make_shared<ssl::stream<ssl::stream<beast::tcp_stream>>>(
325✔
661
                ssl::stream<beast::tcp_stream>(
325✔
662
                        beast::tcp_stream(GetAsioIoContext(event_loop_)), ssl_ctx_[0]),
650✔
663
                ssl_ctx_[1]);
325✔
664

665
        if (response_data_.response_buffer_) {
325✔
666
                // We can reuse this if preexisting, just make sure we start with a
667
                // clean state (while avoiding shrinking/discarding the buffer, see
668
                // https://www.boost.org/doc/libs/1_70_0/libs/beast/doc/html/beast/ref/boost__beast__basic_flat_buffer/clear.html
669
                // for details).
670
                // Since there should be no leftover bytes from previous responses, we
671
                // log if there are some, but let's not bother all users with a warning,
672
                // there is nothing they could do about it. However, for
673
                // testing/debugging/CI, it can be useful to have this information.
674
                if (response_data_.response_buffer_->size() > 0) {
117✔
675
                        logger_.Debug(
1✔
676
                                "Leftover data from the previous response! ("
677
                                + to_string(response_data_.response_buffer_->size()) + " bytes)");
2✔
678
                }
679
                response_data_.response_buffer_->clear();
680
        } else {
681
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
416✔
682

683
                // This is equivalent to:
684
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
685
                // but compatible with Boost 1.67.
686
                response_data_.response_buffer_->prepare(
208✔
687
                        body_buffer_.size() - response_data_.response_buffer_->size());
688
        }
689

690
        auto &cancelled = cancelled_;
691

692
        // Set timeout to 5 minutes to ensure we don't hang during async connect
693
        // `next_layer().next_layer()` accesses the `beast::tcp_stream` from
694
        // `ssl::stream<ssl::stream<beast::tcp_stream>>`
695
        stream_->next_layer().next_layer().expires_after(chrono::minutes(5));
325✔
696

697
        asio::async_connect(
325✔
698
                stream_->lowest_layer(),
699
                resolver_results_,
325✔
700
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
975✔
701
                        if (!*cancelled) {
325✔
702
                                switch (socket_mode_) {
325✔
703
                                case SocketMode::TlsTls:
×
704
                                        // Should never happen because we always need to handshake
705
                                        // the innermost Tls first, then the outermost, but the
706
                                        // latter doesn't happen here.
707
                                        assert(false);
708
                                        CallErrorHandler(
×
709
                                                error::MakeError(
×
710
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
711
                                                request_,
×
712
                                                header_handler_);
×
713
                                case SocketMode::Tls:
21✔
714
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
21✔
715
                                case SocketMode::Plain:
304✔
716
                                        return ConnectHandler(ec, endpoint);
304✔
717
                                }
718
                        }
719
                });
720
}
721

722
template <typename StreamType>
723
void Client::HandshakeHandler(
28✔
724
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
725
        if (ec) {
28✔
726
                CallErrorHandler(ec, request_, header_handler_);
2✔
727
                return;
2✔
728
        }
729

730
        // Enable TCP keepalive
731
        boost::asio::socket_base::keep_alive option(true);
732
        stream_->lowest_layer().set_option(option);
26✔
733

734
        // We can't avoid a C style cast on this next line. The usual method by which system headers
735
        // are excluded from warnings doesn't work, because `SSL_set_tlsext_host_name` is a macro,
736
        // containing a cast, which expands here, not in the original file. So just disable the
737
        // warning here.
738
#ifdef __clang__
739
#pragma clang diagnostic push
740
#pragma clang diagnostic ignored "-Wold-style-cast"
741
#else
742
#pragma GCC diagnostic push
743
#pragma GCC diagnostic ignored "-Wold-style-cast"
744
#endif
745
        // Set SNI Hostname (many hosts need this to handshake successfully)
746
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
26✔
747
#ifdef __clang__
748
#pragma clang diagnostic pop
749
#else
750
#pragma GCC diagnostic pop
751
#endif
752
                beast::error_code ec2 {
×
753
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
754
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
755
        }
756

757
        // Enable host name verification (not done automatically and we don't have
758
        // enough access to the TLS internals to use X509_VERIFY_PARAM_set1_host(),
759
        // hence the callback that boost provides).
760
        boost::system::error_code b_ec;
26✔
761
        stream.set_verify_callback(ssl::host_name_verification(request_->address_.host), b_ec);
52✔
762
        if (b_ec) {
26✔
763
                logger_.Error("Failed to enable host name verification: " + b_ec.message());
×
764
                CallErrorHandler(b_ec, request_, header_handler_);
×
765
                return;
×
766
        }
767

768
        auto &cancelled = cancelled_;
769

770
        stream.async_handshake(
26✔
771
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
78✔
772
                        if (*cancelled) {
26✔
773
                                return;
774
                        }
775
                        if (ec) {
26✔
776
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
10✔
777
                                CallErrorHandler(ec, request_, header_handler_);
10✔
778
                                return;
10✔
779
                        }
780
                        logger_.Debug("https: Successful SSL handshake");
16✔
781
                        ConnectHandler(ec, endpoint);
16✔
782
                });
783
}
784

785

786
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
320✔
787
        if (ec) {
320✔
788
                CallErrorHandler(ec, request_, header_handler_);
14✔
789
                return;
14✔
790
        }
791

792
        // Enable TCP keepalive
793
        boost::asio::socket_base::keep_alive option(true);
794
        stream_->lowest_layer().set_option(option);
306✔
795

796
        logger_.Debug("Connected to " + endpoint.address().to_string());
612✔
797

798
        request_data_.http_request_ = make_shared<http::request<http::buffer_body>>(
612✔
799
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
612✔
800

801
        for (const auto &header : request_->headers_) {
1,130✔
802
                request_data_.http_request_->set(header.first, header.second);
824✔
803
        }
804

805
        request_data_.http_request_serializer_ =
806
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);
306✔
807

808
        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
612✔
809

810
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
811
        // if they do, they should be handled higher up in the application logic.
812
        //
813
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
814
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
815
        // `has_value()` in their code, causing their subsequent comparison operation to
816
        // misbehave. So pass highest possible value instead.
817
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
818

819
        auto &cancelled = cancelled_;
820
        auto &request_data = request_data_;
306✔
821

822
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
306✔
823
                if (!*cancelled) {
306✔
824
                        WriteHeaderHandler(ec, num_written);
306✔
825
                }
826
        };
306✔
827

828
        // Set timeout to 5 minutes to ensure we don't hang during async write
829
        // `next_layer().next_layer()` accesses the `beast::tcp_stream` from
830
        // `ssl::stream<ssl::stream<beast::tcp_stream>>`
831
        stream_->next_layer().next_layer().expires_after(chrono::minutes(5));
306✔
832

833
        switch (socket_mode_) {
306✔
834
        case SocketMode::TlsTls:
2✔
835
                http::async_write_header(*stream_, *request_data_.http_request_serializer_, handler);
2✔
836
                break;
837
        case SocketMode::Tls:
14✔
838
                http::async_write_header(
14✔
839
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
840
                break;
841
        case SocketMode::Plain:
290✔
842
                http::async_write_header(
290✔
843
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
844
                break;
845
        }
846
}
306✔
847

848
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
306✔
849
        if (num_written > 0) {
306✔
850
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
612✔
851
        }
852

853
        if (ec) {
306✔
854
                CallErrorHandler(ec, request_, header_handler_);
×
855
                return;
262✔
856
        }
857

858
        auto exp_has_body =
859
                HasBody(request_->GetHeader("Content-Length"), request_->GetHeader("Transfer-Encoding"));
918✔
860
        if (!exp_has_body) {
306✔
861
                CallErrorHandler(exp_has_body.error(), request_, header_handler_);
×
862
                return;
×
863
        }
864
        if (!exp_has_body.value()) {
306✔
865
                ReadHeader();
261✔
866
                return;
867
        }
868

869
        if (!request_->body_gen_ && !request_->async_body_gen_) {
45✔
870
                auto err = MakeError(BodyMissingError, "No body generator");
1✔
871
                CallErrorHandler(err, request_, header_handler_);
2✔
872
                return;
873
        }
874

875
        assert(!(request_->body_gen_ && request_->async_body_gen_));
876

877
        if (request_->body_gen_) {
44✔
878
                auto body_reader = request_->body_gen_();
38✔
879
                if (!body_reader) {
38✔
880
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
881
                        return;
882
                }
883
                request_->body_reader_ = body_reader.value();
38✔
884
        } else {
885
                auto body_reader = request_->async_body_gen_();
6✔
886
                if (!body_reader) {
6✔
887
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
888
                        return;
889
                }
890
                request_->async_body_reader_ = body_reader.value();
6✔
891
        }
892

893
        PrepareAndWriteNewBodyBuffer();
44✔
894
}
895

896
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,290✔
897
        if (num_written > 0) {
2,290✔
898
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,250✔
899
        }
900

901
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,290✔
902
                // Write next block of the body.
903
                PrepareAndWriteNewBodyBuffer();
1,124✔
904
        } else if (ec) {
1,166✔
905
                CallErrorHandler(ec, request_, header_handler_);
8✔
906
        } else if (num_written > 0) {
1,162✔
907
                // We are still writing the body.
908
                WriteBody();
1,125✔
909
        } else {
910
                // We are ready to receive the response.
911
                ReadHeader();
37✔
912
        }
913
}
2,290✔
914

915
void Client::PrepareAndWriteNewBodyBuffer() {
1,168✔
916
        // request_->body_reader_ XOR request_->async_body_reader_
917
        assert(
918
                (request_->body_reader_ || request_->async_body_reader_)
919
                && !(request_->body_reader_ && request_->async_body_reader_));
920

921
        auto cancelled = cancelled_;
922
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,608✔
923
                if (!*cancelled) {
1,168✔
924
                        if (!read) {
1,167✔
925
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
926
                                return;
2✔
927
                        }
928
                        WriteNewBodyBuffer(read.value());
1,165✔
929
                }
930
        };
1,168✔
931

932

933
        if (request_->body_reader_) {
1,168✔
934
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,486✔
935
        } else {
936
                auto err = request_->async_body_reader_->AsyncRead(
425✔
937
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
425✔
938
                if (err != error::NoError) {
425✔
939
                        CallErrorHandler(err, request_, header_handler_);
×
940
                }
941
        }
942
}
1,168✔
943

944
void Client::WriteNewBodyBuffer(size_t size) {
1,165✔
945
        request_data_.http_request_->body().data = body_buffer_.data();
1,165✔
946
        request_data_.http_request_->body().size = size;
1,165✔
947

948
        if (size > 0) {
1,165✔
949
                request_data_.http_request_->body().more = true;
1,128✔
950
        } else {
951
                // Release ownership of Body reader.
952
                request_->body_reader_.reset();
37✔
953
                request_->async_body_reader_.reset();
37✔
954
                request_data_.http_request_->body().more = false;
37✔
955
        }
956

957
        WriteBody();
1,165✔
958
}
1,165✔
959

960
void Client::WriteBody() {
2,290✔
961
        auto &cancelled = cancelled_;
962
        auto &request_data = request_data_;
2,290✔
963

964
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,290✔
965
                if (!*cancelled) {
2,290✔
966
                        WriteBodyHandler(ec, num_written);
2,290✔
967
                }
968
        };
2,290✔
969

970
        // Set timeout to 5 minutes to ensure we don't hang during async write
971
        // `next_layer().next_layer()` accesses the `beast::tcp_stream` from
972
        // `ssl::stream<ssl::stream<beast::tcp_stream>>`
973
        stream_->next_layer().next_layer().expires_after(chrono::minutes(5));
2,290✔
974

975
        switch (socket_mode_) {
2,290✔
976
        case SocketMode::TlsTls:
×
977
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
978
                break;
979
        case SocketMode::Tls:
×
980
                http::async_write_some(
981
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
982
                break;
983
        case SocketMode::Plain:
2,290✔
984
                http::async_write_some(
985
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
986
                break;
987
        }
988
}
2,290✔
989

990
void Client::ReadHeader() {
298✔
991
        auto &cancelled = cancelled_;
992
        auto &response_data = response_data_;
298✔
993

994
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
295✔
995
                if (!*cancelled) {
295✔
996
                        ReadHeaderHandler(ec, num_read);
295✔
997
                }
998
        };
298✔
999

1000
        // Set timeout to 5 minutes to ensure we don't hang during async read
1001
        // `next_layer().next_layer()` accesses the `beast::tcp_stream` from
1002
        // `ssl::stream<ssl::stream<beast::tcp_stream>>`
1003
        stream_->next_layer().next_layer().expires_after(chrono::minutes(5));
298✔
1004

1005
        switch (socket_mode_) {
298✔
1006
        case SocketMode::TlsTls:
2✔
1007
                http::async_read_some(
2✔
1008
                        *stream_,
1009
                        *response_data_.response_buffer_,
1010
                        *response_data_.http_response_parser_,
1011
                        handler);
1012
                break;
1013
        case SocketMode::Tls:
14✔
1014
                http::async_read_some(
14✔
1015
                        stream_->next_layer(),
1016
                        *response_data_.response_buffer_,
1017
                        *response_data_.http_response_parser_,
1018
                        handler);
1019
                break;
1020
        case SocketMode::Plain:
282✔
1021
                http::async_read_some(
282✔
1022
                        stream_->next_layer().next_layer(),
1023
                        *response_data_.response_buffer_,
1024
                        *response_data_.http_response_parser_,
1025
                        handler);
1026
                break;
1027
        }
1028
}
298✔
1029

1030
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
295✔
1031
        if (num_read > 0) {
295✔
1032
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
580✔
1033
        }
1034

1035
        if (ec) {
295✔
1036
                CallErrorHandler(ec, request_, header_handler_);
5✔
1037
                return;
66✔
1038
        }
1039

1040
        if (!response_data_.http_response_parser_->is_header_done()) {
290✔
1041
                ReadHeader();
×
1042
                return;
×
1043
        }
1044

1045
        if (secondary_req_) {
290✔
1046
                HandleSecondaryRequest();
9✔
1047
                return;
9✔
1048
        }
1049

1050
        response_.reset(new IncomingResponse(*this, cancelled_));
843✔
1051
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
281✔
1052
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
281✔
1053

1054
        logger_.Debug(
562✔
1055
                "Received response: " + to_string(response_->status_code_) + " "
281✔
1056
                + response_->status_message_);
843✔
1057

1058
        string debug_str;
1059
        for (auto header = response_data_.http_response_parser_->get().cbegin();
1060
                 header != response_data_.http_response_parser_->get().cend();
645✔
1061
                 header++) {
1062
                response_->headers_[string {header->name_string()}] = string {header->value()};
1,092✔
1063
                if (logger_.Level() >= log::LogLevel::Debug) {
364✔
1064
                        debug_str += string {header->name_string()};
346✔
1065
                        debug_str += ": ";
346✔
1066
                        debug_str += string {header->value()};
346✔
1067
                        debug_str += "\n";
346✔
1068
                }
1069
        }
1070

1071
        logger_.Debug("Received headers:\n" + debug_str);
562✔
1072
        debug_str.clear();
1073

1074
        if (GetContentLength(*response_data_.http_response_parser_) == 0
281✔
1075
                && !response_data_.http_response_parser_->chunked()) {
281✔
1076
                auto cancelled = cancelled_;
1077
                status_ = TransactionStatus::HeaderHandlerCalled;
49✔
1078
                CallHandler(header_handler_);
98✔
1079
                if (!*cancelled) {
49✔
1080
                        status_ = TransactionStatus::Done;
44✔
1081
                        if (response_->status_code_ != StatusCode::StatusSwitchingProtocols) {
44✔
1082
                                // Make an exception for 101 Switching Protocols response, where the TCP connection
1083
                                // is meant to be reused.
1084
                                DoCancel();
40✔
1085
                        }
1086
                        CallHandler(body_handler_);
88✔
1087
                }
1088
                return;
1089
        }
1090

1091
        auto cancelled = cancelled_;
1092
        status_ = TransactionStatus::HeaderHandlerCalled;
232✔
1093
        CallHandler(header_handler_);
464✔
1094
        if (*cancelled) {
232✔
1095
                return;
1096
        }
1097

1098
        // We know that a body reader is required here, because of the check for body above.
1099
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
229✔
1100
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
36✔
1101
        }
1102
}
1103

1104
void Client::HandleSecondaryRequest() {
9✔
1105
        logger_.Debug(
18✔
1106
                "Received proxy response: "
1107
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
18✔
1108
                + string {response_data_.http_response_parser_->get().reason()});
27✔
1109

1110
        request_ = std::move(secondary_req_);
1111

1112
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
9✔
1113
                auto err = MakeError(
1114
                        ProxyError,
1115
                        "Proxy returned unexpected response: "
1116
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
1117
                                + string {response_data_.http_response_parser_->get().reason()});
4✔
1118
                CallErrorHandler(err, request_, header_handler_);
4✔
1119
                return;
1120
        }
1121

1122
        if (GetContentLength(*response_data_.http_response_parser_) != 0
7✔
1123
                || response_data_.http_response_parser_->chunked()) {
7✔
1124
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
1125
                CallErrorHandler(err, request_, header_handler_);
×
1126
                return;
1127
        }
1128

1129
        // We are connected. Now repeat the request cycle with the original request. Pretend
1130
        // we were just connected.
1131

1132
        assert(request_->GetProtocol() == "https");
1133

1134
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
1135
        // a different layer, this will get out of sync.
1136
        assert(response_data_.response_buffer_->size() == 0);
1137

1138
        switch (socket_mode_) {
7✔
1139
        case SocketMode::TlsTls:
×
1140
                // Should never get here, because this is the only place where TlsTls mode
1141
                // is supposed to be turned on.
1142
                assert(false);
1143
                CallErrorHandler(
×
1144
                        error::MakeError(
×
1145
                                error::ProgrammingError,
1146
                                "Any other mode than Tls is not valid when handling secondary request"),
1147
                        request_,
×
1148
                        header_handler_);
×
1149
                break;
×
1150
        case SocketMode::Tls:
3✔
1151
                // Upgrade to TLS inside TLS.
1152
                socket_mode_ = SocketMode::TlsTls;
3✔
1153
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
3✔
1154
                break;
3✔
1155
        case SocketMode::Plain:
4✔
1156
                // Upgrade to TLS.
1157
                socket_mode_ = SocketMode::Tls;
4✔
1158
                HandshakeHandler(
4✔
1159
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
4✔
1160
                break;
4✔
1161
        }
1162
}
1163

1164
void Client::AsyncReadNextBodyPart(
5,040✔
1165
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1166
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1167

1168
        if (status_ == TransactionStatus::ReaderCreated) {
5,040✔
1169
                status_ = TransactionStatus::BodyReadingInProgress;
210✔
1170
        }
1171

1172
        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
5,040✔
1173
                auto cancelled = cancelled_;
1174
                handler(0);
200✔
1175
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
100✔
1176
                        status_ = TransactionStatus::Done;
100✔
1177
                        DoCancel();
100✔
1178
                        CallHandler(body_handler_);
200✔
1179
                }
1180
                return;
1181
        }
1182

1183
        reader_buf_start_ = start;
4,940✔
1184
        reader_buf_end_ = end;
4,940✔
1185
        reader_handler_ = handler;
4,940✔
1186
        size_t read_size = end - start;
4,940✔
1187
        size_t smallest = min(body_buffer_.size(), read_size);
1188

1189
        response_data_.http_response_parser_->get().body().data = body_buffer_.data();
4,940✔
1190
        response_data_.http_response_parser_->get().body().size = smallest;
4,940✔
1191
        response_data_.last_buffer_size_ = smallest;
4,940✔
1192

1193
        auto &cancelled = cancelled_;
1194
        auto &response_data = response_data_;
4,940✔
1195

1196
        // Set timeout to 5 minutes to ensure we don't hang during async read
1197
        // `next_layer().next_layer()` accesses the `beast::tcp_stream` from
1198
        // `ssl::stream<ssl::stream<beast::tcp_stream>>`
1199
        stream_->next_layer().next_layer().expires_after(chrono::minutes(5));
4,940✔
1200

1201
        auto async_handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
4,939✔
1202
                if (!*cancelled) {
4,939✔
1203
                        ReadBodyHandler(ec, num_read);
4,939✔
1204
                }
1205
        };
4,940✔
1206

1207
        switch (socket_mode_) {
4,940✔
1208
        case SocketMode::TlsTls:
2✔
1209
                http::async_read_some(
2✔
1210
                        *stream_,
1211
                        *response_data_.response_buffer_,
1212
                        *response_data_.http_response_parser_,
1213
                        async_handler);
1214
                break;
1215
        case SocketMode::Tls:
4✔
1216
                http::async_read_some(
4✔
1217
                        stream_->next_layer(),
1218
                        *response_data_.response_buffer_,
1219
                        *response_data_.http_response_parser_,
1220
                        async_handler);
1221
                break;
1222
        case SocketMode::Plain:
4,934✔
1223
                http::async_read_some(
4,934✔
1224
                        stream_->next_layer().next_layer(),
1225
                        *response_data_.response_buffer_,
1226
                        *response_data_.http_response_parser_,
1227
                        async_handler);
1228
                break;
1229
        }
1230
}
4,940✔
1231

1232
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
4,939✔
1233
        if (num_read > 0) {
4,939✔
1234
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
9,676✔
1235
        }
1236

1237
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,939✔
1238
                // This can be ignored. We always reset the buffer between reads anyway.
1239
                ec = error_code();
1,958✔
1240
        }
1241

1242
        assert(reader_handler_);
1243

1244
        if (response_data_.http_response_parser_->is_done()) {
4,939✔
1245
                status_ = TransactionStatus::BodyReadingFinished;
106✔
1246
        }
1247

1248
        auto cancelled = cancelled_;
1249

1250
        if (ec) {
4,939✔
1251
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
202✔
1252
                reader_handler_(expected::unexpected(err));
303✔
1253
                if (!*cancelled) {
101✔
1254
                        CallErrorHandler(ec, request_, body_handler_);
194✔
1255
                }
1256
                return;
1257
        }
1258

1259
        // The num_read from above includes out of band payload data, such as chunk headers, which
1260
        // we are not interested in. So we need to calculate the payload size from the remaining
1261
        // buffer space.
1262
        size_t payload_read =
1263
                response_data_.last_buffer_size_ - response_data_.http_response_parser_->get().body().size;
4,838✔
1264

1265
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
4,838✔
1266
        size_t smallest = min(payload_read, buf_size);
1267

1268
        if (smallest == 0) {
4,838✔
1269
                // We read nothing, which can happen if all we read was a chunk header. We cannot
1270
                // return 0 to the handler however, because in `io::Reader` context this means
1271
                // EOF. So just repeat the request instead, until we get actual payload data.
1272
                AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
462✔
1273
        } else {
1274
                copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
4,607✔
1275
                reader_handler_(smallest);
9,214✔
1276
        }
1277
}
1278

1279
void Client::Cancel() {
258✔
1280
        auto cancelled = cancelled_;
1281

1282
        if (!*cancelled) {
258✔
1283
                auto err =
1284
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
40✔
1285
                switch (status_) {
20✔
1286
                case TransactionStatus::None:
3✔
1287
                        CallErrorHandler(err, request_, header_handler_);
3✔
1288
                        break;
3✔
1289
                case TransactionStatus::HeaderHandlerCalled:
16✔
1290
                case TransactionStatus::ReaderCreated:
1291
                case TransactionStatus::BodyReadingInProgress:
1292
                case TransactionStatus::BodyReadingFinished:
1293
                        CallErrorHandler(err, request_, body_handler_);
16✔
1294
                        break;
16✔
1295
                case TransactionStatus::Replying:
1296
                case TransactionStatus::SwitchingProtocol:
1297
                        // Not used by client.
1298
                        assert(false);
1299
                        break;
1300
                case TransactionStatus::BodyHandlerCalled:
1301
                case TransactionStatus::Done:
1302
                        break;
1303
                }
1304
        }
1305

1306
        if (!*cancelled) {
258✔
1307
                DoCancel();
1✔
1308
        }
1309
}
258✔
1310

1311
void Client::DoCancel() {
681✔
1312
        resolver_.cancel();
681✔
1313
        if (stream_) {
681✔
1314
                beast::error_code ec;
318✔
1315
                stream_->lowest_layer().cancel(ec);
318✔
1316
                stream_->lowest_layer().close(ec);
318✔
1317
                stream_.reset();
318✔
1318
        }
1319

1320
        // Reset logger to no connection.
1321
        logger_ = log::Logger(logger_name_);
681✔
1322

1323
        // Set cancel state and then make a new one. Those who are interested should have their own
1324
        // pointer to the old one.
1325
        *cancelled_ = true;
681✔
1326
        cancelled_ = make_shared<bool>(true);
681✔
1327
}
681✔
1328

1329
Stream::Stream(Server &server) :
553✔
1330
        server_ {server},
553✔
1331
        logger_ {"http"},
1,106✔
1332
        cancelled_(make_shared<bool>(true)),
553✔
1333
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
553✔
1334
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,106✔
1335
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
1,106✔
1336

1337
        // This is equivalent to:
1338
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1339
        // but compatible with Boost 1.67.
1340
        request_data_.request_buffer_->prepare(
553✔
1341
                body_buffer_.size() - request_data_.request_buffer_->size());
1342

1343
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
1,106✔
1344

1345
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1346
        // they do, they should be handled higher up in the application logic.
1347
        //
1348
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1349
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1350
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1351
        // possible value instead.
1352
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1353
}
553✔
1354

1355
Stream::~Stream() {
553✔
1356
        DoCancel();
553✔
1357
}
1,659✔
1358

1359
void Stream::Cancel() {
7✔
1360
        auto cancelled = cancelled_;
1361

1362
        if (!*cancelled) {
7✔
1363
                auto err =
1364
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1365
                switch (status_) {
7✔
1366
                case TransactionStatus::None:
×
1367
                        CallErrorHandler(err, request_, server_.header_handler_);
×
1368
                        break;
×
1369
                case TransactionStatus::HeaderHandlerCalled:
5✔
1370
                case TransactionStatus::ReaderCreated:
1371
                case TransactionStatus::BodyReadingInProgress:
1372
                case TransactionStatus::BodyReadingFinished:
1373
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1374
                        break;
5✔
1375
                case TransactionStatus::BodyHandlerCalled:
×
1376
                        // In between body handler and reply finished. No one to handle the status
1377
                        // here.
1378
                        server_.RemoveStream(shared_from_this());
×
1379
                        break;
×
1380
                case TransactionStatus::Replying:
1✔
1381
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1382
                        break;
1✔
1383
                case TransactionStatus::SwitchingProtocol:
1✔
1384
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1385
                        break;
1✔
1386
                case TransactionStatus::Done:
1387
                        break;
1388
                }
1389
        }
1390

1391
        if (!*cancelled) {
7✔
1392
                DoCancel();
×
1393
        }
1394
}
7✔
1395

1396
void Stream::DoCancel() {
999✔
1397
        if (socket_.is_open()) {
999✔
1398
                socket_.cancel();
277✔
1399
                socket_.close();
277✔
1400
        }
1401

1402
        // Set cancel state and then make a new one. Those who are interested should have their own
1403
        // pointer to the old one.
1404
        *cancelled_ = true;
999✔
1405
        cancelled_ = make_shared<bool>(true);
999✔
1406
}
999✔
1407

1408
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1409
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1410
}
×
1411

1412
void Stream::CallErrorHandler(
×
1413
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1414
        status_ = TransactionStatus::Done;
×
1415
        DoCancel();
×
1416
        handler(expected::unexpected(err.WithContext(
×
1417
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1418

1419
        server_.RemoveStream(shared_from_this());
×
1420
}
×
1421

1422
void Stream::CallErrorHandler(
2✔
1423
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1424
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1425
}
2✔
1426

1427
void Stream::CallErrorHandler(
8✔
1428
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1429
        status_ = TransactionStatus::Done;
8✔
1430
        DoCancel();
8✔
1431
        handler(
16✔
1432
                req,
1433
                err.WithContext(
8✔
1434
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
16✔
1435

1436
        server_.RemoveStream(shared_from_this());
8✔
1437
}
8✔
1438

1439
void Stream::CallErrorHandler(
4✔
1440
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1441
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1442
}
4✔
1443

1444
void Stream::CallErrorHandler(
7✔
1445
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1446
        status_ = TransactionStatus::Done;
7✔
1447
        DoCancel();
7✔
1448
        handler(err.WithContext(
7✔
1449
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1450

1451
        server_.RemoveStream(shared_from_this());
7✔
1452
}
7✔
1453

1454
void Stream::CallErrorHandler(
×
1455
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1456
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1457
}
×
1458

1459
void Stream::CallErrorHandler(
1✔
1460
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1461
        status_ = TransactionStatus::Done;
1✔
1462
        DoCancel();
1✔
1463
        handler(expected::unexpected(err.WithContext(
2✔
1464
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
2✔
1465

1466
        server_.RemoveStream(shared_from_this());
1✔
1467
}
1✔
1468

1469
void Stream::AcceptHandler(const error_code &ec) {
285✔
1470
        if (ec) {
285✔
1471
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1472
                return;
×
1473
        }
1474

1475
        auto ip = socket_.remote_endpoint().address().to_string();
285✔
1476

1477
        // Use IP as context for logging.
1478
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
570✔
1479

1480
        logger_.Debug("Accepted connection.");
285✔
1481

1482
        request_.reset(new IncomingRequest(*this, cancelled_));
855✔
1483

1484
        request_->address_.host = ip;
285✔
1485

1486
        *cancelled_ = false;
285✔
1487

1488
        ReadHeader();
285✔
1489
}
1490

1491
void Stream::ReadHeader() {
285✔
1492
        auto &cancelled = cancelled_;
1493
        auto &request_data = request_data_;
285✔
1494

1495
        http::async_read_some(
285✔
1496
                socket_,
285✔
1497
                *request_data_.request_buffer_,
1498
                *request_data_.http_request_parser_,
1499
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
855✔
1500
                        if (!*cancelled) {
285✔
1501
                                ReadHeaderHandler(ec, num_read);
285✔
1502
                        }
1503
                });
285✔
1504
}
285✔
1505

1506
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
285✔
1507
        if (num_read > 0) {
285✔
1508
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
570✔
1509
        }
1510

1511
        if (ec) {
285✔
1512
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1513
                return;
241✔
1514
        }
1515

1516
        if (!request_data_.http_request_parser_->is_header_done()) {
285✔
1517
                ReadHeader();
×
1518
                return;
×
1519
        }
1520

1521
        auto method_result = BeastVerbToMethod(
1522
                request_data_.http_request_parser_->get().base().method(),
1523
                string {request_data_.http_request_parser_->get().base().method_string()});
570✔
1524
        if (!method_result) {
285✔
1525
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1526
                return;
×
1527
        }
1528
        request_->method_ = method_result.value();
285✔
1529
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
285✔
1530

1531
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
570✔
1532

1533
        string debug_str;
1534
        for (auto header = request_data_.http_request_parser_->get().cbegin();
1535
                 header != request_data_.http_request_parser_->get().cend();
1,088✔
1536
                 header++) {
1537
                request_->headers_[string {header->name_string()}] = string {header->value()};
2,409✔
1538
                if (logger_.Level() >= log::LogLevel::Debug) {
803✔
1539
                        debug_str += string {header->name_string()};
697✔
1540
                        debug_str += ": ";
697✔
1541
                        debug_str += string {header->value()};
697✔
1542
                        debug_str += "\n";
697✔
1543
                }
1544
        }
1545

1546
        logger_.Debug("Received headers:\n" + debug_str);
570✔
1547
        debug_str.clear();
1548

1549
        if (GetContentLength(*request_data_.http_request_parser_) == 0
285✔
1550
                && !request_data_.http_request_parser_->chunked()) {
285✔
1551
                auto cancelled = cancelled_;
1552
                status_ = TransactionStatus::HeaderHandlerCalled;
240✔
1553
                server_.header_handler_(request_);
480✔
1554
                if (!*cancelled) {
240✔
1555
                        status_ = TransactionStatus::BodyHandlerCalled;
240✔
1556
                        CallBodyHandler();
240✔
1557
                }
1558
                return;
1559
        }
1560

1561
        assert(!request_data_.http_request_parser_->is_done());
1562

1563
        auto cancelled = cancelled_;
1564
        status_ = TransactionStatus::HeaderHandlerCalled;
45✔
1565
        server_.header_handler_(request_);
90✔
1566
        if (*cancelled) {
45✔
1567
                return;
1568
        }
1569

1570
        // We know that a body reader is required here, because of the check for body above.
1571
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
44✔
1572
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1573
        }
1574
}
1575

1576
void Stream::AsyncReadNextBodyPart(
2,272✔
1577
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1578
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1579

1580
        if (status_ == TransactionStatus::ReaderCreated) {
2,272✔
1581
                status_ = TransactionStatus::BodyReadingInProgress;
43✔
1582
        }
1583

1584
        if (status_ != TransactionStatus::BodyReadingInProgress) {
2,272✔
1585
                auto cancelled = cancelled_;
1586
                handler(0);
74✔
1587
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
37✔
1588
                        status_ = TransactionStatus::BodyHandlerCalled;
37✔
1589
                        CallBodyHandler();
37✔
1590
                }
1591
                return;
1592
        }
1593

1594
        reader_buf_start_ = start;
2,235✔
1595
        reader_buf_end_ = end;
2,235✔
1596
        reader_handler_ = handler;
2,235✔
1597
        size_t read_size = end - start;
2,235✔
1598
        size_t smallest = min(body_buffer_.size(), read_size);
1599

1600
        request_data_.http_request_parser_->get().body().data = body_buffer_.data();
2,235✔
1601
        request_data_.http_request_parser_->get().body().size = smallest;
2,235✔
1602
        request_data_.last_buffer_size_ = smallest;
2,235✔
1603

1604
        auto &cancelled = cancelled_;
1605
        auto &request_data = request_data_;
2,235✔
1606

1607
        http::async_read_some(
2,235✔
1608
                socket_,
2,235✔
1609
                *request_data_.request_buffer_,
1610
                *request_data_.http_request_parser_,
1611
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
6,705✔
1612
                        if (!*cancelled) {
2,235✔
1613
                                ReadBodyHandler(ec, num_read);
2,235✔
1614
                        }
1615
                });
2,235✔
1616
}
1617

1618
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
2,235✔
1619
        if (num_read > 0) {
2,235✔
1620
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
4,462✔
1621
        }
1622

1623
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,235✔
1624
                // This can be ignored. We always reset the buffer between reads anyway.
1625
                ec = error_code();
979✔
1626
        }
1627

1628
        assert(reader_handler_);
1629

1630
        if (request_data_.http_request_parser_->is_done()) {
2,235✔
1631
                status_ = TransactionStatus::BodyReadingFinished;
37✔
1632
        }
1633

1634
        auto cancelled = cancelled_;
1635

1636
        if (ec) {
2,235✔
1637
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
8✔
1638
                reader_handler_(expected::unexpected(err));
12✔
1639
                if (!*cancelled) {
4✔
1640
                        CallErrorHandler(ec, request_, server_.body_handler_);
4✔
1641
                }
1642
                return;
1643
        }
1644

1645
        // The num_read from above includes out of band payload data, such as chunk headers, which
1646
        // we are not interested in. So we need to calculate the payload size from the remaining
1647
        // buffer space.
1648
        size_t payload_read =
1649
                request_data_.last_buffer_size_ - request_data_.http_request_parser_->get().body().size;
2,231✔
1650

1651
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
2,231✔
1652
        size_t smallest = min(payload_read, buf_size);
1653

1654
        if (smallest == 0) {
2,231✔
1655
                // We read nothing, which can happen if all we read was a chunk header. We cannot
1656
                // return 0 to the handler however, because in `io::Reader` context this means
1657
                // EOF. So just repeat the request instead, until we get actual payload data.
1658
                AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
154✔
1659
        } else {
1660
                copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
2,154✔
1661
                reader_handler_(smallest);
4,308✔
1662
        }
1663
}
1664

1665
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
262✔
1666
        SetupResponse();
262✔
1667

1668
        reply_finished_handler_ = reply_finished_handler;
262✔
1669

1670
        auto &cancelled = cancelled_;
1671
        auto &response_data = response_data_;
262✔
1672

1673
        http::async_write_header(
262✔
1674
                socket_,
262✔
1675
                *response_data_.http_response_serializer_,
1676
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
786✔
1677
                        if (!*cancelled) {
262✔
1678
                                WriteHeaderHandler(ec, num_written);
261✔
1679
                        }
1680
                });
262✔
1681
}
262✔
1682

1683
void Stream::SetupResponse() {
271✔
1684
        auto response = maybe_response_.lock();
271✔
1685
        // Only called from existing responses, so this should always be true.
1686
        assert(response);
1687

1688
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1689
        status_ = TransactionStatus::Replying;
271✔
1690

1691
        // From here on we take shared ownership.
1692
        response_ = response;
1693

1694
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
542✔
1695

1696
        for (const auto &header : response->headers_) {
607✔
1697
                response_data_.http_response_->base().set(header.first, header.second);
336✔
1698
        }
1699

1700
        response_data_.http_response_->result(response->GetStatusCode());
271✔
1701
        response_data_.http_response_->reason(response->GetStatusMessage());
542✔
1702

1703
        response_data_.http_response_serializer_ =
1704
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
542✔
1705
}
271✔
1706

1707
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
261✔
1708
        if (num_written > 0) {
261✔
1709
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
522✔
1710
        }
1711

1712
        if (ec) {
261✔
1713
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1714
                return;
38✔
1715
        }
1716

1717
        auto exp_has_body =
1718
                HasBody(response_->GetHeader("Content-Length"), response_->GetHeader("Transfer-Encoding"));
783✔
1719
        if (!exp_has_body) {
261✔
1720
                CallErrorHandler(exp_has_body.error(), request_, reply_finished_handler_);
×
1721
                return;
×
1722
        }
1723
        if (!exp_has_body.value()) {
261✔
1724
                FinishReply();
37✔
1725
                return;
1726
        }
1727

1728
        if (!response_->body_reader_ && !response_->async_body_reader_) {
224✔
1729
                auto err = MakeError(BodyMissingError, "No body reader");
1✔
1730
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1731
                return;
1732
        }
1733

1734
        PrepareAndWriteNewBodyBuffer();
223✔
1735
}
1736

1737
void Stream::PrepareAndWriteNewBodyBuffer() {
2,542✔
1738
        // response_->body_reader_ XOR response_->async_body_reader_
1739
        assert(
1740
                (response_->body_reader_ || response_->async_body_reader_)
1741
                && !(response_->body_reader_ && response_->async_body_reader_));
1742

1743
        auto read_handler = [this](io::ExpectedSize read) {
2,542✔
1744
                if (!read) {
2,542✔
1745
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
2✔
1746
                        return;
1✔
1747
                }
1748
                WriteNewBodyBuffer(read.value());
2,541✔
1749
        };
2,542✔
1750

1751
        if (response_->body_reader_) {
2,542✔
1752
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
4,536✔
1753
        } else {
1754
                auto err = response_->async_body_reader_->AsyncRead(
274✔
1755
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1756
                if (err != error::NoError) {
274✔
1757
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1758
                }
1759
        }
1760
}
2,542✔
1761

1762
void Stream::WriteNewBodyBuffer(size_t size) {
2,541✔
1763
        response_data_.http_response_->body().data = body_buffer_.data();
2,541✔
1764
        response_data_.http_response_->body().size = size;
2,541✔
1765

1766
        if (size > 0) {
2,541✔
1767
                response_data_.http_response_->body().more = true;
2,367✔
1768
        } else {
1769
                response_data_.http_response_->body().more = false;
174✔
1770
        }
1771

1772
        WriteBody();
2,541✔
1773
}
2,541✔
1774

1775
void Stream::WriteBody() {
4,879✔
1776
        auto &cancelled = cancelled_;
1777
        auto &response_data = response_data_;
4,879✔
1778

1779
        http::async_write_some(
1780
                socket_,
4,879✔
1781
                *response_data_.http_response_serializer_,
1782
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
14,591✔
1783
                        if (!*cancelled) {
4,833✔
1784
                                WriteBodyHandler(ec, num_written);
4,823✔
1785
                        }
1786
                });
4,833✔
1787
}
4,879✔
1788

1789
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
4,823✔
1790
        if (num_written > 0) {
4,823✔
1791
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
4,676✔
1792
        }
1793

1794
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,823✔
1795
                // Write next body block.
1796
                PrepareAndWriteNewBodyBuffer();
2,319✔
1797
        } else if (ec) {
2,504✔
1798
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1799
        } else if (num_written > 0) {
2,500✔
1800
                // We are still writing the body.
1801
                WriteBody();
2,338✔
1802
        } else {
1803
                // We are finished.
1804
                FinishReply();
162✔
1805
        }
1806
}
4,823✔
1807

1808
void Stream::FinishReply() {
199✔
1809
        // We are done.
1810
        status_ = TransactionStatus::Done;
199✔
1811
        DoCancel();
199✔
1812
        // Release ownership of Body reader.
1813
        response_->body_reader_.reset();
199✔
1814
        response_->async_body_reader_.reset();
199✔
1815
        reply_finished_handler_(error::NoError);
199✔
1816
        server_.RemoveStream(shared_from_this());
199✔
1817
}
199✔
1818

1819
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
9✔
1820
        SetupResponse();
9✔
1821

1822
        switch_protocol_handler_ = handler;
9✔
1823
        status_ = TransactionStatus::SwitchingProtocol;
9✔
1824

1825
        auto &cancelled = cancelled_;
1826
        auto &response_data = response_data_;
9✔
1827

1828
        http::async_write_header(
9✔
1829
                socket_,
9✔
1830
                *response_data_.http_response_serializer_,
1831
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
27✔
1832
                        if (!*cancelled) {
9✔
1833
                                SwitchingProtocolHandler(ec, num_written);
8✔
1834
                        }
1835
                });
9✔
1836

1837
        return error::NoError;
9✔
1838
}
1839

1840
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
8✔
1841
        if (num_written > 0) {
8✔
1842
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
16✔
1843
        }
1844

1845
        if (ec) {
8✔
1846
                CallErrorHandler(ec, request_, switch_protocol_handler_);
×
1847
                return;
×
1848
        }
1849

1850
        auto socket = make_shared<RawSocket<tcp::socket>>(
1851
                make_shared<tcp::socket>(std::move(socket_)), request_data_.request_buffer_);
8✔
1852

1853
        auto switch_protocol_handler = switch_protocol_handler_;
8✔
1854

1855
        // Rest of the connection is done directly on the socket, set cancelled_ but don't close it.
1856
        *cancelled_ = true;
8✔
1857
        cancelled_ = make_shared<bool>(true);
8✔
1858
        server_.RemoveStream(shared_from_this());
16✔
1859

1860
        switch_protocol_handler(socket);
16✔
1861
}
1862

1863
void Stream::CallBodyHandler() {
277✔
1864
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1865
        // it immediately destroys, which would destroy this stream as well. At the end of this
1866
        // function, it's ok to destroy it.
1867
        auto stream_ref = shared_from_this();
1868

1869
        server_.body_handler_(request_, error::NoError);
831✔
1870

1871
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1872
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1873
        // request has not been handled correctly.
1874
        auto response = maybe_response_.lock();
277✔
1875
        if (!response) {
277✔
1876
                logger_.Error("Handler produced no response. Closing stream prematurely.");
6✔
1877
                *cancelled_ = true;
3✔
1878
                cancelled_ = make_shared<bool>(true);
3✔
1879
                server_.RemoveStream(shared_from_this());
9✔
1880
        }
1881
}
277✔
1882

1883
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
504✔
1884
        event_loop_ {event_loop},
252✔
1885
        acceptor_(GetAsioIoContext(event_loop_)) {
252✔
1886
}
504✔
1887

1888
Server::~Server() {
515✔
1889
        Cancel();
252✔
1890
}
515✔
1891

1892
error::Error Server::AsyncServeUrl(
263✔
1893
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
1894
        return AsyncServeUrl(
526✔
1895
                url, header_handler, [body_handler](IncomingRequestPtr req, error::Error err) {
1,311✔
1896
                        if (err != error::NoError) {
272✔
1897
                                body_handler(expected::unexpected(err));
12✔
1898
                        } else {
1899
                                body_handler(req);
532✔
1900
                        }
1901
                });
798✔
1902
}
1903

1904
error::Error Server::AsyncServeUrl(
278✔
1905
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
1906
        auto err = BreakDownUrl(url, address_);
278✔
1907
        if (error::NoError != err) {
278✔
1908
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
×
1909
        }
1910

1911
        if (address_.protocol != "http") {
278✔
1912
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
×
1913
        }
1914

1915
        if (address_.path.size() > 0 && address_.path != "/") {
278✔
1916
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
2✔
1917
        }
1918

1919
        boost::system::error_code ec;
277✔
1920
        auto address = asio::ip::make_address(address_.host, ec);
277✔
1921
        if (ec) {
277✔
1922
                return error::Error(
1923
                        ec.default_error_condition(),
×
1924
                        "Could not construct endpoint from address " + address_.host);
×
1925
        }
1926

1927
        asio::ip::tcp::endpoint endpoint(address, address_.port);
277✔
1928

1929
        ec.clear();
1930
        acceptor_.open(endpoint.protocol(), ec);
277✔
1931
        if (ec) {
277✔
1932
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
18✔
1933
        }
1934

1935
        // Allow address reuse, otherwise we can't re-bind later.
1936
        ec.clear();
1937
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
268✔
1938
        if (ec) {
268✔
1939
                return error::Error(ec.default_error_condition(), "Could not set socket options");
×
1940
        }
1941

1942
        ec.clear();
1943
        acceptor_.bind(endpoint, ec);
268✔
1944
        if (ec) {
268✔
1945
                return error::Error(ec.default_error_condition(), "Could not bind socket");
×
1946
        }
1947

1948
        ec.clear();
1949
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
268✔
1950
        if (ec) {
268✔
1951
                return error::Error(ec.default_error_condition(), "Could not start listening");
×
1952
        }
1953

1954
        header_handler_ = header_handler;
268✔
1955
        body_handler_ = body_handler;
268✔
1956

1957
        PrepareNewStream();
268✔
1958

1959
        return error::NoError;
268✔
1960
}
1961

1962
void Server::Cancel() {
323✔
1963
        if (acceptor_.is_open()) {
323✔
1964
                acceptor_.cancel();
268✔
1965
                acceptor_.close();
268✔
1966
        }
1967
        streams_.clear();
1968
}
323✔
1969

1970
uint16_t Server::GetPort() const {
17✔
1971
        return acceptor_.local_endpoint().port();
17✔
1972
}
1973

1974
string Server::GetUrl() const {
16✔
1975
        return "http://127.0.0.1:" + to_string(GetPort());
32✔
1976
}
1977

1978
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
276✔
1979
        if (*req->cancelled_) {
276✔
1980
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
×
1981
        }
1982
        OutgoingResponsePtr response {new OutgoingResponse(req->stream_, req->cancelled_)};
828✔
1983
        req->stream_.maybe_response_ = response;
276✔
1984
        return response;
276✔
1985
}
1986

1987
error::Error Server::AsyncReply(
262✔
1988
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
1989
        if (*resp->cancelled_) {
262✔
1990
                return MakeError(StreamCancelledError, "Cannot send response");
×
1991
        }
1992

1993
        resp->stream_.AsyncReply(reply_finished_handler);
262✔
1994
        return error::NoError;
262✔
1995
}
1996

1997
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
62✔
1998
        if (*req->cancelled_) {
62✔
1999
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
×
2000
        }
2001

2002
        auto &stream = req->stream_;
62✔
2003
        if (stream.status_ != TransactionStatus::HeaderHandlerCalled) {
62✔
2004
                return expected::unexpected(error::Error(
1✔
2005
                        make_error_condition(errc::operation_in_progress),
2✔
2006
                        "MakeBodyAsyncReader called while reading is in progress"));
3✔
2007
        }
2008

2009
        if (GetContentLength(*stream.request_data_.http_request_parser_) == 0
61✔
2010
                && !stream.request_data_.http_request_parser_->chunked()) {
61✔
2011
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
54✔
2012
        }
2013

2014
        stream.status_ = TransactionStatus::ReaderCreated;
43✔
2015
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
86✔
2016
}
2017

2018
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
9✔
2019
        return resp->stream_.AsyncSwitchProtocol(handler);
18✔
2020
}
2021

2022
void Server::PrepareNewStream() {
553✔
2023
        StreamPtr new_stream {new Stream(*this)};
553✔
2024
        streams_.insert(new_stream);
2025
        AsyncAccept(new_stream);
1,106✔
2026
}
553✔
2027

2028
void Server::AsyncAccept(StreamPtr stream) {
553✔
2029
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
882✔
2030
                if (ec) {
329✔
2031
                        if (ec != errc::operation_canceled) {
44✔
2032
                                log::Error("Could not accept connection: " + ec.message());
×
2033
                        }
2034
                        return;
44✔
2035
                }
2036

2037
                stream->AcceptHandler(ec);
285✔
2038

2039
                this->PrepareNewStream();
285✔
2040
        });
2041
}
553✔
2042

2043
void Server::RemoveStream(StreamPtr stream) {
231✔
2044
        streams_.erase(stream);
231✔
2045

2046
        stream->DoCancel();
231✔
2047
}
231✔
2048

2049
} // namespace http
2050
} // namespace common
2051
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc