• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 2314337760

03 Feb 2026 07:39PM UTC coverage: 75.808% (-0.1%) from 75.944%
2314337760

push

gitlab-ci

web-flow
Merge pull request #1893 from mendersoftware/cherry-5.0.x-master

[Cherry 5.0.x]: fix: add Host header to proxy HTTP CONNECT request

2 of 2 new or added lines in 1 file covered. (100.0%)

402 existing lines in 16 files now uncovered.

7508 of 9904 relevant lines covered (75.81%)

13703.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.49
/src/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio.hpp>
20
#include <boost/asio/ip/tcp.hpp>
21
#include <boost/asio/ssl/host_name_verification.hpp>
22
#include <boost/asio/ssl/verify_mode.hpp>
23

24
#include <common/common.hpp>
25
#include <common/crypto.hpp>
26

27
namespace mender {
28
namespace common {
29
namespace http {
30

31
namespace common = mender::common;
32
namespace crypto = mender::common::crypto;
33

34
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
35
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
36
const unsigned int BeastHttpVersion = 11;
37

38
namespace asio = boost::asio;
39
namespace http = boost::beast::http;
40

41
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
42

43
static http::verb MethodToBeastVerb(Method method) {
249✔
44
        switch (method) {
249✔
45
        case Method::GET:
46
                return http::verb::get;
47
        case Method::HEAD:
48
                return http::verb::head;
49
        case Method::POST:
50
                return http::verb::post;
51
        case Method::PUT:
52
                return http::verb::put;
53
        case Method::PATCH:
54
                return http::verb::patch;
55
        case Method::CONNECT:
56
                return http::verb::connect;
57
        case Method::Invalid:
58
                // Fallthrough to end (no-op).
59
                break;
60
        }
61
        // Don't use "default" case. This should generate a warning if we ever add any methods. But
62
        // still assert here for safety.
63
        assert(false);
64
        return http::verb::get;
65
}
66

67
static expected::expected<Method, error::Error> BeastVerbToMethod(
228✔
68
        http::verb verb, const string &verb_string) {
69
        switch (verb) {
228✔
70
        case http::verb::get:
192✔
71
                return Method::GET;
72
        case http::verb::head:
×
73
                return Method::HEAD;
74
        case http::verb::post:
13✔
75
                return Method::POST;
76
        case http::verb::put:
23✔
77
                return Method::PUT;
78
        case http::verb::patch:
×
79
                return Method::PATCH;
80
        case http::verb::connect:
×
81
                return Method::CONNECT;
82
        default:
×
83
                return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
×
84
        }
85
}
86

87
template <typename StreamType>
88
class BodyAsyncReader : virtual public io::AsyncReader {
89
public:
90
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
156✔
91
                stream_ {stream},
92
                cancelled_ {cancelled} {
312✔
93
        }
156✔
94
        ~BodyAsyncReader() {
39✔
95
                Cancel();
39✔
96
        }
78✔
97

98
        error::Error AsyncRead(
2,187✔
99
                vector<uint8_t>::iterator start,
100
                vector<uint8_t>::iterator end,
101
                io::AsyncIoHandler handler) override {
102
                if (eof_) {
2,187✔
103
                        handler(0);
×
104
                        return error::NoError;
×
105
                }
106

107
                if (*cancelled_) {
2,187✔
108
                        return error::MakeError(
×
109
                                error::ProgrammingError,
110
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
111
                }
112
                stream_.AsyncReadNextBodyPart(start, end, [this, handler](io::ExpectedSize size) {
13,726✔
113
                        if (size && size.value() == 0) {
6,618✔
114
                                eof_ = true;
128✔
115
                        }
116
                        handler(size);
13,236✔
117
                });
118
                return error::NoError;
2,187✔
119
        }
120

121
        void Cancel() override {
39✔
122
                if (!*cancelled_) {
39✔
123
                        stream_.Cancel();
4✔
124
                }
125
        }
39✔
126

127
private:
128
        StreamType &stream_;
129
        shared_ptr<bool> cancelled_;
130
        bool eof_ {false};
131

132
        friend class Client;
133
        friend class Server;
134
};
135

136
template <typename StreamType>
137
class RawSocket : virtual public io::AsyncReadWriter {
138
public:
139
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
140
                destroying_ {make_shared<bool>(false)},
×
141
                stream_ {stream},
142
                buffered_ {buffered} {
×
143
                // If there are no buffered bytes, then we don't need it.
144
                if (buffered_ && buffered_->size() == 0) {
×
145
                        buffered_.reset();
×
146
                }
147
        }
×
148

149
        ~RawSocket() {
8✔
150
                *destroying_ = true;
8✔
151
                Cancel();
8✔
152
        }
16✔
153

154
        error::Error AsyncRead(
159✔
155
                vector<uint8_t>::iterator start,
156
                vector<uint8_t>::iterator end,
157
                io::AsyncIoHandler handler) override {
158
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
159
                // header and parts of the body in one block, return those first.
160
                if (buffered_) {
159✔
UNCOV
161
                        return DrainPrebufferedData(start, end, handler);
×
162
                }
163

164
                read_buffer_ = asio::buffer(&*start, end - start);
159✔
165
                auto &destroying = destroying_;
166
                stream_->async_read_some(
318✔
167
                        read_buffer_,
159✔
168
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
157✔
169
                                if (*destroying) {
313✔
170
                                        return;
171
                                }
172

173
                                if (ec == asio::error::operation_aborted) {
313✔
174
                                        handler(expected::unexpected(error::Error(
12✔
175
                                                make_error_condition(errc::operation_canceled),
6✔
176
                                                "Could not read from socket")));
177
                                } else if (ec) {
310✔
178
                                        handler(expected::unexpected(
12✔
179
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
180
                                } else {
181
                                        handler(num_read);
608✔
182
                                }
183
                        });
184
                return error::NoError;
159✔
185
        }
186

187
        error::Error AsyncWrite(
156✔
188
                vector<uint8_t>::const_iterator start,
189
                vector<uint8_t>::const_iterator end,
190
                io::AsyncIoHandler handler) override {
191
                write_buffer_ = asio::buffer(&*start, end - start);
156✔
192
                auto &destroying = destroying_;
193
                stream_->async_write_some(
312✔
194
                        write_buffer_,
156✔
195
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
154✔
196
                                if (*destroying) {
306✔
197
                                        return;
198
                                }
199

200
                                if (ec == asio::error::operation_aborted) {
306✔
201
                                        handler(expected::unexpected(error::Error(
×
202
                                                make_error_condition(errc::operation_canceled),
×
203
                                                "Could not write to socket")));
204
                                } else if (ec) {
306✔
205
                                        handler(expected::unexpected(
×
206
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
207
                                } else {
208
                                        handler(num_written);
612✔
209
                                }
210
                        });
211
                return error::NoError;
156✔
212
        }
213

214
        void Cancel() override {
15✔
215
                // close() is sufficient:
216
                // https://www.boost.org/doc/libs/latest/doc/html/boost_asio/reference/basic_stream_socket/close/overload1.html
217
                beast::close_socket(beast::get_lowest_layer(*stream_));
218
        }
15✔
219

220
private:
UNCOV
221
        error::Error DrainPrebufferedData(
×
222
                vector<uint8_t>::iterator start,
223
                vector<uint8_t>::iterator end,
224
                io::AsyncIoHandler handler) {
UNCOV
225
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
×
226

227
                // These two lines are equivalent to:
228
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
229
                // but compatible with Boost 1.67.
230
                const beast::flat_buffer &cbuffered = *buffered_;
UNCOV
231
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
×
UNCOV
232
                buffered_->consume(to_copy);
×
UNCOV
233
                if (buffered_->size() == 0) {
×
234
                        // We don't need it anymore.
UNCOV
235
                        buffered_.reset();
×
236
                }
UNCOV
237
                handler(to_copy);
×
UNCOV
238
                return error::NoError;
×
239
        }
240

241
        shared_ptr<bool> destroying_;
242
        shared_ptr<StreamType> stream_;
243
        shared_ptr<beast::flat_buffer> buffered_;
244
        asio::mutable_buffer read_buffer_;
245
        asio::const_buffer write_buffer_;
246
};
247

248
template <typename PARSER>
249
int64_t GetContentLength(const PARSER &parser) {
404✔
250
        auto content_length = parser.content_length();
404✔
251
        if (content_length) {
404✔
252
                return content_length.value();
353✔
253
        } else {
254
                return 0;
255
        }
256
}
257

258
expected::ExpectedBool HasBody(
453✔
259
        const expected::ExpectedString &content_length,
260
        const expected::ExpectedString &transfer_encoding) {
261
        if (transfer_encoding) {
453✔
262
                if (transfer_encoding.value() != "chunked") {
4✔
UNCOV
263
                        return expected::unexpected(error::Error(
×
264
                                make_error_condition(errc::not_supported),
×
265
                                "Unsupported Transfer-Encoding: " + transfer_encoding.value()));
×
266
                }
267
                return true;
268
        }
269

270
        if (content_length) {
449✔
271
                auto length = common::StringToLongLong(content_length.value());
220✔
272
                if (!length || length.value() < 0) {
220✔
UNCOV
273
                        return expected::unexpected(error::Error(
×
274
                                length.error().code,
UNCOV
275
                                "Content-Length contains invalid number: " + content_length.value()));
×
276
                }
277
                return length.value() > 0;
220✔
278
        }
279

280
        return false;
281
}
282

283
Client::Client(
358✔
284
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
285
        event_loop_ {event_loop},
286
        logger_name_ {logger_name},
287
        client_config_ {client},
288
        http_proxy_ {client.http_proxy},
358✔
289
        https_proxy_ {client.https_proxy},
358✔
290
        no_proxy_ {client.no_proxy},
358✔
UNCOV
291
        cancelled_ {make_shared<bool>(true)},
×
292
        resolver_(GetAsioIoContext(event_loop)),
293
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,432✔
294
}
358✔
295

296
Client::~Client() {
2,148✔
297
        if (!*cancelled_) {
358✔
298
                logger_.Warning("Client destroyed while request is still active!");
30✔
299
        }
300
        DoCancel();
358✔
301
}
358✔
302

303
error::Error Client::Initialize() {
287✔
304
        if (initialized_) {
287✔
305
                return error::NoError;
72✔
306
        }
307

308
        for (auto i = 0; i < MENDER_BOOST_BEAST_SSL_CTX_COUNT; i++) {
637✔
309
                ssl_ctx_[i].set_verify_mode(
850✔
310
                        client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer);
311

312
                beast::error_code ec {};
426✔
313
                if (client_config_.client_cert_path != "" and client_config_.client_cert_key_path != "") {
426✔
314
                        ssl_ctx_[i].set_options(boost::asio::ssl::context::default_workarounds);
4✔
315
                        ssl_ctx_[i].use_certificate_file(
316
                                client_config_.client_cert_path, boost::asio::ssl::context_base::pem, ec);
4✔
317
                        if (ec) {
4✔
318
                                return error::Error(
319
                                        ec.default_error_condition(), "Could not load client certificate");
2✔
320
                        }
321
                        auto exp_key = crypto::PrivateKey::Load(
322
                                {client_config_.client_cert_key_path, "", client_config_.ssl_engine});
6✔
323
                        if (!exp_key) {
3✔
324
                                return exp_key.error().WithContext(
325
                                        "Error loading private key from " + client_config_.client_cert_key_path);
2✔
326
                        }
327

328
                        const int ret =
329
                                SSL_CTX_use_PrivateKey(ssl_ctx_[i].native_handle(), exp_key.value().Get());
2✔
330
                        if (ret != 1) {
2✔
331
                                return MakeError(
332
                                        HTTPInitError,
UNCOV
333
                                        "Failed to add the PrivateKey: " + client_config_.client_cert_key_path
×
UNCOV
334
                                                + " to the SSL CTX");
×
335
                        }
336
                } else if (
337
                        client_config_.client_cert_path != "" or client_config_.client_cert_key_path != "") {
422✔
338
                        return error::Error(
339
                                make_error_condition(errc::invalid_argument),
4✔
340
                                "Cannot set only one of client certificate, and client certificate private key");
4✔
341
                }
342

343
                ssl_ctx_[i].set_default_verify_paths(ec); // Load the default CAs
422✔
344
                if (ec) {
422✔
345
                        auto err = error::Error(
UNCOV
346
                                ec.default_error_condition(), "Failed to load the SSL default directory");
×
UNCOV
347
                        if (client_config_.server_cert_path == "") {
×
348
                                // We aren't going to have any valid certificates then.
349
                                return err;
×
350
                        } else {
351
                                // We have a dedicated certificate, so this is not fatal.
UNCOV
352
                                log::Info(err.String());
×
353
                        }
354
                }
355
                if (client_config_.server_cert_path != "") {
422✔
356
                        ssl_ctx_[i].load_verify_file(client_config_.server_cert_path, ec);
52✔
357
                        if (ec) {
52✔
358
                                log::Warning("Failed to load the server certificate! Falling back to the CA store");
4✔
359
                        }
360
                }
361
        }
362

363
        initialized_ = true;
211✔
364

365
        return error::NoError;
211✔
366
}
367

368
// Create the HOST header according to:
369
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.23
370
// In short: Add the port-number if it is non-standard HTTP
371
static string CreateHOSTAddress(OutgoingRequestPtr req) {
288✔
372
        if (req->GetPort() == 80 || req->GetPort() == 443) {
288✔
373
                return req->GetHost();
4✔
374
        }
375
        return req->GetHost() + ":" + to_string(req->GetPort());
568✔
376
}
377

378
error::Error Client::AsyncCall(
287✔
379
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
380
        auto err = Initialize();
287✔
381
        if (err != error::NoError) {
287✔
382
                return err;
4✔
383
        }
384

385
        if (!*cancelled_ && status_ != TransactionStatus::Done) {
283✔
386
                return error::Error(
UNCOV
387
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
×
388
        }
389

390
        if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
283✔
391
                return error::MakeError(error::ProgrammingError, "Request is not ready");
4✔
392
        }
393

394
        if (!header_handler || !body_handler) {
281✔
395
                return error::MakeError(
396
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
2✔
397
        }
398

399
        if (req->address_.protocol != "http" && req->address_.protocol != "https") {
280✔
400
                return error::Error(
401
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
2✔
402
        }
403

404
        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));
279✔
405

406
        request_ = req;
407

408
        err = HandleProxySetup();
279✔
409
        if (err != error::NoError) {
279✔
410
                return err;
4✔
411
        }
412

413
        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
414
        // request to route to our k8s cluster. Set this in all cases.
415
        const string header_url = CreateHOSTAddress(req);
550✔
416
        req->SetHeader("HOST", header_url);
550✔
417

418
        log::Trace("Setting HOST address: " + header_url);
275✔
419

420
        header_handler_ = header_handler;
275✔
421
        body_handler_ = body_handler;
275✔
422
        status_ = TransactionStatus::None;
275✔
423

424
        cancelled_ = make_shared<bool>(false);
275✔
425

426
        auto &cancelled = cancelled_;
427

428
        resolver_.async_resolve(
550✔
429
                request_->address_.host,
430
                to_string(request_->address_.port),
550✔
431
                [this, cancelled](
546✔
432
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
270✔
433
                        if (!*cancelled) {
271✔
434
                                ResolveHandler(ec, results);
270✔
435
                        }
436
                });
271✔
437

438
        return error::NoError;
275✔
439
}
440

441
static inline error::Error AddProxyAuthHeader(OutgoingRequest &req, BrokenDownUrl &proxy_address) {
22✔
442
        if (proxy_address.username == "") {
22✔
443
                // nothing to do
444
                return error::NoError;
19✔
445
        }
446
        auto ex_dec_username = URLDecode(proxy_address.username);
3✔
447
        auto ex_dec_password = URLDecode(proxy_address.password);
3✔
448
        if (!ex_dec_username) {
3✔
UNCOV
449
                return ex_dec_username.error();
×
450
        }
451
        if (!ex_dec_password) {
3✔
UNCOV
452
                return ex_dec_password.error();
×
453
        }
454
        auto creds = ex_dec_username.value() + ":" + ex_dec_password.value();
3✔
455
        auto ex_encoded_creds = crypto::EncodeBase64(common::ByteVectorFromString(creds));
6✔
456
        if (!ex_encoded_creds) {
3✔
UNCOV
457
                return ex_encoded_creds.error();
×
458
        }
459
        req.SetHeader("Proxy-Authorization", "Basic " + ex_encoded_creds.value());
6✔
460

461
        return error::NoError;
3✔
462
}
463

464
error::Error Client::HandleProxySetup() {
279✔
465
        secondary_req_.reset();
279✔
466

467
        if (request_->address_.protocol == "http") {
279✔
468
                socket_mode_ = SocketMode::Plain;
253✔
469

470
                if (http_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
253✔
471
                        // Make a modified proxy request.
472
                        BrokenDownUrl proxy_address;
20✔
473
                        auto err = BreakDownUrl(http_proxy_, proxy_address, true);
11✔
474
                        if (err != error::NoError) {
11✔
475
                                return err.WithContext("HTTP proxy URL is invalid");
2✔
476
                        }
477
                        if (proxy_address.path != "" && proxy_address.path != "/") {
10✔
478
                                return MakeError(
479
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
480
                        }
481

482
                        request_->address_.path = request_->address_.protocol + "://" + request_->address_.host
18✔
483
                                                                          + ":" + to_string(request_->address_.port)
27✔
484
                                                                          + request_->address_.path;
27✔
485
                        request_->address_.host = proxy_address.host;
9✔
486
                        request_->address_.port = proxy_address.port;
9✔
487
                        request_->address_.protocol = proxy_address.protocol;
9✔
488

489
                        err = AddProxyAuthHeader(*request_, proxy_address);
9✔
490
                        if (err != error::NoError) {
9✔
UNCOV
491
                                return err;
×
492
                        }
493

494
                        if (proxy_address.protocol == "https") {
9✔
495
                                socket_mode_ = SocketMode::Tls;
5✔
496
                        } else if (proxy_address.protocol == "http") {
4✔
497
                                socket_mode_ = SocketMode::Plain;
4✔
498
                        } else {
499
                                // Should never get here.
500
                                assert(false);
501
                        }
502
                }
503
        } else if (request_->address_.protocol == "https") {
26✔
504
                socket_mode_ = SocketMode::Tls;
26✔
505

506
                if (https_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
26✔
507
                        // Save the original request for later, so that we can make a new request
508
                        // over the channel established by CONNECT.
509
                        secondary_req_ = std::move(request_);
510

511
                        request_ = make_shared<OutgoingRequest>();
30✔
512
                        request_->SetMethod(Method::CONNECT);
15✔
513
                        BrokenDownUrl proxy_address;
28✔
514
                        auto err = BreakDownUrl(https_proxy_, proxy_address, true);
15✔
515
                        if (err != error::NoError) {
15✔
516
                                return err.WithContext("HTTPS proxy URL is invalid");
2✔
517
                        }
518
                        if (proxy_address.path != "" && proxy_address.path != "/") {
14✔
519
                                return MakeError(
520
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
521
                        }
522

523
                        request_->address_.path =
524
                                secondary_req_->address_.host + ":" + to_string(secondary_req_->address_.port);
26✔
525
                        request_->address_.host = proxy_address.host;
13✔
526
                        request_->address_.port = proxy_address.port;
13✔
527
                        request_->address_.protocol = proxy_address.protocol;
13✔
528

529
                        // Set Host header for CONNECT request - required by some proxies (RFC 7230)
530
                        const string header_url = CreateHOSTAddress(request_);
26✔
531
                        request_->SetHeader("HOST", header_url);
26✔
532

533
                        err = AddProxyAuthHeader(*request_, proxy_address);
13✔
534
                        if (err != error::NoError) {
13✔
UNCOV
535
                                return err;
×
536
                        }
537

538
                        if (proxy_address.protocol == "https") {
13✔
539
                                socket_mode_ = SocketMode::Tls;
7✔
540
                        } else if (proxy_address.protocol == "http") {
6✔
541
                                socket_mode_ = SocketMode::Plain;
6✔
542
                        } else {
543
                                // Should never get here.
544
                                assert(false);
545
                        }
546
                }
547
        } else {
548
                // Should never get here
549
                assert(false);
550
        }
551

552
        return error::NoError;
275✔
553
}
554

555
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
175✔
556
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
175✔
557
                return expected::unexpected(error::Error(
2✔
558
                        make_error_condition(errc::operation_in_progress),
4✔
559
                        "MakeBodyAsyncReader called while reading is in progress"));
6✔
560
        }
561

562
        if (GetContentLength(*response_data_.http_response_parser_) == 0
173✔
563
                && !response_data_.http_response_parser_->chunked()) {
173✔
564
                return expected::unexpected(
17✔
565
                        MakeError(BodyMissingError, "Response does not contain a body"));
51✔
566
        }
567

568
        status_ = TransactionStatus::ReaderCreated;
156✔
569
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
312✔
570
}
571

572
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
7✔
573
        if (*cancelled_) {
7✔
UNCOV
574
                return expected::unexpected(error::Error(
×
UNCOV
575
                        make_error_condition(errc::not_connected),
×
576
                        "Cannot switch protocols if endpoint is not connected"));
×
577
        }
578

579
        // Rest of the connection is done directly on the socket, we are done here.
580
        status_ = TransactionStatus::Done;
7✔
581
        *cancelled_ = true;
7✔
582
        cancelled_ = make_shared<bool>(false);
14✔
583

584
        auto stream = stream_;
585
        // This no longer belongs to us.
586
        stream_.reset();
7✔
587

588
        switch (socket_mode_) {
7✔
UNCOV
589
        case SocketMode::TlsTls:
×
UNCOV
590
                return make_shared<RawSocket<ssl::stream<ssl::stream<beast::tcp_stream>>>>(
×
591
                        stream, response_data_.response_buffer_);
×
592
        case SocketMode::Tls:
×
593
                return make_shared<RawSocket<ssl::stream<beast::tcp_stream>>>(
×
594
                        make_shared<ssl::stream<beast::tcp_stream>>(std::move(stream->next_layer())),
×
595
                        response_data_.response_buffer_);
×
596
        case SocketMode::Plain:
7✔
597
                return make_shared<RawSocket<beast::tcp_stream>>(
7✔
598
                        make_shared<beast::tcp_stream>(std::move(stream->next_layer().next_layer())),
14✔
599
                        response_data_.response_buffer_);
7✔
600
        }
601

UNCOV
602
        AssertOrReturnUnexpected(false);
×
603
        // This should not happen. It's here to silence compiler warnings.
604
        return expected::unexpected(MakeError(error::ProgrammingError, "Invalid socket mode"));
605
}
606

607
void Client::CallHandler(ResponseHandler handler) {
362✔
608
        // This function exists to make sure we have a copy of the handler we're calling (in the
609
        // argument list). This is important in case the handler owns the client instance through a
610
        // capture, and it replaces the handler with a different one (using `AsyncCall`). If it
611
        // does, then it destroys the final copy of the handler, and therefore also the client,
612
        // which is why we need to make a copy here, before calling it.
613
        handler(response_);
362✔
614
}
362✔
615

616
void Client::CallErrorHandler(
83✔
617
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
618
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
249✔
619
}
83✔
620

621
void Client::CallErrorHandler(
125✔
622
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
623
        status_ = TransactionStatus::Done;
125✔
624
        DoCancel();
125✔
625
        handler(expected::unexpected(
250✔
626
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
500✔
627
}
125✔
628

629
void Client::ResolveHandler(
270✔
630
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
631
        if (ec) {
270✔
632
                CallErrorHandler(ec, request_, header_handler_);
×
633
                return;
×
634
        }
635

636
        if (logger_.Level() >= log::LogLevel::Debug) {
270✔
637
                string ips = "[";
248✔
638
                string sep;
639
                for (auto r : results) {
1,040✔
640
                        ips += sep;
272✔
641
                        ips += r.endpoint().address().to_string();
272✔
642
                        sep = ", ";
272✔
643
                }
644
                ips += "]";
248✔
645
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
496✔
646
        }
647

648
        resolver_results_ = results;
649

650
        stream_ = make_shared<ssl::stream<ssl::stream<beast::tcp_stream>>>(
270✔
651
                ssl::stream<beast::tcp_stream>(
270✔
652
                        beast::tcp_stream(GetAsioIoContext(event_loop_)), ssl_ctx_[0]),
540✔
653
                ssl_ctx_[1]);
540✔
654

655
        if (response_data_.response_buffer_) {
270✔
656
                // We can reuse this if preexisting, just make sure we start with a
657
                // clean state (while avoiding shrinking/discarding the buffer, see
658
                // https://www.boost.org/doc/libs/1_70_0/libs/beast/doc/html/beast/ref/boost__beast__basic_flat_buffer/clear.html
659
                // for details).
660
                // Since there should be no leftover bytes from previous responses, we
661
                // log if there are some, but let's not bother all users with a warning,
662
                // there is nothing they could do about it. However, for
663
                // testing/debugging/CI, it can be useful to have this information.
664
                if (response_data_.response_buffer_->size() > 0) {
70✔
665
                        logger_.Debug(
1✔
666
                                "Leftover data from the previous response! ("
667
                                + to_string(response_data_.response_buffer_->size()) + " bytes)");
2✔
668
                }
669
                response_data_.response_buffer_->clear();
670
        } else {
671
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
400✔
672

673
                // This is equivalent to:
674
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
675
                // but compatible with Boost 1.67.
676
                response_data_.response_buffer_->prepare(
677
                        body_buffer_.size() - response_data_.response_buffer_->size());
200✔
678
        }
679

680
        auto &cancelled = cancelled_;
681

682
        // Set timeout to 5 minutes to ensure we don't hang during async connect
683
        // `next_layer().next_layer()` accesses the `beast::tcp_stream` from
684
        // `ssl::stream<ssl::stream<beast::tcp_stream>>`
685
        stream_->next_layer().next_layer().expires_after(chrono::minutes(5));
270✔
686

687
        asio::async_connect(
270✔
688
                stream_->lowest_layer(),
689
                resolver_results_,
270✔
690
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
540✔
691
                        if (!*cancelled) {
270✔
692
                                switch (socket_mode_) {
270✔
693
                                case SocketMode::TlsTls:
×
694
                                        // Should never happen because we always need to handshake
695
                                        // the innermost Tls first, then the outermost, but the
696
                                        // latter doesn't happen here.
697
                                        assert(false);
UNCOV
698
                                        CallErrorHandler(
×
UNCOV
699
                                                error::MakeError(
×
UNCOV
700
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
×
UNCOV
701
                                                request_,
×
UNCOV
702
                                                header_handler_);
×
703
                                case SocketMode::Tls:
21✔
704
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
21✔
705
                                case SocketMode::Plain:
249✔
706
                                        return ConnectHandler(ec, endpoint);
249✔
707
                                }
708
                        }
709
                });
710
}
711

712
template <typename StreamType>
713
void Client::HandshakeHandler(
25✔
714
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
715
        if (ec) {
25✔
716
                CallErrorHandler(ec, request_, header_handler_);
2✔
717
                return;
2✔
718
        }
719

720
        // Enable TCP keepalive
721
        boost::asio::socket_base::keep_alive option(true);
722
        stream_->lowest_layer().set_option(option);
23✔
723

724
        // We can't avoid a C style cast on this next line. The usual method by which system headers
725
        // are excluded from warnings doesn't work, because `SSL_set_tlsext_host_name` is a macro,
726
        // containing a cast, which expands here, not in the original file. So just disable the
727
        // warning here.
728
#ifdef __clang__
729
#pragma clang diagnostic push
730
#pragma clang diagnostic ignored "-Wold-style-cast"
731
#else
732
#pragma GCC diagnostic push
733
#pragma GCC diagnostic ignored "-Wold-style-cast"
734
#endif
735
        // Set SNI Hostname (many hosts need this to handshake successfully)
736
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
23✔
737
#ifdef __clang__
738
#pragma clang diagnostic pop
739
#else
740
#pragma GCC diagnostic pop
741
#endif
UNCOV
742
                beast::error_code ec2 {
×
UNCOV
743
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
UNCOV
744
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
745
        }
746

747
        // Enable host name verification (not done automatically and we don't have
748
        // enough access to the TLS internals to use X509_VERIFY_PARAM_set1_host(),
749
        // hence the callback that boost provides).
750
        boost::system::error_code b_ec;
23✔
751
        stream.set_verify_callback(ssl::host_name_verification(request_->address_.host), b_ec);
46✔
752
        if (b_ec) {
23✔
UNCOV
753
                logger_.Error("Failed to enable host name verification: " + b_ec.message());
×
UNCOV
754
                CallErrorHandler(b_ec, request_, header_handler_);
×
UNCOV
755
                return;
×
756
        }
757

758
        auto &cancelled = cancelled_;
759

760
        stream.async_handshake(
46✔
761
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
23✔
762
                        if (*cancelled) {
26✔
763
                                return;
764
                        }
765
                        if (ec) {
26✔
766
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
20✔
767
                                CallErrorHandler(ec, request_, header_handler_);
10✔
768
                                return;
10✔
769
                        }
770
                        logger_.Debug("https: Successful SSL handshake");
32✔
771
                        ConnectHandler(ec, endpoint);
16✔
772
                });
773
}
774

775

776
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
265✔
777
        if (ec) {
265✔
778
                CallErrorHandler(ec, request_, header_handler_);
16✔
779
                return;
16✔
780
        }
781

782
        // Enable TCP keepalive
783
        boost::asio::socket_base::keep_alive option(true);
784
        stream_->lowest_layer().set_option(option);
249✔
785

786
        logger_.Debug("Connected to " + endpoint.address().to_string());
498✔
787

788
        request_data_.http_request_ = make_shared<http::request<http::buffer_body>>(
249✔
789
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
498✔
790

791
        for (const auto &header : request_->headers_) {
661✔
792
                request_data_.http_request_->set(header.first, header.second);
412✔
793
        }
794

795
        request_data_.http_request_serializer_ =
796
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);
249✔
797

798
        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
498✔
799

800
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
801
        // if they do, they should be handled higher up in the application logic.
802
        //
803
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
804
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
805
        // `has_value()` in their code, causing their subsequent comparison operation to
806
        // misbehave. So pass highest possible value instead.
807
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
808

809
        auto &cancelled = cancelled_;
810
        auto &request_data = request_data_;
249✔
811

812
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
249✔
813
                if (!*cancelled) {
249✔
814
                        WriteHeaderHandler(ec, num_written);
249✔
815
                }
816
        };
498✔
817

818
        // Set timeout to 5 minutes to ensure we don't hang during async write
819
        // `next_layer().next_layer()` accesses the `beast::tcp_stream` from
820
        // `ssl::stream<ssl::stream<beast::tcp_stream>>`
821
        stream_->next_layer().next_layer().expires_after(chrono::minutes(5));
249✔
822

823
        switch (socket_mode_) {
249✔
824
        case SocketMode::TlsTls:
2✔
825
                http::async_write_header(*stream_, *request_data_.http_request_serializer_, handler);
2✔
826
                break;
827
        case SocketMode::Tls:
14✔
828
                http::async_write_header(
14✔
829
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
830
                break;
831
        case SocketMode::Plain:
233✔
832
                http::async_write_header(
233✔
833
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
834
                break;
835
        }
836
}
837

838
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
249✔
839
        if (num_written > 0) {
249✔
840
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
498✔
841
        }
842

843
        if (ec) {
249✔
UNCOV
844
                CallErrorHandler(ec, request_, header_handler_);
×
845
                return;
209✔
846
        }
847

848
        auto exp_has_body =
849
                HasBody(request_->GetHeader("Content-Length"), request_->GetHeader("Transfer-Encoding"));
498✔
850
        if (!exp_has_body) {
249✔
UNCOV
851
                CallErrorHandler(exp_has_body.error(), request_, header_handler_);
×
UNCOV
852
                return;
×
853
        }
854
        if (!exp_has_body.value()) {
249✔
855
                ReadHeader();
208✔
856
                return;
857
        }
858

859
        if (!request_->body_gen_ && !request_->async_body_gen_) {
41✔
860
                auto err = MakeError(BodyMissingError, "No body generator");
2✔
861
                CallErrorHandler(err, request_, header_handler_);
2✔
862
                return;
863
        }
864

865
        assert(!(request_->body_gen_ && request_->async_body_gen_));
866

867
        if (request_->body_gen_) {
40✔
868
                auto body_reader = request_->body_gen_();
34✔
869
                if (!body_reader) {
34✔
UNCOV
870
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
871
                        return;
872
                }
873
                request_->body_reader_ = body_reader.value();
34✔
874
        } else {
875
                auto body_reader = request_->async_body_gen_();
6✔
876
                if (!body_reader) {
6✔
UNCOV
877
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
878
                        return;
879
                }
880
                request_->async_body_reader_ = body_reader.value();
6✔
881
        }
882

883
        PrepareAndWriteNewBodyBuffer();
40✔
884
}
885

886
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,280✔
887
        if (num_written > 0) {
2,280✔
888
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,244✔
889
        }
890

891
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,280✔
892
                // Write next block of the body.
893
                PrepareAndWriteNewBodyBuffer();
1,121✔
894
        } else if (ec) {
1,159✔
895
                CallErrorHandler(ec, request_, header_handler_);
8✔
896
        } else if (num_written > 0) {
1,155✔
897
                // We are still writing the body.
898
                WriteBody();
1,122✔
899
        } else {
900
                // We are ready to receive the response.
901
                ReadHeader();
33✔
902
        }
903
}
2,280✔
904

905
void Client::PrepareAndWriteNewBodyBuffer() {
1,161✔
906
        // request_->body_reader_ XOR request_->async_body_reader_
907
        assert(
908
                (request_->body_reader_ || request_->async_body_reader_)
909
                && !(request_->body_reader_ && request_->async_body_reader_));
910

911
        auto cancelled = cancelled_;
912
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,594✔
913
                if (!*cancelled) {
1,161✔
914
                        if (!read) {
1,160✔
915
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
916
                                return;
2✔
917
                        }
918
                        WriteNewBodyBuffer(read.value());
1,158✔
919
                }
920
        };
1,161✔
921

922

923
        if (request_->body_reader_) {
1,161✔
924
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,472✔
925
        } else {
926
                auto err = request_->async_body_reader_->AsyncRead(
927
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
928
                if (err != error::NoError) {
425✔
UNCOV
929
                        CallErrorHandler(err, request_, header_handler_);
×
930
                }
931
        }
932
}
1,161✔
933

934
void Client::WriteNewBodyBuffer(size_t size) {
1,158✔
935
        request_data_.http_request_->body().data = body_buffer_.data();
1,158✔
936
        request_data_.http_request_->body().size = size;
1,158✔
937

938
        if (size > 0) {
1,158✔
939
                request_data_.http_request_->body().more = true;
1,125✔
940
        } else {
941
                // Release ownership of Body reader.
942
                request_->body_reader_.reset();
33✔
943
                request_->async_body_reader_.reset();
33✔
944
                request_data_.http_request_->body().more = false;
33✔
945
        }
946

947
        WriteBody();
1,158✔
948
}
1,158✔
949

950
void Client::WriteBody() {
2,280✔
951
        auto &cancelled = cancelled_;
952
        auto &request_data = request_data_;
2,280✔
953

954
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,280✔
955
                if (!*cancelled) {
2,280✔
956
                        WriteBodyHandler(ec, num_written);
2,280✔
957
                }
958
        };
4,560✔
959

960
        // Set timeout to 5 minutes to ensure we don't hang during async write
961
        // `next_layer().next_layer()` accesses the `beast::tcp_stream` from
962
        // `ssl::stream<ssl::stream<beast::tcp_stream>>`
963
        stream_->next_layer().next_layer().expires_after(chrono::minutes(5));
2,280✔
964

965
        switch (socket_mode_) {
2,280✔
UNCOV
966
        case SocketMode::TlsTls:
×
967
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
968
                break;
UNCOV
969
        case SocketMode::Tls:
×
970
                http::async_write_some(
971
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
972
                break;
973
        case SocketMode::Plain:
2,280✔
974
                http::async_write_some(
975
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
976
                break;
977
        }
978
}
2,280✔
979

980
void Client::ReadHeader() {
241✔
981
        auto &cancelled = cancelled_;
982
        auto &response_data = response_data_;
241✔
983

984
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
238✔
985
                if (!*cancelled) {
238✔
986
                        ReadHeaderHandler(ec, num_read);
238✔
987
                }
988
        };
482✔
989

990
        // Set timeout to 5 minutes to ensure we don't hang during async read
991
        // `next_layer().next_layer()` accesses the `beast::tcp_stream` from
992
        // `ssl::stream<ssl::stream<beast::tcp_stream>>`
993
        stream_->next_layer().next_layer().expires_after(chrono::minutes(5));
241✔
994

995
        switch (socket_mode_) {
241✔
996
        case SocketMode::TlsTls:
2✔
997
                http::async_read_some(
2✔
998
                        *stream_,
999
                        *response_data_.response_buffer_,
1000
                        *response_data_.http_response_parser_,
1001
                        handler);
1002
                break;
1003
        case SocketMode::Tls:
14✔
1004
                http::async_read_some(
14✔
1005
                        stream_->next_layer(),
1006
                        *response_data_.response_buffer_,
1007
                        *response_data_.http_response_parser_,
1008
                        handler);
1009
                break;
1010
        case SocketMode::Plain:
225✔
1011
                http::async_read_some(
225✔
1012
                        stream_->next_layer().next_layer(),
1013
                        *response_data_.response_buffer_,
1014
                        *response_data_.http_response_parser_,
1015
                        handler);
1016
                break;
1017
        }
1018
}
241✔
1019

1020
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
238✔
1021
        if (num_read > 0) {
238✔
1022
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
466✔
1023
        }
1024

1025
        if (ec) {
238✔
1026
                CallErrorHandler(ec, request_, header_handler_);
5✔
1027
                return;
65✔
1028
        }
1029

1030
        if (!response_data_.http_response_parser_->is_header_done()) {
233✔
UNCOV
1031
                ReadHeader();
×
UNCOV
1032
                return;
×
1033
        }
1034

1035
        if (secondary_req_) {
233✔
1036
                HandleSecondaryRequest();
9✔
1037
                return;
9✔
1038
        }
1039

1040
        response_.reset(new IncomingResponse(*this, cancelled_));
448✔
1041
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
224✔
1042
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
224✔
1043

1044
        logger_.Debug(
448✔
1045
                "Received response: " + to_string(response_->status_code_) + " "
448✔
1046
                + response_->status_message_);
672✔
1047

1048
        string debug_str;
1049
        for (auto header = response_data_.http_response_parser_->get().cbegin();
259✔
1050
                 header != response_data_.http_response_parser_->get().cend();
483✔
1051
                 header++) {
1052
                response_->headers_[string {header->name_string()}] = string {header->value()};
777✔
1053
                if (logger_.Level() >= log::LogLevel::Debug) {
259✔
1054
                        debug_str += string {header->name_string()};
244✔
1055
                        debug_str += ": ";
244✔
1056
                        debug_str += string {header->value()};
244✔
1057
                        debug_str += "\n";
244✔
1058
                }
1059
        }
1060

1061
        logger_.Debug("Received headers:\n" + debug_str);
448✔
1062
        debug_str.clear();
1063

1064
        if (GetContentLength(*response_data_.http_response_parser_) == 0
224✔
1065
                && !response_data_.http_response_parser_->chunked()) {
224✔
1066
                auto cancelled = cancelled_;
1067
                status_ = TransactionStatus::HeaderHandlerCalled;
48✔
1068
                CallHandler(header_handler_);
96✔
1069
                if (!*cancelled) {
48✔
1070
                        status_ = TransactionStatus::Done;
43✔
1071
                        if (response_->status_code_ != StatusCode::StatusSwitchingProtocols) {
43✔
1072
                                // Make an exception for 101 Switching Protocols response, where the TCP connection
1073
                                // is meant to be reused.
1074
                                DoCancel();
39✔
1075
                        }
1076
                        CallHandler(body_handler_);
86✔
1077
                }
1078
                return;
1079
        }
1080

1081
        auto cancelled = cancelled_;
1082
        status_ = TransactionStatus::HeaderHandlerCalled;
176✔
1083
        CallHandler(header_handler_);
352✔
1084
        if (*cancelled) {
176✔
1085
                return;
1086
        }
1087

1088
        // We know that a body reader is required here, because of the check for body above.
1089
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
173✔
1090
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
36✔
1091
        }
1092
}
1093

1094
void Client::HandleSecondaryRequest() {
9✔
1095
        logger_.Debug(
18✔
1096
                "Received proxy response: "
1097
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
18✔
1098
                + string {response_data_.http_response_parser_->get().reason()});
36✔
1099

1100
        request_ = std::move(secondary_req_);
1101

1102
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
9✔
1103
                auto err = MakeError(
1104
                        ProxyError,
1105
                        "Proxy returned unexpected response: "
1106
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
1107
                                + string {response_data_.http_response_parser_->get().reason()});
6✔
1108
                CallErrorHandler(err, request_, header_handler_);
4✔
1109
                return;
1110
        }
1111

1112
        if (GetContentLength(*response_data_.http_response_parser_) != 0
7✔
1113
                || response_data_.http_response_parser_->chunked()) {
7✔
1114
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
1115
                CallErrorHandler(err, request_, header_handler_);
×
1116
                return;
1117
        }
1118

1119
        // We are connected. Now repeat the request cycle with the original request. Pretend
1120
        // we were just connected.
1121

1122
        assert(request_->GetProtocol() == "https");
1123

1124
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
1125
        // a different layer, this will get out of sync.
1126
        assert(response_data_.response_buffer_->size() == 0);
1127

1128
        switch (socket_mode_) {
7✔
UNCOV
1129
        case SocketMode::TlsTls:
×
1130
                // Should never get here, because this is the only place where TlsTls mode
1131
                // is supposed to be turned on.
1132
                assert(false);
UNCOV
1133
                CallErrorHandler(
×
UNCOV
1134
                        error::MakeError(
×
1135
                                error::ProgrammingError,
UNCOV
1136
                                "Any other mode than Tls is not valid when handling secondary request"),
×
UNCOV
1137
                        request_,
×
UNCOV
1138
                        header_handler_);
×
UNCOV
1139
                break;
×
1140
        case SocketMode::Tls:
3✔
1141
                // Upgrade to TLS inside TLS.
1142
                socket_mode_ = SocketMode::TlsTls;
3✔
1143
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
3✔
1144
                break;
3✔
1145
        case SocketMode::Plain:
4✔
1146
                // Upgrade to TLS.
1147
                socket_mode_ = SocketMode::Tls;
4✔
1148
                HandshakeHandler(
4✔
1149
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
4✔
1150
                break;
4✔
1151
        }
1152
}
1153

1154
void Client::AsyncReadNextBodyPart(
4,663✔
1155
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1156
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1157

1158
        if (status_ == TransactionStatus::ReaderCreated) {
4,663✔
1159
                status_ = TransactionStatus::BodyReadingInProgress;
154✔
1160
        }
1161

1162
        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
4,663✔
1163
                auto cancelled = cancelled_;
1164
                handler(0);
190✔
1165
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
95✔
1166
                        status_ = TransactionStatus::Done;
95✔
1167
                        DoCancel();
95✔
1168
                        CallHandler(body_handler_);
190✔
1169
                }
1170
                return;
1171
        }
1172

1173
        reader_buf_start_ = start;
4,568✔
1174
        reader_buf_end_ = end;
4,568✔
1175
        reader_handler_ = handler;
4,568✔
1176
        size_t read_size = end - start;
4,568✔
1177
        size_t smallest = min(body_buffer_.size(), read_size);
6,681✔
1178

1179
        response_data_.http_response_parser_->get().body().data = body_buffer_.data();
4,568✔
1180
        response_data_.http_response_parser_->get().body().size = smallest;
4,568✔
1181
        response_data_.last_buffer_size_ = smallest;
4,568✔
1182

1183
        auto &cancelled = cancelled_;
1184
        auto &response_data = response_data_;
4,568✔
1185

1186
        // Set timeout to 5 minutes to ensure we don't hang during async read
1187
        // `next_layer().next_layer()` accesses the `beast::tcp_stream` from
1188
        // `ssl::stream<ssl::stream<beast::tcp_stream>>`
1189
        stream_->next_layer().next_layer().expires_after(chrono::minutes(5));
4,568✔
1190

1191
        auto async_handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
4,567✔
1192
                if (!*cancelled) {
4,567✔
1193
                        ReadBodyHandler(ec, num_read);
4,567✔
1194
                }
1195
        };
9,136✔
1196

1197
        switch (socket_mode_) {
4,568✔
1198
        case SocketMode::TlsTls:
2✔
1199
                http::async_read_some(
2✔
1200
                        *stream_,
1201
                        *response_data_.response_buffer_,
1202
                        *response_data_.http_response_parser_,
1203
                        async_handler);
1204
                break;
1205
        case SocketMode::Tls:
4✔
1206
                http::async_read_some(
4✔
1207
                        stream_->next_layer(),
1208
                        *response_data_.response_buffer_,
1209
                        *response_data_.http_response_parser_,
1210
                        async_handler);
1211
                break;
1212
        case SocketMode::Plain:
4,562✔
1213
                http::async_read_some(
4,562✔
1214
                        stream_->next_layer().next_layer(),
1215
                        *response_data_.response_buffer_,
1216
                        *response_data_.http_response_parser_,
1217
                        async_handler);
1218
                break;
1219
        }
1220
}
1221

1222
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
4,567✔
1223
        if (num_read > 0) {
4,567✔
1224
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
9,034✔
1225
        }
1226

1227
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,567✔
1228
                // This can be ignored. We always reset the buffer between reads anyway.
1229
                ec = error_code();
1,958✔
1230
        }
1231

1232
        assert(reader_handler_);
1233

1234
        if (response_data_.http_response_parser_->is_done()) {
4,567✔
1235
                status_ = TransactionStatus::BodyReadingFinished;
101✔
1236
        }
1237

1238
        auto cancelled = cancelled_;
1239

1240
        if (ec) {
4,567✔
1241
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
100✔
1242
                reader_handler_(expected::unexpected(err));
150✔
1243
                if (!*cancelled) {
50✔
1244
                        CallErrorHandler(ec, request_, body_handler_);
92✔
1245
                }
1246
                return;
1247
        }
1248

1249
        // The num_read from above includes out of band payload data, such as chunk headers, which
1250
        // we are not interested in. So we need to calculate the payload size from the remaining
1251
        // buffer space.
1252
        size_t payload_read =
1253
                response_data_.last_buffer_size_ - response_data_.http_response_parser_->get().body().size;
4,517✔
1254

1255
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
4,517✔
1256
        size_t smallest = min(payload_read, buf_size);
4,517✔
1257

1258
        if (smallest == 0) {
4,517✔
1259
                // We read nothing, which can happen if all we read was a chunk header. We cannot
1260
                // return 0 to the handler however, because in `io::Reader` context this means
1261
                // EOF. So just repeat the request instead, until we get actual payload data.
1262
                AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
462✔
1263
        } else {
1264
                copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
4,286✔
1265
                reader_handler_(smallest);
8,572✔
1266
        }
1267
}
1268

1269
void Client::Cancel() {
205✔
1270
        auto cancelled = cancelled_;
1271

1272
        if (!*cancelled) {
205✔
1273
                auto err =
1274
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
40✔
1275
                switch (status_) {
20✔
1276
                case TransactionStatus::None:
3✔
1277
                        CallErrorHandler(err, request_, header_handler_);
3✔
1278
                        break;
3✔
1279
                case TransactionStatus::HeaderHandlerCalled:
16✔
1280
                case TransactionStatus::ReaderCreated:
1281
                case TransactionStatus::BodyReadingInProgress:
1282
                case TransactionStatus::BodyReadingFinished:
1283
                        CallErrorHandler(err, request_, body_handler_);
16✔
1284
                        break;
16✔
1285
                case TransactionStatus::Replying:
1286
                case TransactionStatus::SwitchingProtocol:
1287
                        // Not used by client.
1288
                        assert(false);
1289
                        break;
1290
                case TransactionStatus::BodyHandlerCalled:
1291
                case TransactionStatus::Done:
1292
                        break;
1293
                }
1294
        }
1295

1296
        if (!*cancelled) {
205✔
1297
                DoCancel();
1✔
1298
        }
1299
}
205✔
1300

1301
void Client::DoCancel() {
618✔
1302
        resolver_.cancel();
618✔
1303
        if (stream_) {
618✔
1304
                beast::error_code ec;
263✔
1305
                stream_->lowest_layer().cancel(ec);
263✔
1306
                stream_->lowest_layer().close(ec);
263✔
1307
                stream_.reset();
263✔
1308
        }
1309

1310
        // Reset logger to no connection.
1311
        logger_ = log::Logger(logger_name_);
618✔
1312

1313
        // Set cancel state and then make a new one. Those who are interested should have their own
1314
        // pointer to the old one.
1315
        *cancelled_ = true;
618✔
1316
        cancelled_ = make_shared<bool>(true);
618✔
1317
}
618✔
1318

1319
Stream::Stream(Server &server) :
447✔
1320
        server_ {server},
1321
        logger_ {"http"},
1322
        cancelled_(make_shared<bool>(true)),
447✔
1323
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
447✔
1324
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,341✔
1325
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
1,341✔
1326

1327
        // This is equivalent to:
1328
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1329
        // but compatible with Boost 1.67.
1330
        request_data_.request_buffer_->prepare(
1331
                body_buffer_.size() - request_data_.request_buffer_->size());
447✔
1332

1333
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
894✔
1334

1335
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1336
        // they do, they should be handled higher up in the application logic.
1337
        //
1338
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1339
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1340
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1341
        // possible value instead.
1342
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1343
}
447✔
1344

1345
Stream::~Stream() {
1,341✔
1346
        DoCancel();
447✔
1347
}
447✔
1348

1349
void Stream::Cancel() {
7✔
1350
        auto cancelled = cancelled_;
1351

1352
        if (!*cancelled) {
7✔
1353
                auto err =
1354
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1355
                switch (status_) {
7✔
UNCOV
1356
                case TransactionStatus::None:
×
1357
                        CallErrorHandler(err, request_, server_.header_handler_);
×
1358
                        break;
×
1359
                case TransactionStatus::HeaderHandlerCalled:
5✔
1360
                case TransactionStatus::ReaderCreated:
1361
                case TransactionStatus::BodyReadingInProgress:
1362
                case TransactionStatus::BodyReadingFinished:
1363
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1364
                        break;
5✔
UNCOV
1365
                case TransactionStatus::BodyHandlerCalled:
×
1366
                        // In between body handler and reply finished. No one to handle the status
1367
                        // here.
UNCOV
1368
                        server_.RemoveStream(shared_from_this());
×
UNCOV
1369
                        break;
×
1370
                case TransactionStatus::Replying:
1✔
1371
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1372
                        break;
1✔
1373
                case TransactionStatus::SwitchingProtocol:
1✔
1374
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1375
                        break;
1✔
1376
                case TransactionStatus::Done:
1377
                        break;
1378
                }
1379
        }
1380

1381
        if (!*cancelled) {
7✔
UNCOV
1382
                DoCancel();
×
1383
        }
1384
}
7✔
1385

1386
void Stream::DoCancel() {
811✔
1387
        if (socket_.is_open()) {
811✔
1388
                socket_.cancel();
220✔
1389
                socket_.close();
220✔
1390
        }
1391

1392
        // Set cancel state and then make a new one. Those who are interested should have their own
1393
        // pointer to the old one.
1394
        *cancelled_ = true;
811✔
1395
        cancelled_ = make_shared<bool>(true);
811✔
1396
}
811✔
1397

1398
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1399
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
UNCOV
1400
}
×
1401

UNCOV
1402
void Stream::CallErrorHandler(
×
1403
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
UNCOV
1404
        status_ = TransactionStatus::Done;
×
UNCOV
1405
        DoCancel();
×
UNCOV
1406
        handler(expected::unexpected(err.WithContext(
×
UNCOV
1407
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1408

UNCOV
1409
        server_.RemoveStream(shared_from_this());
×
UNCOV
1410
}
×
1411

1412
void Stream::CallErrorHandler(
2✔
1413
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1414
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1415
}
2✔
1416

1417
void Stream::CallErrorHandler(
8✔
1418
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1419
        status_ = TransactionStatus::Done;
8✔
1420
        DoCancel();
8✔
1421
        handler(
8✔
1422
                req,
1423
                err.WithContext(
8✔
1424
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
24✔
1425

1426
        server_.RemoveStream(shared_from_this());
8✔
1427
}
8✔
1428

1429
void Stream::CallErrorHandler(
4✔
1430
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1431
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1432
}
4✔
1433

1434
void Stream::CallErrorHandler(
7✔
1435
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1436
        status_ = TransactionStatus::Done;
7✔
1437
        DoCancel();
7✔
1438
        handler(err.WithContext(
14✔
1439
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1440

1441
        server_.RemoveStream(shared_from_this());
7✔
1442
}
7✔
1443

UNCOV
1444
void Stream::CallErrorHandler(
×
1445
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
UNCOV
1446
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
UNCOV
1447
}
×
1448

1449
void Stream::CallErrorHandler(
1✔
1450
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1451
        status_ = TransactionStatus::Done;
1✔
1452
        DoCancel();
1✔
1453
        handler(expected::unexpected(err.WithContext(
2✔
1454
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1455

1456
        server_.RemoveStream(shared_from_this());
1✔
1457
}
1✔
1458

1459
void Stream::AcceptHandler(const error_code &ec) {
228✔
1460
        if (ec) {
228✔
UNCOV
1461
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
UNCOV
1462
                return;
×
1463
        }
1464

1465
        auto ip = socket_.remote_endpoint().address().to_string();
456✔
1466

1467
        // Use IP as context for logging.
1468
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
228✔
1469

1470
        logger_.Debug("Accepted connection.");
456✔
1471

1472
        request_.reset(new IncomingRequest(*this, cancelled_));
456✔
1473

1474
        request_->address_.host = ip;
228✔
1475

1476
        *cancelled_ = false;
228✔
1477

1478
        ReadHeader();
228✔
1479
}
1480

1481
void Stream::ReadHeader() {
228✔
1482
        auto &cancelled = cancelled_;
1483
        auto &request_data = request_data_;
228✔
1484

1485
        http::async_read_some(
456✔
1486
                socket_,
228✔
1487
                *request_data_.request_buffer_,
1488
                *request_data_.http_request_parser_,
1489
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
228✔
1490
                        if (!*cancelled) {
228✔
1491
                                ReadHeaderHandler(ec, num_read);
228✔
1492
                        }
1493
                });
228✔
1494
}
228✔
1495

1496
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
228✔
1497
        if (num_read > 0) {
228✔
1498
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
456✔
1499
        }
1500

1501
        if (ec) {
228✔
UNCOV
1502
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1503
                return;
188✔
1504
        }
1505

1506
        if (!request_data_.http_request_parser_->is_header_done()) {
228✔
UNCOV
1507
                ReadHeader();
×
UNCOV
1508
                return;
×
1509
        }
1510

1511
        auto method_result = BeastVerbToMethod(
1512
                request_data_.http_request_parser_->get().base().method(),
1513
                string {request_data_.http_request_parser_->get().base().method_string()});
456✔
1514
        if (!method_result) {
228✔
UNCOV
1515
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
UNCOV
1516
                return;
×
1517
        }
1518
        request_->method_ = method_result.value();
228✔
1519
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
228✔
1520

1521
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
228✔
1522

1523
        string debug_str;
1524
        for (auto header = request_data_.http_request_parser_->get().cbegin();
394✔
1525
                 header != request_data_.http_request_parser_->get().cend();
622✔
1526
                 header++) {
1527
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,182✔
1528
                if (logger_.Level() >= log::LogLevel::Debug) {
394✔
1529
                        debug_str += string {header->name_string()};
329✔
1530
                        debug_str += ": ";
329✔
1531
                        debug_str += string {header->value()};
329✔
1532
                        debug_str += "\n";
329✔
1533
                }
1534
        }
1535

1536
        logger_.Debug("Received headers:\n" + debug_str);
456✔
1537
        debug_str.clear();
1538

1539
        if (GetContentLength(*request_data_.http_request_parser_) == 0
228✔
1540
                && !request_data_.http_request_parser_->chunked()) {
228✔
1541
                auto cancelled = cancelled_;
1542
                status_ = TransactionStatus::HeaderHandlerCalled;
187✔
1543
                server_.header_handler_(request_);
374✔
1544
                if (!*cancelled) {
187✔
1545
                        status_ = TransactionStatus::BodyHandlerCalled;
187✔
1546
                        CallBodyHandler();
187✔
1547
                }
1548
                return;
1549
        }
1550

1551
        assert(!request_data_.http_request_parser_->is_done());
1552

1553
        auto cancelled = cancelled_;
1554
        status_ = TransactionStatus::HeaderHandlerCalled;
41✔
1555
        server_.header_handler_(request_);
82✔
1556
        if (*cancelled) {
41✔
1557
                return;
1558
        }
1559

1560
        // We know that a body reader is required here, because of the check for body above.
1561
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
40✔
1562
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1563
        }
1564
}
1565

1566
void Stream::AsyncReadNextBodyPart(
2,264✔
1567
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1568
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1569

1570
        if (status_ == TransactionStatus::ReaderCreated) {
2,264✔
1571
                status_ = TransactionStatus::BodyReadingInProgress;
39✔
1572
        }
1573

1574
        if (status_ != TransactionStatus::BodyReadingInProgress) {
2,264✔
1575
                auto cancelled = cancelled_;
1576
                handler(0);
66✔
1577
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
33✔
1578
                        status_ = TransactionStatus::BodyHandlerCalled;
33✔
1579
                        CallBodyHandler();
33✔
1580
                }
1581
                return;
1582
        }
1583

1584
        reader_buf_start_ = start;
2,231✔
1585
        reader_buf_end_ = end;
2,231✔
1586
        reader_handler_ = handler;
2,231✔
1587
        size_t read_size = end - start;
2,231✔
1588
        size_t smallest = min(body_buffer_.size(), read_size);
3,287✔
1589

1590
        request_data_.http_request_parser_->get().body().data = body_buffer_.data();
2,231✔
1591
        request_data_.http_request_parser_->get().body().size = smallest;
2,231✔
1592
        request_data_.last_buffer_size_ = smallest;
2,231✔
1593

1594
        auto &cancelled = cancelled_;
1595
        auto &request_data = request_data_;
2,231✔
1596

1597
        http::async_read_some(
4,462✔
1598
                socket_,
2,231✔
1599
                *request_data_.request_buffer_,
1600
                *request_data_.http_request_parser_,
1601
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
2,231✔
1602
                        if (!*cancelled) {
2,231✔
1603
                                ReadBodyHandler(ec, num_read);
2,231✔
1604
                        }
1605
                });
2,231✔
1606
}
1607

1608
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
2,231✔
1609
        if (num_read > 0) {
2,231✔
1610
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
4,454✔
1611
        }
1612

1613
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,231✔
1614
                // This can be ignored. We always reset the buffer between reads anyway.
1615
                ec = error_code();
979✔
1616
        }
1617

1618
        assert(reader_handler_);
1619

1620
        if (request_data_.http_request_parser_->is_done()) {
2,231✔
1621
                status_ = TransactionStatus::BodyReadingFinished;
33✔
1622
        }
1623

1624
        auto cancelled = cancelled_;
1625

1626
        if (ec) {
2,231✔
1627
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
8✔
1628
                reader_handler_(expected::unexpected(err));
12✔
1629
                if (!*cancelled) {
4✔
1630
                        CallErrorHandler(ec, request_, server_.body_handler_);
4✔
1631
                }
1632
                return;
1633
        }
1634

1635
        // The num_read from above includes out of band payload data, such as chunk headers, which
1636
        // we are not interested in. So we need to calculate the payload size from the remaining
1637
        // buffer space.
1638
        size_t payload_read =
1639
                request_data_.last_buffer_size_ - request_data_.http_request_parser_->get().body().size;
2,227✔
1640

1641
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
2,227✔
1642
        size_t smallest = min(payload_read, buf_size);
2,227✔
1643

1644
        if (smallest == 0) {
2,227✔
1645
                // We read nothing, which can happen if all we read was a chunk header. We cannot
1646
                // return 0 to the handler however, because in `io::Reader` context this means
1647
                // EOF. So just repeat the request instead, until we get actual payload data.
1648
                AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
154✔
1649
        } else {
1650
                copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
2,150✔
1651
                reader_handler_(smallest);
4,300✔
1652
        }
1653
}
1654

1655
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
205✔
1656
        SetupResponse();
205✔
1657

1658
        reply_finished_handler_ = reply_finished_handler;
205✔
1659

1660
        auto &cancelled = cancelled_;
1661
        auto &response_data = response_data_;
205✔
1662

1663
        http::async_write_header(
410✔
1664
                socket_,
205✔
1665
                *response_data_.http_response_serializer_,
1666
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
205✔
1667
                        if (!*cancelled) {
205✔
1668
                                WriteHeaderHandler(ec, num_written);
204✔
1669
                        }
1670
                });
205✔
1671
}
205✔
1672

1673
void Stream::SetupResponse() {
214✔
1674
        auto response = maybe_response_.lock();
214✔
1675
        // Only called from existing responses, so this should always be true.
1676
        assert(response);
1677

1678
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1679
        status_ = TransactionStatus::Replying;
214✔
1680

1681
        // From here on we take shared ownership.
1682
        response_ = response;
1683

1684
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
428✔
1685

1686
        for (const auto &header : response->headers_) {
444✔
1687
                response_data_.http_response_->base().set(header.first, header.second);
230✔
1688
        }
1689

1690
        response_data_.http_response_->result(response->GetStatusCode());
214✔
1691
        response_data_.http_response_->reason(response->GetStatusMessage());
428✔
1692

1693
        response_data_.http_response_serializer_ =
1694
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
428✔
1695
}
214✔
1696

1697
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
204✔
1698
        if (num_written > 0) {
204✔
1699
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
408✔
1700
        }
1701

1702
        if (ec) {
204✔
UNCOV
1703
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1704
                return;
37✔
1705
        }
1706

1707
        auto exp_has_body =
1708
                HasBody(response_->GetHeader("Content-Length"), response_->GetHeader("Transfer-Encoding"));
408✔
1709
        if (!exp_has_body) {
204✔
UNCOV
1710
                CallErrorHandler(exp_has_body.error(), request_, reply_finished_handler_);
×
UNCOV
1711
                return;
×
1712
        }
1713
        if (!exp_has_body.value()) {
204✔
1714
                FinishReply();
36✔
1715
                return;
1716
        }
1717

1718
        if (!response_->body_reader_ && !response_->async_body_reader_) {
168✔
1719
                auto err = MakeError(BodyMissingError, "No body reader");
2✔
1720
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1721
                return;
1722
        }
1723

1724
        PrepareAndWriteNewBodyBuffer();
167✔
1725
}
1726

1727
void Stream::PrepareAndWriteNewBodyBuffer() {
2,229✔
1728
        // response_->body_reader_ XOR response_->async_body_reader_
1729
        assert(
1730
                (response_->body_reader_ || response_->async_body_reader_)
1731
                && !(response_->body_reader_ && response_->async_body_reader_));
1732

1733
        auto read_handler = [this](io::ExpectedSize read) {
2,230✔
1734
                if (!read) {
2,229✔
1735
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1736
                        return;
1✔
1737
                }
1738
                WriteNewBodyBuffer(read.value());
2,228✔
1739
        };
2,229✔
1740

1741
        if (response_->body_reader_) {
2,229✔
1742
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,910✔
1743
        } else {
1744
                auto err = response_->async_body_reader_->AsyncRead(
1745
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1746
                if (err != error::NoError) {
274✔
UNCOV
1747
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1748
                }
1749
        }
1750
}
2,229✔
1751

1752
void Stream::WriteNewBodyBuffer(size_t size) {
2,228✔
1753
        response_data_.http_response_->body().data = body_buffer_.data();
2,228✔
1754
        response_data_.http_response_->body().size = size;
2,228✔
1755

1756
        if (size > 0) {
2,228✔
1757
                response_data_.http_response_->body().more = true;
2,098✔
1758
        } else {
1759
                response_data_.http_response_->body().more = false;
130✔
1760
        }
1761

1762
        WriteBody();
2,228✔
1763
}
2,228✔
1764

1765
void Stream::WriteBody() {
4,308✔
1766
        auto &cancelled = cancelled_;
1767
        auto &response_data = response_data_;
4,308✔
1768

1769
        http::async_write_some(
8,616✔
1770
                socket_,
4,308✔
1771
                *response_data_.http_response_serializer_,
1772
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
4,268✔
1773
                        if (!*cancelled) {
4,268✔
1774
                                WriteBodyHandler(ec, num_written);
4,268✔
1775
                        }
1776
                });
4,268✔
1777
}
4,308✔
1778

1779
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
4,268✔
1780
        if (num_written > 0) {
4,268✔
1781
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
4,160✔
1782
        }
1783

1784
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,268✔
1785
                // Write next body block.
1786
                PrepareAndWriteNewBodyBuffer();
2,062✔
1787
        } else if (ec) {
2,206✔
1788
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1789
        } else if (num_written > 0) {
2,202✔
1790
                // We are still writing the body.
1791
                WriteBody();
2,080✔
1792
        } else {
1793
                // We are finished.
1794
                FinishReply();
122✔
1795
        }
1796
}
4,268✔
1797

1798
void Stream::FinishReply() {
158✔
1799
        // We are done.
1800
        status_ = TransactionStatus::Done;
158✔
1801
        DoCancel();
158✔
1802
        // Release ownership of Body reader.
1803
        response_->body_reader_.reset();
158✔
1804
        response_->async_body_reader_.reset();
158✔
1805
        reply_finished_handler_(error::NoError);
158✔
1806
        server_.RemoveStream(shared_from_this());
158✔
1807
}
158✔
1808

1809
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
9✔
1810
        SetupResponse();
9✔
1811

1812
        switch_protocol_handler_ = handler;
9✔
1813
        status_ = TransactionStatus::SwitchingProtocol;
9✔
1814

1815
        auto &cancelled = cancelled_;
1816
        auto &response_data = response_data_;
9✔
1817

1818
        http::async_write_header(
18✔
1819
                socket_,
9✔
1820
                *response_data_.http_response_serializer_,
1821
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
9✔
1822
                        if (!*cancelled) {
9✔
1823
                                SwitchingProtocolHandler(ec, num_written);
8✔
1824
                        }
1825
                });
9✔
1826

1827
        return error::NoError;
9✔
1828
}
1829

1830
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
8✔
1831
        if (num_written > 0) {
8✔
1832
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
16✔
1833
        }
1834

1835
        if (ec) {
8✔
UNCOV
1836
                CallErrorHandler(ec, request_, switch_protocol_handler_);
×
UNCOV
1837
                return;
×
1838
        }
1839

1840
        auto socket = make_shared<RawSocket<tcp::socket>>(
1841
                make_shared<tcp::socket>(std::move(socket_)), request_data_.request_buffer_);
8✔
1842

1843
        auto switch_protocol_handler = switch_protocol_handler_;
8✔
1844

1845
        // Rest of the connection is done directly on the socket, set cancelled_ but don't close it.
1846
        *cancelled_ = true;
8✔
1847
        cancelled_ = make_shared<bool>(true);
8✔
1848
        server_.RemoveStream(shared_from_this());
16✔
1849

1850
        switch_protocol_handler(socket);
16✔
1851
}
1852

1853
void Stream::CallBodyHandler() {
220✔
1854
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1855
        // it immediately destroys, which would destroy this stream as well. At the end of this
1856
        // function, it's ok to destroy it.
1857
        auto stream_ref = shared_from_this();
1858

1859
        server_.body_handler_(request_, error::NoError);
660✔
1860

1861
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1862
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1863
        // request has not been handled correctly.
1864
        auto response = maybe_response_.lock();
220✔
1865
        if (!response) {
220✔
1866
                logger_.Error("Handler produced no response. Closing stream prematurely.");
6✔
1867
                *cancelled_ = true;
3✔
1868
                cancelled_ = make_shared<bool>(true);
3✔
1869
                server_.RemoveStream(shared_from_this());
9✔
1870
        }
1871
}
220✔
1872

1873
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
240✔
1874
        event_loop_ {event_loop},
1875
        acceptor_(GetAsioIoContext(event_loop_)) {
433✔
1876
}
240✔
1877

1878
Server::~Server() {
480✔
1879
        Cancel();
240✔
1880
}
240✔
1881

1882
error::Error Server::AsyncServeUrl(
205✔
1883
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
1884
        return AsyncServeUrl(
1885
                url, header_handler, [body_handler](IncomingRequestPtr req, error::Error err) {
834✔
1886
                        if (err != error::NoError) {
215✔
1887
                                body_handler(expected::unexpected(err));
12✔
1888
                        } else {
1889
                                body_handler(req);
418✔
1890
                        }
1891
                });
625✔
1892
}
1893

1894
error::Error Server::AsyncServeUrl(
220✔
1895
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
1896
        auto err = BreakDownUrl(url, address_);
220✔
1897
        if (error::NoError != err) {
220✔
UNCOV
1898
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
×
1899
        }
1900

1901
        if (address_.protocol != "http") {
220✔
1902
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
×
1903
        }
1904

1905
        if (address_.path.size() > 0 && address_.path != "/") {
220✔
1906
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
2✔
1907
        }
1908

1909
        boost::system::error_code ec;
219✔
1910
        auto address = asio::ip::make_address(address_.host, ec);
219✔
1911
        if (ec) {
219✔
1912
                return error::Error(
UNCOV
1913
                        ec.default_error_condition(),
×
UNCOV
1914
                        "Could not construct endpoint from address " + address_.host);
×
1915
        }
1916

1917
        asio::ip::tcp::endpoint endpoint(address, address_.port);
219✔
1918

1919
        ec.clear();
1920
        acceptor_.open(endpoint.protocol(), ec);
219✔
1921
        if (ec) {
219✔
UNCOV
1922
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
×
1923
        }
1924

1925
        // Allow address reuse, otherwise we can't re-bind later.
1926
        ec.clear();
1927
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
219✔
1928
        if (ec) {
219✔
UNCOV
1929
                return error::Error(ec.default_error_condition(), "Could not set socket options");
×
1930
        }
1931

1932
        ec.clear();
1933
        acceptor_.bind(endpoint, ec);
219✔
1934
        if (ec) {
219✔
UNCOV
1935
                return error::Error(ec.default_error_condition(), "Could not bind socket");
×
1936
        }
1937

1938
        ec.clear();
1939
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
219✔
1940
        if (ec) {
219✔
UNCOV
1941
                return error::Error(ec.default_error_condition(), "Could not start listening");
×
1942
        }
1943

1944
        header_handler_ = header_handler;
219✔
1945
        body_handler_ = body_handler;
219✔
1946

1947
        PrepareNewStream();
219✔
1948

1949
        return error::NoError;
219✔
1950
}
1951

1952
void Server::Cancel() {
260✔
1953
        if (acceptor_.is_open()) {
260✔
1954
                acceptor_.cancel();
219✔
1955
                acceptor_.close();
219✔
1956
        }
1957
        streams_.clear();
1958
}
260✔
1959

1960
uint16_t Server::GetPort() const {
17✔
1961
        return acceptor_.local_endpoint().port();
17✔
1962
}
1963

1964
string Server::GetUrl() const {
16✔
1965
        return "http://127.0.0.1:" + to_string(GetPort());
32✔
1966
}
1967

1968
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
219✔
1969
        if (*req->cancelled_) {
219✔
UNCOV
1970
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
×
1971
        }
1972
        OutgoingResponsePtr response {new OutgoingResponse(req->stream_, req->cancelled_)};
438✔
1973
        req->stream_.maybe_response_ = response;
219✔
1974
        return response;
219✔
1975
}
1976

1977
error::Error Server::AsyncReply(
205✔
1978
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
1979
        if (*resp->cancelled_) {
205✔
UNCOV
1980
                return MakeError(StreamCancelledError, "Cannot send response");
×
1981
        }
1982

1983
        resp->stream_.AsyncReply(reply_finished_handler);
205✔
1984
        return error::NoError;
205✔
1985
}
1986

1987
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
58✔
1988
        if (*req->cancelled_) {
58✔
UNCOV
1989
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
×
1990
        }
1991

1992
        auto &stream = req->stream_;
58✔
1993
        if (stream.status_ != TransactionStatus::HeaderHandlerCalled) {
58✔
1994
                return expected::unexpected(error::Error(
1✔
1995
                        make_error_condition(errc::operation_in_progress),
2✔
1996
                        "MakeBodyAsyncReader called while reading is in progress"));
3✔
1997
        }
1998

1999
        if (GetContentLength(*stream.request_data_.http_request_parser_) == 0
57✔
2000
                && !stream.request_data_.http_request_parser_->chunked()) {
57✔
2001
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
54✔
2002
        }
2003

2004
        stream.status_ = TransactionStatus::ReaderCreated;
39✔
2005
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
78✔
2006
}
2007

2008
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
9✔
2009
        return resp->stream_.AsyncSwitchProtocol(handler);
18✔
2010
}
2011

2012
void Server::PrepareNewStream() {
447✔
2013
        StreamPtr new_stream {new Stream(*this)};
447✔
2014
        streams_.insert(new_stream);
2015
        AsyncAccept(new_stream);
894✔
2016
}
447✔
2017

2018
void Server::AsyncAccept(StreamPtr stream) {
447✔
2019
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
678✔
2020
                if (ec) {
231✔
2021
                        if (ec != errc::operation_canceled) {
3✔
UNCOV
2022
                                log::Error("Could not accept connection: " + ec.message());
×
2023
                        }
2024
                        return;
3✔
2025
                }
2026

2027
                stream->AcceptHandler(ec);
228✔
2028

2029
                this->PrepareNewStream();
228✔
2030
        });
2031
}
447✔
2032

2033
void Server::RemoveStream(StreamPtr stream) {
190✔
2034
        streams_.erase(stream);
190✔
2035

2036
        stream->DoCancel();
190✔
2037
}
190✔
2038

2039
} // namespace http
2040
} // namespace common
2041
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc