• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 2016950616

02 Sep 2025 11:00AM UTC coverage: 75.919% (-0.01%) from 75.933%
2016950616

push

gitlab-ci

web-flow
Merge branch 'mendersoftware:master' into master

7390 of 9734 relevant lines covered (75.92%)

13932.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.92
/src/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio.hpp>
20
#include <boost/asio/ip/tcp.hpp>
21
#include <boost/asio/ssl/host_name_verification.hpp>
22
#include <boost/asio/ssl/verify_mode.hpp>
23

24
#include <common/common.hpp>
25
#include <common/crypto.hpp>
26

27
#include <mender-version.h>
28

29
namespace mender {
30
namespace common {
31
namespace http {
32

33
namespace common = mender::common;
34
namespace crypto = mender::common::crypto;
35

36
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
37
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
38
const unsigned int BeastHttpVersion = 11;
39

40
namespace asio = boost::asio;
41
namespace http = boost::beast::http;
42

43
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
44

45
static http::verb MethodToBeastVerb(Method method) {
249✔
46
        switch (method) {
249✔
47
        case Method::GET:
48
                return http::verb::get;
49
        case Method::HEAD:
50
                return http::verb::head;
51
        case Method::POST:
52
                return http::verb::post;
53
        case Method::PUT:
54
                return http::verb::put;
55
        case Method::PATCH:
56
                return http::verb::patch;
57
        case Method::CONNECT:
58
                return http::verb::connect;
59
        case Method::Invalid:
60
                // Fallthrough to end (no-op).
61
                break;
62
        }
63
        // Don't use "default" case. This should generate a warning if we ever add any methods. But
64
        // still assert here for safety.
65
        assert(false);
66
        return http::verb::get;
67
}
68

69
static expected::expected<Method, error::Error> BeastVerbToMethod(
228✔
70
        http::verb verb, const string &verb_string) {
71
        switch (verb) {
228✔
72
        case http::verb::get:
192✔
73
                return Method::GET;
74
        case http::verb::head:
×
75
                return Method::HEAD;
76
        case http::verb::post:
13✔
77
                return Method::POST;
78
        case http::verb::put:
23✔
79
                return Method::PUT;
80
        case http::verb::patch:
×
81
                return Method::PATCH;
82
        case http::verb::connect:
×
83
                return Method::CONNECT;
84
        default:
×
85
                return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
×
86
        }
87
}
88

89
template <typename StreamType>
90
class BodyAsyncReader : virtual public io::AsyncReader {
91
public:
92
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
156✔
93
                stream_ {stream},
94
                cancelled_ {cancelled} {
312✔
95
        }
156✔
96
        ~BodyAsyncReader() {
39✔
97
                Cancel();
39✔
98
        }
78✔
99

100
        error::Error AsyncRead(
2,187✔
101
                vector<uint8_t>::iterator start,
102
                vector<uint8_t>::iterator end,
103
                io::AsyncIoHandler handler) override {
104
                if (eof_) {
2,187✔
105
                        handler(0);
×
106
                        return error::NoError;
×
107
                }
108

109
                if (*cancelled_) {
2,187✔
110
                        return error::MakeError(
×
111
                                error::ProgrammingError,
112
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
113
                }
114
                stream_.AsyncReadNextBodyPart(start, end, [this, handler](io::ExpectedSize size) {
13,726✔
115
                        if (size && size.value() == 0) {
6,618✔
116
                                eof_ = true;
128✔
117
                        }
118
                        handler(size);
13,236✔
119
                });
120
                return error::NoError;
2,187✔
121
        }
122

123
        void Cancel() override {
39✔
124
                if (!*cancelled_) {
39✔
125
                        stream_.Cancel();
4✔
126
                }
127
        }
39✔
128

129
private:
130
        StreamType &stream_;
131
        shared_ptr<bool> cancelled_;
132
        bool eof_ {false};
133

134
        friend class Client;
135
        friend class Server;
136
};
137

138
template <typename StreamType>
139
class RawSocket : virtual public io::AsyncReadWriter {
140
public:
141
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
142
                destroying_ {make_shared<bool>(false)},
×
143
                stream_ {stream},
144
                buffered_ {buffered} {
×
145
                // If there are no buffered bytes, then we don't need it.
146
                if (buffered_ && buffered_->size() == 0) {
×
147
                        buffered_.reset();
×
148
                }
149
        }
×
150

151
        ~RawSocket() {
15✔
152
                *destroying_ = true;
15✔
153
                Cancel();
15✔
154
        }
30✔
155

156
        error::Error AsyncRead(
320✔
157
                vector<uint8_t>::iterator start,
158
                vector<uint8_t>::iterator end,
159
                io::AsyncIoHandler handler) override {
160
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
161
                // header and parts of the body in one block, return those first.
162
                if (buffered_) {
320✔
163
                        return DrainPrebufferedData(start, end, handler);
8✔
164
                }
165

166
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
167
                auto &destroying = destroying_;
168
                stream_->async_read_some(
632✔
169
                        read_buffer_,
316✔
170
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
171
                                if (*destroying) {
313✔
172
                                        return;
173
                                }
174

175
                                if (ec == asio::error::operation_aborted) {
313✔
176
                                        handler(expected::unexpected(error::Error(
12✔
177
                                                make_error_condition(errc::operation_canceled),
6✔
178
                                                "Could not read from socket")));
179
                                } else if (ec) {
310✔
180
                                        handler(expected::unexpected(
12✔
181
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
182
                                } else {
183
                                        handler(num_read);
608✔
184
                                }
185
                        });
186
                return error::NoError;
316✔
187
        }
188

189
        error::Error AsyncWrite(
309✔
190
                vector<uint8_t>::const_iterator start,
191
                vector<uint8_t>::const_iterator end,
192
                io::AsyncIoHandler handler) override {
193
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
194
                auto &destroying = destroying_;
195
                stream_->async_write_some(
618✔
196
                        write_buffer_,
309✔
197
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
198
                                if (*destroying) {
306✔
199
                                        return;
200
                                }
201

202
                                if (ec == asio::error::operation_aborted) {
306✔
203
                                        handler(expected::unexpected(error::Error(
×
204
                                                make_error_condition(errc::operation_canceled),
×
205
                                                "Could not write to socket")));
206
                                } else if (ec) {
306✔
207
                                        handler(expected::unexpected(
×
208
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
209
                                } else {
210
                                        handler(num_written);
612✔
211
                                }
212
                        });
213
                return error::NoError;
309✔
214
        }
215

216
        void Cancel() override {
28✔
217
                if (stream_->lowest_layer().is_open()) {
28✔
218
                        stream_->lowest_layer().cancel();
15✔
219
                        stream_->lowest_layer().close();
15✔
220
                }
221
        }
28✔
222

223
private:
224
        error::Error DrainPrebufferedData(
4✔
225
                vector<uint8_t>::iterator start,
226
                vector<uint8_t>::iterator end,
227
                io::AsyncIoHandler handler) {
228
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
4✔
229

230
                // These two lines are equivalent to:
231
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
232
                // but compatible with Boost 1.67.
233
                const beast::flat_buffer &cbuffered = *buffered_;
234
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
4✔
235
                buffered_->consume(to_copy);
4✔
236
                if (buffered_->size() == 0) {
4✔
237
                        // We don't need it anymore.
238
                        buffered_.reset();
4✔
239
                }
240
                handler(to_copy);
4✔
241
                return error::NoError;
4✔
242
        }
243

244
        shared_ptr<bool> destroying_;
245
        shared_ptr<StreamType> stream_;
246
        shared_ptr<beast::flat_buffer> buffered_;
247
        asio::mutable_buffer read_buffer_;
248
        asio::const_buffer write_buffer_;
249
};
250

251
template <typename PARSER>
252
int64_t GetContentLength(const PARSER &parser) {
404✔
253
        auto content_length = parser.content_length();
404✔
254
        if (content_length) {
404✔
255
                return content_length.value();
353✔
256
        } else {
257
                return 0;
258
        }
259
}
260

261
expected::ExpectedBool HasBody(
453✔
262
        const expected::ExpectedString &content_length,
263
        const expected::ExpectedString &transfer_encoding) {
264
        if (transfer_encoding) {
453✔
265
                if (transfer_encoding.value() != "chunked") {
4✔
266
                        return expected::unexpected(error::Error(
×
267
                                make_error_condition(errc::not_supported),
×
268
                                "Unsupported Transfer-Encoding: " + transfer_encoding.value()));
×
269
                }
270
                return true;
271
        }
272

273
        if (content_length) {
449✔
274
                auto length = common::StringToLongLong(content_length.value());
220✔
275
                if (!length || length.value() < 0) {
220✔
276
                        return expected::unexpected(error::Error(
×
277
                                length.error().code,
278
                                "Content-Length contains invalid number: " + content_length.value()));
×
279
                }
280
                return length.value() > 0;
220✔
281
        }
282

283
        return false;
284
}
285

286
Client::Client(
358✔
287
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
288
        event_loop_ {event_loop},
289
        logger_name_ {logger_name},
290
        client_config_ {client},
291
        http_proxy_ {client.http_proxy},
358✔
292
        https_proxy_ {client.https_proxy},
358✔
293
        no_proxy_ {client.no_proxy},
358✔
294
        cancelled_ {make_shared<bool>(true)},
×
295
        resolver_(GetAsioIoContext(event_loop)),
296
        body_buffer_(HTTP_BEAST_BUFFER_SIZE),
297
        read_timeout_timer_(event_loop) {
1,432✔
298
}
358✔
299

300
Client::~Client() {
2,506✔
301
        if (!*cancelled_) {
358✔
302
                logger_.Warning("Client destroyed while request is still active!");
30✔
303
        }
304
        DoCancel();
358✔
305
}
358✔
306

307
error::Error Client::Initialize() {
287✔
308
        if (initialized_) {
287✔
309
                return error::NoError;
72✔
310
        }
311

312
        for (auto i = 0; i < MENDER_BOOST_BEAST_SSL_CTX_COUNT; i++) {
637✔
313
                ssl_ctx_[i].set_verify_mode(
850✔
314
                        client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer);
315

316
                beast::error_code ec {};
426✔
317
                if (client_config_.client_cert_path != "" and client_config_.client_cert_key_path != "") {
426✔
318
                        ssl_ctx_[i].set_options(boost::asio::ssl::context::default_workarounds);
4✔
319
                        ssl_ctx_[i].use_certificate_file(
320
                                client_config_.client_cert_path, boost::asio::ssl::context_base::pem, ec);
4✔
321
                        if (ec) {
4✔
322
                                return error::Error(
323
                                        ec.default_error_condition(), "Could not load client certificate");
2✔
324
                        }
325
                        auto exp_key = crypto::PrivateKey::Load(
326
                                {client_config_.client_cert_key_path, "", client_config_.ssl_engine});
6✔
327
                        if (!exp_key) {
3✔
328
                                return exp_key.error().WithContext(
329
                                        "Error loading private key from " + client_config_.client_cert_key_path);
2✔
330
                        }
331

332
                        const int ret =
333
                                SSL_CTX_use_PrivateKey(ssl_ctx_[i].native_handle(), exp_key.value()->Get());
2✔
334
                        if (ret != 1) {
2✔
335
                                return MakeError(
336
                                        HTTPInitError,
337
                                        "Failed to add the PrivateKey: " + client_config_.client_cert_key_path
×
338
                                                + " to the SSL CTX");
×
339
                        }
340
                } else if (
341
                        client_config_.client_cert_path != "" or client_config_.client_cert_key_path != "") {
422✔
342
                        return error::Error(
343
                                make_error_condition(errc::invalid_argument),
4✔
344
                                "Cannot set only one of client certificate, and client certificate private key");
4✔
345
                }
346

347
                bool cert_loaded = true;
348
                ssl_ctx_[i].set_default_verify_paths(ec); // Load the default CAs
422✔
349
                if (ec) {
422✔
350
                        auto err = error::Error(
351
                                ec.default_error_condition(), "Failed to load the SSL default directory");
×
352
                        if (client_config_.server_cert_path == "") {
×
353
                                // We aren't going to have any valid certificates then.
354
                                return err;
×
355
                        } else {
356
                                // We have a dedicated certificate, so this is not fatal.
357
                                log::Info(err.String());
×
358
                                cert_loaded = false;
359
                        }
360
                }
361
                if (client_config_.server_cert_path != "") {
422✔
362
                        ssl_ctx_[i].load_verify_file(client_config_.server_cert_path, ec);
52✔
363
                        if (ec) {
52✔
364
                                log::Warning("Failed to load the server certificate! Falling back to the CA store");
4✔
365
                                if (!cert_loaded) {
2✔
366
                                        return error::Error(
367
                                                ec.default_error_condition(),
×
368
                                                "Failed to load SSL default directory and server certificate");
×
369
                                }
370
                        }
371
                }
372
        }
373

374
        initialized_ = true;
211✔
375

376
        return error::NoError;
211✔
377
}
378

379
// Create the HOST header according to:
380
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.23
381
// In short: Add the port-number if it is non-standard HTTP
382
static string CreateHOSTAddress(OutgoingRequestPtr req) {
275✔
383
        if (req->GetPort() == 80 || req->GetPort() == 443) {
275✔
384
                return req->GetHost();
4✔
385
        }
386
        return req->GetHost() + ":" + to_string(req->GetPort());
542✔
387
}
388

389
error::Error Client::AsyncCall(
287✔
390
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
391
        auto err = Initialize();
287✔
392
        if (err != error::NoError) {
287✔
393
                return err;
4✔
394
        }
395

396
        if (!*cancelled_ && status_ != TransactionStatus::Done) {
283✔
397
                return error::Error(
398
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
×
399
        }
400

401
        if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
283✔
402
                return error::MakeError(error::ProgrammingError, "Request is not ready");
4✔
403
        }
404

405
        if (!header_handler || !body_handler) {
281✔
406
                return error::MakeError(
407
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
2✔
408
        }
409

410
        if (req->address_.protocol != "http" && req->address_.protocol != "https") {
280✔
411
                return error::Error(
412
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
2✔
413
        }
414

415
        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));
279✔
416

417
        request_ = req;
418

419
        err = HandleProxySetup();
279✔
420
        if (err != error::NoError) {
279✔
421
                return err;
4✔
422
        }
423

424
        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
425
        // request to route to our k8s cluster. Set this in all cases.
426
        const string header_url = CreateHOSTAddress(req);
550✔
427
        req->SetHeader("HOST", header_url);
550✔
428

429
        log::Trace("Setting HOST address: " + header_url);
550✔
430

431
        // Add User-Agent header for all requests
432
        req->SetHeader("User-Agent", "Mender/" MENDER_VERSION);
550✔
433

434
        header_handler_ = header_handler;
275✔
435
        body_handler_ = body_handler;
275✔
436
        status_ = TransactionStatus::None;
275✔
437

438
        cancelled_ = make_shared<bool>(false);
275✔
439

440
        auto &cancelled = cancelled_;
441

442
        resolver_.async_resolve(
550✔
443
                request_->address_.host,
444
                to_string(request_->address_.port),
550✔
445
                [this, cancelled](
546✔
446
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
270✔
447
                        if (!*cancelled) {
271✔
448
                                ResolveHandler(ec, results);
270✔
449
                        }
450
                });
271✔
451

452
        return error::NoError;
275✔
453
}
454

455
static inline error::Error AddProxyAuthHeader(OutgoingRequest &req, BrokenDownUrl &proxy_address) {
22✔
456
        if (proxy_address.username == "") {
22✔
457
                // nothing to do
458
                return error::NoError;
19✔
459
        }
460
        auto ex_dec_username = URLDecode(proxy_address.username);
3✔
461
        auto ex_dec_password = URLDecode(proxy_address.password);
3✔
462
        if (!ex_dec_username) {
3✔
463
                return ex_dec_username.error();
×
464
        }
465
        if (!ex_dec_password) {
3✔
466
                return ex_dec_password.error();
×
467
        }
468
        auto creds = ex_dec_username.value() + ":" + ex_dec_password.value();
3✔
469
        auto ex_encoded_creds = crypto::EncodeBase64(common::ByteVectorFromString(creds));
6✔
470
        if (!ex_encoded_creds) {
3✔
471
                return ex_encoded_creds.error();
×
472
        }
473
        req.SetHeader("Proxy-Authorization", "Basic " + ex_encoded_creds.value());
6✔
474
        log::Warning(
3✔
475
                "Avoid using basic authentication if possible, and make sure if it's used, it's through HTTPS");
6✔
476

477
        return error::NoError;
3✔
478
}
479

480
error::Error Client::HandleProxySetup() {
279✔
481
        secondary_req_.reset();
279✔
482

483
        if (request_->address_.protocol == "http") {
279✔
484
                socket_mode_ = SocketMode::Plain;
253✔
485

486
                if (http_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
253✔
487
                        // Make a modified proxy request.
488
                        BrokenDownUrl proxy_address;
20✔
489
                        auto err = BreakDownUrl(http_proxy_, proxy_address, true);
11✔
490
                        if (err != error::NoError) {
11✔
491
                                return err.WithContext("HTTP proxy URL is invalid");
2✔
492
                        }
493
                        if (proxy_address.path != "" && proxy_address.path != "/") {
10✔
494
                                return MakeError(
495
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
496
                        }
497

498
                        request_->address_.path = request_->address_.protocol + "://" + request_->address_.host
18✔
499
                                                                          + ":" + to_string(request_->address_.port)
27✔
500
                                                                          + request_->address_.path;
27✔
501
                        request_->address_.host = proxy_address.host;
9✔
502
                        request_->address_.port = proxy_address.port;
9✔
503
                        request_->address_.protocol = proxy_address.protocol;
9✔
504

505
                        err = AddProxyAuthHeader(*request_, proxy_address);
9✔
506
                        if (err != error::NoError) {
9✔
507
                                return err;
×
508
                        }
509

510
                        if (proxy_address.protocol == "https") {
9✔
511
                                socket_mode_ = SocketMode::Tls;
5✔
512
                        } else if (proxy_address.protocol == "http") {
4✔
513
                                socket_mode_ = SocketMode::Plain;
4✔
514
                        } else {
515
                                // Should never get here.
516
                                assert(false);
517
                        }
518
                }
519
        } else if (request_->address_.protocol == "https") {
26✔
520
                socket_mode_ = SocketMode::Tls;
26✔
521

522
                if (https_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
26✔
523
                        // Save the original request for later, so that we can make a new request
524
                        // over the channel established by CONNECT.
525
                        secondary_req_ = std::move(request_);
526

527
                        request_ = make_shared<OutgoingRequest>();
30✔
528
                        request_->SetMethod(Method::CONNECT);
15✔
529
                        BrokenDownUrl proxy_address;
28✔
530
                        auto err = BreakDownUrl(https_proxy_, proxy_address, true);
15✔
531
                        if (err != error::NoError) {
15✔
532
                                return err.WithContext("HTTPS proxy URL is invalid");
2✔
533
                        }
534
                        if (proxy_address.path != "" && proxy_address.path != "/") {
14✔
535
                                return MakeError(
536
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
537
                        }
538

539
                        request_->address_.path =
540
                                secondary_req_->address_.host + ":" + to_string(secondary_req_->address_.port);
26✔
541
                        request_->address_.host = proxy_address.host;
13✔
542
                        request_->address_.port = proxy_address.port;
13✔
543
                        request_->address_.protocol = proxy_address.protocol;
13✔
544

545
                        err = AddProxyAuthHeader(*request_, proxy_address);
13✔
546
                        if (err != error::NoError) {
13✔
547
                                return err;
×
548
                        }
549

550
                        if (proxy_address.protocol == "https") {
13✔
551
                                socket_mode_ = SocketMode::Tls;
7✔
552
                        } else if (proxy_address.protocol == "http") {
6✔
553
                                socket_mode_ = SocketMode::Plain;
6✔
554
                        } else {
555
                                // Should never get here.
556
                                assert(false);
557
                        }
558
                }
559
        } else {
560
                // Should never get here
561
                assert(false);
562
        }
563

564
        return error::NoError;
275✔
565
}
566

567
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
175✔
568
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
175✔
569
                return expected::unexpected(error::Error(
2✔
570
                        make_error_condition(errc::operation_in_progress),
4✔
571
                        "MakeBodyAsyncReader called while reading is in progress"));
6✔
572
        }
573

574
        if (GetContentLength(*response_data_.http_response_parser_) == 0
173✔
575
                && !response_data_.http_response_parser_->chunked()) {
173✔
576
                return expected::unexpected(
17✔
577
                        MakeError(BodyMissingError, "Response does not contain a body"));
51✔
578
        }
579

580
        status_ = TransactionStatus::ReaderCreated;
156✔
581
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
312✔
582
}
583

584
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
7✔
585
        if (*cancelled_) {
7✔
586
                return expected::unexpected(error::Error(
×
587
                        make_error_condition(errc::not_connected),
×
588
                        "Cannot switch protocols if endpoint is not connected"));
×
589
        }
590

591
        // Rest of the connection is done directly on the socket, we are done here.
592
        status_ = TransactionStatus::Done;
7✔
593
        *cancelled_ = true;
7✔
594
        cancelled_ = make_shared<bool>(false);
14✔
595

596
        auto stream = stream_;
597
        // This no longer belongs to us.
598
        stream_.reset();
7✔
599

600
        switch (socket_mode_) {
7✔
601
        case SocketMode::TlsTls:
×
602
                return make_shared<RawSocket<ssl::stream<ssl::stream<tcp::socket>>>>(
×
603
                        stream, response_data_.response_buffer_);
×
604
        case SocketMode::Tls:
×
605
                return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
×
606
                        make_shared<ssl::stream<tcp::socket>>(std::move(stream->next_layer())),
×
607
                        response_data_.response_buffer_);
×
608
        case SocketMode::Plain:
7✔
609
                return make_shared<RawSocket<tcp::socket>>(
7✔
610
                        make_shared<tcp::socket>(std::move(stream->next_layer().next_layer())),
14✔
611
                        response_data_.response_buffer_);
7✔
612
        }
613

614
        AssertOrReturnUnexpected(false);
×
615
}
616

617
void Client::CallHandler(ResponseHandler handler) {
362✔
618
        // This function exists to make sure we have a copy of the handler we're calling (in the
619
        // argument list). This is important in case the handler owns the client instance through a
620
        // capture, and it replaces the handler with a different one (using `AsyncCall`). If it
621
        // does, then it destroys the final copy of the handler, and therefore also the client,
622
        // which is why we need to make a copy here, before calling it.
623
        handler(response_);
362✔
624
}
362✔
625

626
void Client::CallErrorHandler(
83✔
627
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
628
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
249✔
629
}
83✔
630

631
void Client::CallErrorHandler(
125✔
632
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
633
        status_ = TransactionStatus::Done;
125✔
634
        DoCancel();
125✔
635
        handler(expected::unexpected(
250✔
636
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
500✔
637
}
125✔
638

639
void Client::ResolveHandler(
270✔
640
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
641
        if (ec) {
270✔
642
                CallErrorHandler(ec, request_, header_handler_);
×
643
                return;
×
644
        }
645

646
        if (logger_.Level() >= log::LogLevel::Debug) {
270✔
647
                string ips = "[";
248✔
648
                string sep;
649
                for (auto r : results) {
1,040✔
650
                        ips += sep;
272✔
651
                        ips += r.endpoint().address().to_string();
272✔
652
                        sep = ", ";
272✔
653
                }
654
                ips += "]";
248✔
655
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
496✔
656
        }
657

658
        resolver_results_ = results;
659

660
        stream_ = make_shared<ssl::stream<ssl::stream<tcp::socket>>>(
270✔
661
                ssl::stream<tcp::socket>(GetAsioIoContext(event_loop_), ssl_ctx_[0]), ssl_ctx_[1]);
540✔
662

663
        if (response_data_.response_buffer_) {
270✔
664
                // We can reuse this if preexisting, just make sure we start with a
665
                // clean state (while avoiding shrinking/discarding the buffer, see
666
                // https://www.boost.org/doc/libs/1_70_0/libs/beast/doc/html/beast/ref/boost__beast__basic_flat_buffer/clear.html
667
                // for details).
668
                // Since there should be no leftover bytes from previous responses, we
669
                // log if there are some, but let's not bother all users with a warning,
670
                // there is nothing they could do about it. However, for
671
                // testing/debugging/CI, it can be useful to have this information.
672
                if (response_data_.response_buffer_->size() > 0) {
70✔
673
                        logger_.Debug(
1✔
674
                                "Leftover data from the previous response! ("
675
                                + to_string(response_data_.response_buffer_->size()) + " bytes)");
2✔
676
                }
677
                response_data_.response_buffer_->clear();
678
        } else {
679
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
400✔
680

681
                // This is equivalent to:
682
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
683
                // but compatible with Boost 1.67.
684
                response_data_.response_buffer_->prepare(
685
                        body_buffer_.size() - response_data_.response_buffer_->size());
200✔
686
        }
687

688
        auto &cancelled = cancelled_;
689

690
        asio::async_connect(
270✔
691
                stream_->lowest_layer(),
692
                resolver_results_,
270✔
693
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
540✔
694
                        if (!*cancelled) {
270✔
695
                                switch (socket_mode_) {
270✔
696
                                case SocketMode::TlsTls:
×
697
                                        // Should never happen because we always need to handshake
698
                                        // the innermost Tls first, then the outermost, but the
699
                                        // latter doesn't happen here.
700
                                        assert(false);
701
                                        CallErrorHandler(
×
702
                                                error::MakeError(
×
703
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
×
704
                                                request_,
×
705
                                                header_handler_);
×
706
                                case SocketMode::Tls:
21✔
707
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
21✔
708
                                case SocketMode::Plain:
249✔
709
                                        return ConnectHandler(ec, endpoint);
249✔
710
                                }
711
                        }
712
                });
713
}
714

715
template <typename StreamType>
716
void Client::HandshakeHandler(
25✔
717
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
718
        if (ec) {
25✔
719
                CallErrorHandler(ec, request_, header_handler_);
2✔
720
                return;
2✔
721
        }
722

723
        // Enable TCP keepalive
724
        boost::asio::socket_base::keep_alive option(true);
725
        stream_->lowest_layer().set_option(option);
23✔
726

727
        // We can't avoid a C style cast on this next line. The usual method by which system headers
728
        // are excluded from warnings doesn't work, because `SSL_set_tlsext_host_name` is a macro,
729
        // containing a cast, which expands here, not in the original file. So just disable the
730
        // warning here.
731
#ifdef __clang__
732
#pragma clang diagnostic push
733
#pragma clang diagnostic ignored "-Wold-style-cast"
734
#else
735
#pragma GCC diagnostic push
736
#pragma GCC diagnostic ignored "-Wold-style-cast"
737
#endif
738
        // Set SNI Hostname (many hosts need this to handshake successfully)
739
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
23✔
740
#ifdef __clang__
741
#pragma clang diagnostic pop
742
#else
743
#pragma GCC diagnostic pop
744
#endif
745
                beast::error_code ec2 {
×
746
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
747
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
748
        }
749

750
        // Enable host name verification (not done automatically and we don't have
751
        // enough access to the TLS internals to use X509_VERIFY_PARAM_set1_host(),
752
        // hence the callback that boost provides).
753
        boost::system::error_code b_ec;
23✔
754
        stream.set_verify_callback(ssl::host_name_verification(request_->address_.host), b_ec);
46✔
755
        if (b_ec) {
23✔
756
                logger_.Error("Failed to enable host name verification: " + b_ec.message());
×
757
                CallErrorHandler(b_ec, request_, header_handler_);
×
758
                return;
×
759
        }
760

761
        auto &cancelled = cancelled_;
762

763
        stream.async_handshake(
46✔
764
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
23✔
765
                        if (*cancelled) {
26✔
766
                                return;
767
                        }
768
                        if (ec) {
26✔
769
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
20✔
770
                                CallErrorHandler(ec, request_, header_handler_);
10✔
771
                                return;
10✔
772
                        }
773
                        logger_.Debug("https: Successful SSL handshake");
32✔
774
                        ConnectHandler(ec, endpoint);
16✔
775
                });
776
}
777

778

779
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
265✔
780
        if (ec) {
265✔
781
                CallErrorHandler(ec, request_, header_handler_);
16✔
782
                return;
16✔
783
        }
784

785
        // Enable TCP keepalive
786
        boost::asio::socket_base::keep_alive option(true);
787
        stream_->lowest_layer().set_option(option);
249✔
788

789
        logger_.Debug("Connected to " + endpoint.address().to_string());
498✔
790

791
        request_data_.http_request_ = make_shared<http::request<http::buffer_body>>(
249✔
792
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
498✔
793

794
        for (const auto &header : request_->headers_) {
892✔
795
                request_data_.http_request_->set(header.first, header.second);
643✔
796
        }
797

798
        request_data_.http_request_serializer_ =
799
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);
249✔
800

801
        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
498✔
802

803
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
804
        // if they do, they should be handled higher up in the application logic.
805
        //
806
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
807
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
808
        // `has_value()` in their code, causing their subsequent comparison operation to
809
        // misbehave. So pass highest possible value instead.
810
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
811

812
        auto &cancelled = cancelled_;
813
        auto &request_data = request_data_;
249✔
814

815
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
249✔
816
                if (!*cancelled) {
249✔
817
                        WriteHeaderHandler(ec, num_written);
249✔
818
                }
819
        };
498✔
820

821
        switch (socket_mode_) {
249✔
822
        case SocketMode::TlsTls:
2✔
823
                http::async_write_header(*stream_, *request_data_.http_request_serializer_, handler);
2✔
824
                break;
825
        case SocketMode::Tls:
14✔
826
                http::async_write_header(
14✔
827
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
828
                break;
829
        case SocketMode::Plain:
233✔
830
                http::async_write_header(
233✔
831
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
832
                break;
833
        }
834
}
835

836
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
249✔
837
        if (num_written > 0) {
249✔
838
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
498✔
839
        }
840

841
        if (ec) {
249✔
842
                CallErrorHandler(ec, request_, header_handler_);
×
843
                return;
209✔
844
        }
845

846
        auto exp_has_body =
847
                HasBody(request_->GetHeader("Content-Length"), request_->GetHeader("Transfer-Encoding"));
498✔
848
        if (!exp_has_body) {
249✔
849
                CallErrorHandler(exp_has_body.error(), request_, header_handler_);
×
850
                return;
×
851
        }
852
        if (!exp_has_body.value()) {
249✔
853
                ReadHeader();
208✔
854
                return;
855
        }
856

857
        if (!request_->body_gen_ && !request_->async_body_gen_) {
41✔
858
                auto err = MakeError(BodyMissingError, "No body generator");
2✔
859
                CallErrorHandler(err, request_, header_handler_);
2✔
860
                return;
861
        }
862

863
        assert(!(request_->body_gen_ && request_->async_body_gen_));
864

865
        if (request_->body_gen_) {
40✔
866
                auto body_reader = request_->body_gen_();
34✔
867
                if (!body_reader) {
34✔
868
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
869
                        return;
870
                }
871
                request_->body_reader_ = body_reader.value();
34✔
872
        } else {
873
                auto body_reader = request_->async_body_gen_();
6✔
874
                if (!body_reader) {
6✔
875
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
876
                        return;
877
                }
878
                request_->async_body_reader_ = body_reader.value();
6✔
879
        }
880

881
        PrepareAndWriteNewBodyBuffer();
40✔
882
}
883

884
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,280✔
885
        if (num_written > 0) {
2,280✔
886
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,244✔
887
        }
888

889
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,280✔
890
                // Write next block of the body.
891
                PrepareAndWriteNewBodyBuffer();
1,121✔
892
        } else if (ec) {
1,159✔
893
                CallErrorHandler(ec, request_, header_handler_);
8✔
894
        } else if (num_written > 0) {
1,155✔
895
                // We are still writing the body.
896
                WriteBody();
1,122✔
897
        } else {
898
                // We are ready to receive the response.
899
                ReadHeader();
33✔
900
        }
901
}
2,280✔
902

903
void Client::PrepareAndWriteNewBodyBuffer() {
1,161✔
904
        // request_->body_reader_ XOR request_->async_body_reader_
905
        assert(
906
                (request_->body_reader_ || request_->async_body_reader_)
907
                && !(request_->body_reader_ && request_->async_body_reader_));
908

909
        auto cancelled = cancelled_;
910
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,594✔
911
                if (!*cancelled) {
1,161✔
912
                        if (!read) {
1,160✔
913
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
914
                                return;
2✔
915
                        }
916
                        WriteNewBodyBuffer(read.value());
1,158✔
917
                }
918
        };
1,161✔
919

920

921
        if (request_->body_reader_) {
1,161✔
922
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,472✔
923
        } else {
924
                auto err = request_->async_body_reader_->AsyncRead(
925
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
926
                if (err != error::NoError) {
425✔
927
                        CallErrorHandler(err, request_, header_handler_);
×
928
                }
929
        }
930
}
1,161✔
931

932
void Client::WriteNewBodyBuffer(size_t size) {
1,158✔
933
        request_data_.http_request_->body().data = body_buffer_.data();
1,158✔
934
        request_data_.http_request_->body().size = size;
1,158✔
935

936
        if (size > 0) {
1,158✔
937
                request_data_.http_request_->body().more = true;
1,125✔
938
        } else {
939
                // Release ownership of Body reader.
940
                request_->body_reader_.reset();
33✔
941
                request_->async_body_reader_.reset();
33✔
942
                request_data_.http_request_->body().more = false;
33✔
943
        }
944

945
        WriteBody();
1,158✔
946
}
1,158✔
947

948
void Client::WriteBody() {
2,280✔
949
        auto &cancelled = cancelled_;
950
        auto &request_data = request_data_;
2,280✔
951

952
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,280✔
953
                if (!*cancelled) {
2,280✔
954
                        WriteBodyHandler(ec, num_written);
2,280✔
955
                }
956
        };
4,560✔
957

958
        switch (socket_mode_) {
2,280✔
959
        case SocketMode::TlsTls:
×
960
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
961
                break;
962
        case SocketMode::Tls:
×
963
                http::async_write_some(
964
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
965
                break;
966
        case SocketMode::Plain:
2,280✔
967
                http::async_write_some(
968
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
969
                break;
970
        }
971
}
2,280✔
972

973
void Client::ReadHeader() {
241✔
974
        auto &cancelled = cancelled_;
975
        auto &response_data = response_data_;
241✔
976

977
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
238✔
978
                if (!*cancelled) {
238✔
979
                        ReadHeaderHandler(ec, num_read);
238✔
980
                }
981
        };
482✔
982

983
        switch (socket_mode_) {
241✔
984
        case SocketMode::TlsTls:
2✔
985
                http::async_read_some(
2✔
986
                        *stream_,
987
                        *response_data_.response_buffer_,
988
                        *response_data_.http_response_parser_,
989
                        handler);
990
                break;
991
        case SocketMode::Tls:
14✔
992
                http::async_read_some(
14✔
993
                        stream_->next_layer(),
994
                        *response_data_.response_buffer_,
995
                        *response_data_.http_response_parser_,
996
                        handler);
997
                break;
998
        case SocketMode::Plain:
225✔
999
                http::async_read_some(
225✔
1000
                        stream_->next_layer().next_layer(),
1001
                        *response_data_.response_buffer_,
1002
                        *response_data_.http_response_parser_,
1003
                        handler);
1004
                break;
1005
        }
1006
}
241✔
1007

1008
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
238✔
1009
        if (num_read > 0) {
238✔
1010
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
466✔
1011
        }
1012

1013
        if (ec) {
238✔
1014
                CallErrorHandler(ec, request_, header_handler_);
5✔
1015
                return;
65✔
1016
        }
1017

1018
        if (!response_data_.http_response_parser_->is_header_done()) {
233✔
1019
                ReadHeader();
×
1020
                return;
×
1021
        }
1022

1023
        if (secondary_req_) {
233✔
1024
                HandleSecondaryRequest();
9✔
1025
                return;
9✔
1026
        }
1027

1028
        response_.reset(new IncomingResponse(*this, cancelled_));
448✔
1029
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
224✔
1030
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
224✔
1031

1032
        logger_.Debug(
448✔
1033
                "Received response: " + to_string(response_->status_code_) + " "
448✔
1034
                + response_->status_message_);
672✔
1035

1036
        string debug_str;
1037
        for (auto header = response_data_.http_response_parser_->get().cbegin();
258✔
1038
                 header != response_data_.http_response_parser_->get().cend();
482✔
1039
                 header++) {
1040
                response_->headers_[string {header->name_string()}] = string {header->value()};
774✔
1041
                if (logger_.Level() >= log::LogLevel::Debug) {
258✔
1042
                        debug_str += string {header->name_string()};
243✔
1043
                        debug_str += ": ";
243✔
1044
                        debug_str += string {header->value()};
243✔
1045
                        debug_str += "\n";
243✔
1046
                }
1047
        }
1048

1049
        logger_.Debug("Received headers:\n" + debug_str);
448✔
1050
        debug_str.clear();
1051

1052
        if (GetContentLength(*response_data_.http_response_parser_) == 0
224✔
1053
                && !response_data_.http_response_parser_->chunked()) {
224✔
1054
                auto cancelled = cancelled_;
1055
                status_ = TransactionStatus::HeaderHandlerCalled;
48✔
1056
                CallHandler(header_handler_);
96✔
1057
                if (!*cancelled) {
48✔
1058
                        status_ = TransactionStatus::Done;
43✔
1059
                        if (response_->status_code_ != StatusCode::StatusSwitchingProtocols) {
43✔
1060
                                // Make an exception for 101 Switching Protocols response, where the TCP connection
1061
                                // is meant to be reused.
1062
                                DoCancel();
39✔
1063
                        }
1064
                        CallHandler(body_handler_);
86✔
1065
                }
1066
                return;
1067
        }
1068

1069
        auto cancelled = cancelled_;
1070
        status_ = TransactionStatus::HeaderHandlerCalled;
176✔
1071
        CallHandler(header_handler_);
352✔
1072
        if (*cancelled) {
176✔
1073
                return;
1074
        }
1075

1076
        // We know that a body reader is required here, because of the check for body above.
1077
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
173✔
1078
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
36✔
1079
        }
1080
}
1081

1082
void Client::HandleSecondaryRequest() {
9✔
1083
        logger_.Debug(
18✔
1084
                "Received proxy response: "
1085
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
18✔
1086
                + string {response_data_.http_response_parser_->get().reason()});
36✔
1087

1088
        request_ = std::move(secondary_req_);
1089

1090
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
9✔
1091
                auto err = MakeError(
1092
                        ProxyError,
1093
                        "Proxy returned unexpected response: "
1094
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
1095
                                + string {response_data_.http_response_parser_->get().reason()});
6✔
1096
                CallErrorHandler(err, request_, header_handler_);
4✔
1097
                return;
1098
        }
1099

1100
        if (GetContentLength(*response_data_.http_response_parser_) != 0
7✔
1101
                || response_data_.http_response_parser_->chunked()) {
7✔
1102
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
1103
                CallErrorHandler(err, request_, header_handler_);
×
1104
                return;
1105
        }
1106

1107
        // We are connected. Now repeat the request cycle with the original request. Pretend
1108
        // we were just connected.
1109

1110
        assert(request_->GetProtocol() == "https");
1111

1112
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
1113
        // a different layer, this will get out of sync.
1114
        assert(response_data_.response_buffer_->size() == 0);
1115

1116
        switch (socket_mode_) {
7✔
1117
        case SocketMode::TlsTls:
×
1118
                // Should never get here, because this is the only place where TlsTls mode
1119
                // is supposed to be turned on.
1120
                assert(false);
1121
                CallErrorHandler(
×
1122
                        error::MakeError(
×
1123
                                error::ProgrammingError,
1124
                                "Any other mode than Tls is not valid when handling secondary request"),
×
1125
                        request_,
×
1126
                        header_handler_);
×
1127
                break;
×
1128
        case SocketMode::Tls:
3✔
1129
                // Upgrade to TLS inside TLS.
1130
                socket_mode_ = SocketMode::TlsTls;
3✔
1131
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
3✔
1132
                break;
3✔
1133
        case SocketMode::Plain:
4✔
1134
                // Upgrade to TLS.
1135
                socket_mode_ = SocketMode::Tls;
4✔
1136
                HandshakeHandler(
4✔
1137
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
4✔
1138
                break;
4✔
1139
        }
1140
}
1141

1142
void Client::AsyncReadNextBodyPart(
4,663✔
1143
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1144
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1145

1146
        if (status_ == TransactionStatus::ReaderCreated) {
4,663✔
1147
                status_ = TransactionStatus::BodyReadingInProgress;
154✔
1148
        }
1149

1150
        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
4,663✔
1151
                auto cancelled = cancelled_;
1152
                handler(0);
190✔
1153
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
95✔
1154
                        status_ = TransactionStatus::Done;
95✔
1155
                        DoCancel();
95✔
1156
                        CallHandler(body_handler_);
190✔
1157
                }
1158
                return;
1159
        }
1160

1161
        reader_buf_start_ = start;
4,568✔
1162
        reader_buf_end_ = end;
4,568✔
1163
        reader_handler_ = handler;
4,568✔
1164
        size_t read_size = end - start;
4,568✔
1165
        size_t smallest = min(body_buffer_.size(), read_size);
6,681✔
1166

1167
        response_data_.http_response_parser_->get().body().data = body_buffer_.data();
4,568✔
1168
        response_data_.http_response_parser_->get().body().size = smallest;
4,568✔
1169
        response_data_.last_buffer_size_ = smallest;
4,568✔
1170

1171
        auto &cancelled = cancelled_;
1172
        auto &response_data = response_data_;
4,568✔
1173

1174
        // Add a timeout timer to ensure we don't get stuck if we lose connection
1175
        read_timeout_timer_.AsyncWait(chrono::minutes(5), [this, cancelled](error::Error err) {
18,231✔
1176
                if (!*cancelled) {
4,527✔
1177
                        if (err != error::NoError) {
4,469✔
1178
                                if (err.code != make_error_condition(errc::operation_canceled)) {
4,469✔
1179
                                        log::Error("Read timeout timer caused error: " + err.String());
×
1180
                                        CallErrorHandler(err, request_, body_handler_);
×
1181
                                }
1182
                        } else {
1183
                                CallErrorHandler(
×
1184
                                        MakeError(DownloadResumerError, "Read timed out"), request_, body_handler_);
×
1185
                        }
1186
                }
1187
        });
13,663✔
1188

1189
        auto async_handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
4,567✔
1190
                if (!*cancelled) {
4,567✔
1191
                        read_timeout_timer_.Cancel();
4,567✔
1192
                        ReadBodyHandler(ec, num_read);
4,567✔
1193
                }
1194
        };
9,136✔
1195

1196
        switch (socket_mode_) {
4,568✔
1197
        case SocketMode::TlsTls:
2✔
1198
                http::async_read_some(
2✔
1199
                        *stream_,
1200
                        *response_data_.response_buffer_,
1201
                        *response_data_.http_response_parser_,
1202
                        async_handler);
1203
                break;
1204
        case SocketMode::Tls:
4✔
1205
                http::async_read_some(
4✔
1206
                        stream_->next_layer(),
1207
                        *response_data_.response_buffer_,
1208
                        *response_data_.http_response_parser_,
1209
                        async_handler);
1210
                break;
1211
        case SocketMode::Plain:
4,562✔
1212
                http::async_read_some(
4,562✔
1213
                        stream_->next_layer().next_layer(),
1214
                        *response_data_.response_buffer_,
1215
                        *response_data_.http_response_parser_,
1216
                        async_handler);
1217
                break;
1218
        }
1219
}
1220

1221
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
4,567✔
1222
        if (num_read > 0) {
4,567✔
1223
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
9,034✔
1224
        }
1225

1226
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,567✔
1227
                // This can be ignored. We always reset the buffer between reads anyway.
1228
                ec = error_code();
1,958✔
1229
        }
1230

1231
        assert(reader_handler_);
1232

1233
        if (response_data_.http_response_parser_->is_done()) {
4,567✔
1234
                status_ = TransactionStatus::BodyReadingFinished;
101✔
1235
        }
1236

1237
        auto cancelled = cancelled_;
1238

1239
        if (ec) {
4,567✔
1240
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
100✔
1241
                reader_handler_(expected::unexpected(err));
150✔
1242
                if (!*cancelled) {
50✔
1243
                        CallErrorHandler(ec, request_, body_handler_);
92✔
1244
                }
1245
                return;
1246
        }
1247

1248
        // The num_read from above includes out of band payload data, such as chunk headers, which
1249
        // we are not interested in. So we need to calculate the payload size from the remaining
1250
        // buffer space.
1251
        size_t payload_read =
1252
                response_data_.last_buffer_size_ - response_data_.http_response_parser_->get().body().size;
4,517✔
1253

1254
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
4,517✔
1255
        size_t smallest = min(payload_read, buf_size);
4,517✔
1256

1257
        if (smallest == 0) {
4,517✔
1258
                // We read nothing, which can happen if all we read was a chunk header. We cannot
1259
                // return 0 to the handler however, because in `io::Reader` context this means
1260
                // EOF. So just repeat the request instead, until we get actual payload data.
1261
                AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
462✔
1262
        } else {
1263
                copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
4,286✔
1264
                reader_handler_(smallest);
8,572✔
1265
        }
1266
}
1267

1268
void Client::Cancel() {
205✔
1269
        auto cancelled = cancelled_;
1270

1271
        if (!*cancelled) {
205✔
1272
                auto err =
1273
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
40✔
1274
                switch (status_) {
20✔
1275
                case TransactionStatus::None:
3✔
1276
                        CallErrorHandler(err, request_, header_handler_);
3✔
1277
                        break;
3✔
1278
                case TransactionStatus::HeaderHandlerCalled:
16✔
1279
                case TransactionStatus::ReaderCreated:
1280
                case TransactionStatus::BodyReadingInProgress:
1281
                case TransactionStatus::BodyReadingFinished:
1282
                        CallErrorHandler(err, request_, body_handler_);
16✔
1283
                        break;
16✔
1284
                case TransactionStatus::Replying:
1285
                case TransactionStatus::SwitchingProtocol:
1286
                        // Not used by client.
1287
                        assert(false);
1288
                        break;
1289
                case TransactionStatus::BodyHandlerCalled:
1290
                case TransactionStatus::Done:
1291
                        break;
1292
                }
1293
        }
1294

1295
        if (!*cancelled) {
205✔
1296
                DoCancel();
1✔
1297
        }
1298
}
205✔
1299

1300
void Client::DoCancel() {
618✔
1301
        resolver_.cancel();
618✔
1302
        read_timeout_timer_.Cancel();
618✔
1303
        if (stream_) {
618✔
1304
                stream_->lowest_layer().cancel();
263✔
1305
                stream_->lowest_layer().close();
263✔
1306
                stream_.reset();
263✔
1307
        }
1308

1309
        // Reset logger to no connection.
1310
        logger_ = log::Logger(logger_name_);
618✔
1311

1312
        // Set cancel state and then make a new one. Those who are interested should have their own
1313
        // pointer to the old one.
1314
        *cancelled_ = true;
618✔
1315
        cancelled_ = make_shared<bool>(true);
618✔
1316
}
618✔
1317

1318
Stream::Stream(Server &server) :
447✔
1319
        server_ {server},
1320
        logger_ {"http"},
1321
        cancelled_(make_shared<bool>(true)),
447✔
1322
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
447✔
1323
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,341✔
1324
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
894✔
1325

1326
        // This is equivalent to:
1327
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1328
        // but compatible with Boost 1.67.
1329
        request_data_.request_buffer_->prepare(
1330
                body_buffer_.size() - request_data_.request_buffer_->size());
447✔
1331

1332
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
894✔
1333

1334
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1335
        // they do, they should be handled higher up in the application logic.
1336
        //
1337
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1338
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1339
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1340
        // possible value instead.
1341
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1342
}
447✔
1343

1344
Stream::~Stream() {
1,341✔
1345
        DoCancel();
447✔
1346
}
447✔
1347

1348
void Stream::Cancel() {
7✔
1349
        auto cancelled = cancelled_;
1350

1351
        if (!*cancelled) {
7✔
1352
                auto err =
1353
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1354
                switch (status_) {
7✔
1355
                case TransactionStatus::None:
×
1356
                        CallErrorHandler(err, request_, server_.header_handler_);
×
1357
                        break;
×
1358
                case TransactionStatus::HeaderHandlerCalled:
5✔
1359
                case TransactionStatus::ReaderCreated:
1360
                case TransactionStatus::BodyReadingInProgress:
1361
                case TransactionStatus::BodyReadingFinished:
1362
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1363
                        break;
5✔
1364
                case TransactionStatus::BodyHandlerCalled:
×
1365
                        // In between body handler and reply finished. No one to handle the status
1366
                        // here.
1367
                        server_.RemoveStream(shared_from_this());
×
1368
                        break;
×
1369
                case TransactionStatus::Replying:
1✔
1370
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1371
                        break;
1✔
1372
                case TransactionStatus::SwitchingProtocol:
1✔
1373
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1374
                        break;
1✔
1375
                case TransactionStatus::Done:
1376
                        break;
1377
                }
1378
        }
1379

1380
        if (!*cancelled) {
7✔
1381
                DoCancel();
×
1382
        }
1383
}
7✔
1384

1385
void Stream::DoCancel() {
807✔
1386
        if (socket_.is_open()) {
807✔
1387
                socket_.cancel();
220✔
1388
                socket_.close();
220✔
1389
        }
1390

1391
        // Set cancel state and then make a new one. Those who are interested should have their own
1392
        // pointer to the old one.
1393
        *cancelled_ = true;
807✔
1394
        cancelled_ = make_shared<bool>(true);
807✔
1395
}
807✔
1396

1397
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1398
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1399
}
×
1400

1401
void Stream::CallErrorHandler(
×
1402
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1403
        status_ = TransactionStatus::Done;
×
1404
        DoCancel();
×
1405
        handler(expected::unexpected(err.WithContext(
×
1406
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1407

1408
        server_.RemoveStream(shared_from_this());
×
1409
}
×
1410

1411
void Stream::CallErrorHandler(
2✔
1412
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1413
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1414
}
2✔
1415

1416
void Stream::CallErrorHandler(
8✔
1417
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1418
        status_ = TransactionStatus::Done;
8✔
1419
        DoCancel();
8✔
1420
        handler(
8✔
1421
                req,
1422
                err.WithContext(
8✔
1423
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
24✔
1424

1425
        server_.RemoveStream(shared_from_this());
8✔
1426
}
8✔
1427

1428
void Stream::CallErrorHandler(
4✔
1429
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1430
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1431
}
4✔
1432

1433
void Stream::CallErrorHandler(
7✔
1434
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1435
        status_ = TransactionStatus::Done;
7✔
1436
        DoCancel();
7✔
1437
        handler(err.WithContext(
14✔
1438
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1439

1440
        server_.RemoveStream(shared_from_this());
7✔
1441
}
7✔
1442

1443
void Stream::CallErrorHandler(
×
1444
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1445
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1446
}
×
1447

1448
void Stream::CallErrorHandler(
1✔
1449
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1450
        status_ = TransactionStatus::Done;
1✔
1451
        DoCancel();
1✔
1452
        handler(expected::unexpected(err.WithContext(
2✔
1453
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1454

1455
        server_.RemoveStream(shared_from_this());
1✔
1456
}
1✔
1457

1458
void Stream::AcceptHandler(const error_code &ec) {
228✔
1459
        if (ec) {
228✔
1460
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1461
                return;
×
1462
        }
1463

1464
        auto ip = socket_.remote_endpoint().address().to_string();
456✔
1465

1466
        // Use IP as context for logging.
1467
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
228✔
1468

1469
        logger_.Debug("Accepted connection.");
456✔
1470

1471
        request_.reset(new IncomingRequest(*this, cancelled_));
456✔
1472

1473
        request_->address_.host = ip;
228✔
1474

1475
        *cancelled_ = false;
228✔
1476

1477
        ReadHeader();
228✔
1478
}
1479

1480
void Stream::ReadHeader() {
228✔
1481
        auto &cancelled = cancelled_;
1482
        auto &request_data = request_data_;
228✔
1483

1484
        http::async_read_some(
456✔
1485
                socket_,
228✔
1486
                *request_data_.request_buffer_,
1487
                *request_data_.http_request_parser_,
1488
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
228✔
1489
                        if (!*cancelled) {
228✔
1490
                                ReadHeaderHandler(ec, num_read);
228✔
1491
                        }
1492
                });
228✔
1493
}
228✔
1494

1495
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
228✔
1496
        if (num_read > 0) {
228✔
1497
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
456✔
1498
        }
1499

1500
        if (ec) {
228✔
1501
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1502
                return;
188✔
1503
        }
1504

1505
        if (!request_data_.http_request_parser_->is_header_done()) {
228✔
1506
                ReadHeader();
×
1507
                return;
×
1508
        }
1509

1510
        auto method_result = BeastVerbToMethod(
1511
                request_data_.http_request_parser_->get().base().method(),
1512
                string {request_data_.http_request_parser_->get().base().method_string()});
456✔
1513
        if (!method_result) {
228✔
1514
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1515
                return;
×
1516
        }
1517
        request_->method_ = method_result.value();
228✔
1518
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
228✔
1519

1520
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
228✔
1521

1522
        string debug_str;
1523
        for (auto header = request_data_.http_request_parser_->get().cbegin();
622✔
1524
                 header != request_data_.http_request_parser_->get().cend();
850✔
1525
                 header++) {
1526
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,866✔
1527
                if (logger_.Level() >= log::LogLevel::Debug) {
622✔
1528
                        debug_str += string {header->name_string()};
537✔
1529
                        debug_str += ": ";
537✔
1530
                        debug_str += string {header->value()};
537✔
1531
                        debug_str += "\n";
537✔
1532
                }
1533
        }
1534

1535
        logger_.Debug("Received headers:\n" + debug_str);
456✔
1536
        debug_str.clear();
1537

1538
        if (GetContentLength(*request_data_.http_request_parser_) == 0
228✔
1539
                && !request_data_.http_request_parser_->chunked()) {
228✔
1540
                auto cancelled = cancelled_;
1541
                status_ = TransactionStatus::HeaderHandlerCalled;
187✔
1542
                server_.header_handler_(request_);
374✔
1543
                if (!*cancelled) {
187✔
1544
                        status_ = TransactionStatus::BodyHandlerCalled;
187✔
1545
                        CallBodyHandler();
187✔
1546
                }
1547
                return;
1548
        }
1549

1550
        assert(!request_data_.http_request_parser_->is_done());
1551

1552
        auto cancelled = cancelled_;
1553
        status_ = TransactionStatus::HeaderHandlerCalled;
41✔
1554
        server_.header_handler_(request_);
82✔
1555
        if (*cancelled) {
41✔
1556
                return;
1557
        }
1558

1559
        // We know that a body reader is required here, because of the check for body above.
1560
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
40✔
1561
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1562
        }
1563
}
1564

1565
void Stream::AsyncReadNextBodyPart(
2,264✔
1566
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1567
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1568

1569
        if (status_ == TransactionStatus::ReaderCreated) {
2,264✔
1570
                status_ = TransactionStatus::BodyReadingInProgress;
39✔
1571
        }
1572

1573
        if (status_ != TransactionStatus::BodyReadingInProgress) {
2,264✔
1574
                auto cancelled = cancelled_;
1575
                handler(0);
66✔
1576
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
33✔
1577
                        status_ = TransactionStatus::BodyHandlerCalled;
33✔
1578
                        CallBodyHandler();
33✔
1579
                }
1580
                return;
1581
        }
1582

1583
        reader_buf_start_ = start;
2,231✔
1584
        reader_buf_end_ = end;
2,231✔
1585
        reader_handler_ = handler;
2,231✔
1586
        size_t read_size = end - start;
2,231✔
1587
        size_t smallest = min(body_buffer_.size(), read_size);
3,287✔
1588

1589
        request_data_.http_request_parser_->get().body().data = body_buffer_.data();
2,231✔
1590
        request_data_.http_request_parser_->get().body().size = smallest;
2,231✔
1591
        request_data_.last_buffer_size_ = smallest;
2,231✔
1592

1593
        auto &cancelled = cancelled_;
1594
        auto &request_data = request_data_;
2,231✔
1595

1596
        http::async_read_some(
4,462✔
1597
                socket_,
2,231✔
1598
                *request_data_.request_buffer_,
1599
                *request_data_.http_request_parser_,
1600
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
2,231✔
1601
                        if (!*cancelled) {
2,231✔
1602
                                ReadBodyHandler(ec, num_read);
2,231✔
1603
                        }
1604
                });
2,231✔
1605
}
1606

1607
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
2,231✔
1608
        if (num_read > 0) {
2,231✔
1609
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
4,454✔
1610
        }
1611

1612
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,231✔
1613
                // This can be ignored. We always reset the buffer between reads anyway.
1614
                ec = error_code();
979✔
1615
        }
1616

1617
        assert(reader_handler_);
1618

1619
        if (request_data_.http_request_parser_->is_done()) {
2,231✔
1620
                status_ = TransactionStatus::BodyReadingFinished;
33✔
1621
        }
1622

1623
        auto cancelled = cancelled_;
1624

1625
        if (ec) {
2,231✔
1626
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
8✔
1627
                reader_handler_(expected::unexpected(err));
12✔
1628
                if (!*cancelled) {
4✔
1629
                        CallErrorHandler(ec, request_, server_.body_handler_);
4✔
1630
                }
1631
                return;
1632
        }
1633

1634
        // The num_read from above includes out of band payload data, such as chunk headers, which
1635
        // we are not interested in. So we need to calculate the payload size from the remaining
1636
        // buffer space.
1637
        size_t payload_read =
1638
                request_data_.last_buffer_size_ - request_data_.http_request_parser_->get().body().size;
2,227✔
1639

1640
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
2,227✔
1641
        size_t smallest = min(payload_read, buf_size);
2,227✔
1642

1643
        if (smallest == 0) {
2,227✔
1644
                // We read nothing, which can happen if all we read was a chunk header. We cannot
1645
                // return 0 to the handler however, because in `io::Reader` context this means
1646
                // EOF. So just repeat the request instead, until we get actual payload data.
1647
                AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
154✔
1648
        } else {
1649
                copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
2,150✔
1650
                reader_handler_(smallest);
4,300✔
1651
        }
1652
}
1653

1654
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
205✔
1655
        SetupResponse();
205✔
1656

1657
        reply_finished_handler_ = reply_finished_handler;
205✔
1658

1659
        auto &cancelled = cancelled_;
1660
        auto &response_data = response_data_;
205✔
1661

1662
        http::async_write_header(
410✔
1663
                socket_,
205✔
1664
                *response_data_.http_response_serializer_,
1665
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
205✔
1666
                        if (!*cancelled) {
205✔
1667
                                WriteHeaderHandler(ec, num_written);
204✔
1668
                        }
1669
                });
205✔
1670
}
205✔
1671

1672
void Stream::SetupResponse() {
214✔
1673
        auto response = maybe_response_.lock();
214✔
1674
        // Only called from existing responses, so this should always be true.
1675
        assert(response);
1676

1677
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1678
        status_ = TransactionStatus::Replying;
214✔
1679

1680
        // From here on we take shared ownership.
1681
        response_ = response;
1682

1683
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
428✔
1684

1685
        for (const auto &header : response->headers_) {
444✔
1686
                response_data_.http_response_->base().set(header.first, header.second);
230✔
1687
        }
1688

1689
        response_data_.http_response_->result(response->GetStatusCode());
214✔
1690
        response_data_.http_response_->reason(response->GetStatusMessage());
428✔
1691

1692
        response_data_.http_response_serializer_ =
1693
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
428✔
1694
}
214✔
1695

1696
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
204✔
1697
        if (num_written > 0) {
204✔
1698
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
408✔
1699
        }
1700

1701
        if (ec) {
204✔
1702
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1703
                return;
37✔
1704
        }
1705

1706
        auto exp_has_body =
1707
                HasBody(response_->GetHeader("Content-Length"), response_->GetHeader("Transfer-Encoding"));
408✔
1708
        if (!exp_has_body) {
204✔
1709
                CallErrorHandler(exp_has_body.error(), request_, reply_finished_handler_);
×
1710
                return;
×
1711
        }
1712
        if (!exp_has_body.value()) {
204✔
1713
                FinishReply();
36✔
1714
                return;
1715
        }
1716

1717
        if (!response_->body_reader_ && !response_->async_body_reader_) {
168✔
1718
                auto err = MakeError(BodyMissingError, "No body reader");
2✔
1719
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1720
                return;
1721
        }
1722

1723
        PrepareAndWriteNewBodyBuffer();
167✔
1724
}
1725

1726
void Stream::PrepareAndWriteNewBodyBuffer() {
2,229✔
1727
        // response_->body_reader_ XOR response_->async_body_reader_
1728
        assert(
1729
                (response_->body_reader_ || response_->async_body_reader_)
1730
                && !(response_->body_reader_ && response_->async_body_reader_));
1731

1732
        auto read_handler = [this](io::ExpectedSize read) {
2,230✔
1733
                if (!read) {
2,229✔
1734
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1735
                        return;
1✔
1736
                }
1737
                WriteNewBodyBuffer(read.value());
2,228✔
1738
        };
2,229✔
1739

1740
        if (response_->body_reader_) {
2,229✔
1741
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,910✔
1742
        } else {
1743
                auto err = response_->async_body_reader_->AsyncRead(
1744
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1745
                if (err != error::NoError) {
274✔
1746
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1747
                }
1748
        }
1749
}
2,229✔
1750

1751
void Stream::WriteNewBodyBuffer(size_t size) {
2,228✔
1752
        response_data_.http_response_->body().data = body_buffer_.data();
2,228✔
1753
        response_data_.http_response_->body().size = size;
2,228✔
1754

1755
        if (size > 0) {
2,228✔
1756
                response_data_.http_response_->body().more = true;
2,098✔
1757
        } else {
1758
                response_data_.http_response_->body().more = false;
130✔
1759
        }
1760

1761
        WriteBody();
2,228✔
1762
}
2,228✔
1763

1764
void Stream::WriteBody() {
4,308✔
1765
        auto &cancelled = cancelled_;
1766
        auto &response_data = response_data_;
4,308✔
1767

1768
        http::async_write_some(
8,616✔
1769
                socket_,
4,308✔
1770
                *response_data_.http_response_serializer_,
1771
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
4,266✔
1772
                        if (!*cancelled) {
4,266✔
1773
                                WriteBodyHandler(ec, num_written);
4,266✔
1774
                        }
1775
                });
4,266✔
1776
}
4,308✔
1777

1778
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
4,266✔
1779
        if (num_written > 0) {
4,266✔
1780
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
4,160✔
1781
        }
1782

1783
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,266✔
1784
                // Write next body block.
1785
                PrepareAndWriteNewBodyBuffer();
2,062✔
1786
        } else if (ec) {
2,204✔
1787
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1788
        } else if (num_written > 0) {
2,200✔
1789
                // We are still writing the body.
1790
                WriteBody();
2,080✔
1791
        } else {
1792
                // We are finished.
1793
                FinishReply();
120✔
1794
        }
1795
}
4,266✔
1796

1797
void Stream::FinishReply() {
156✔
1798
        // We are done.
1799
        status_ = TransactionStatus::Done;
156✔
1800
        DoCancel();
156✔
1801
        // Release ownership of Body reader.
1802
        response_->body_reader_.reset();
156✔
1803
        response_->async_body_reader_.reset();
156✔
1804
        reply_finished_handler_(error::NoError);
156✔
1805
        server_.RemoveStream(shared_from_this());
156✔
1806
}
156✔
1807

1808
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
9✔
1809
        SetupResponse();
9✔
1810

1811
        switch_protocol_handler_ = handler;
9✔
1812
        status_ = TransactionStatus::SwitchingProtocol;
9✔
1813

1814
        auto &cancelled = cancelled_;
1815
        auto &response_data = response_data_;
9✔
1816

1817
        http::async_write_header(
18✔
1818
                socket_,
9✔
1819
                *response_data_.http_response_serializer_,
1820
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
9✔
1821
                        if (!*cancelled) {
9✔
1822
                                SwitchingProtocolHandler(ec, num_written);
8✔
1823
                        }
1824
                });
9✔
1825

1826
        return error::NoError;
9✔
1827
}
1828

1829
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
8✔
1830
        if (num_written > 0) {
8✔
1831
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
16✔
1832
        }
1833

1834
        if (ec) {
8✔
1835
                CallErrorHandler(ec, request_, switch_protocol_handler_);
×
1836
                return;
×
1837
        }
1838

1839
        auto socket = make_shared<RawSocket<tcp::socket>>(
1840
                make_shared<tcp::socket>(std::move(socket_)), request_data_.request_buffer_);
8✔
1841

1842
        auto switch_protocol_handler = switch_protocol_handler_;
8✔
1843

1844
        // Rest of the connection is done directly on the socket, set cancelled_ but don't close it.
1845
        *cancelled_ = true;
8✔
1846
        cancelled_ = make_shared<bool>(true);
8✔
1847
        server_.RemoveStream(shared_from_this());
16✔
1848

1849
        switch_protocol_handler(socket);
16✔
1850
}
1851

1852
void Stream::CallBodyHandler() {
220✔
1853
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1854
        // it immediately destroys, which would destroy this stream as well. At the end of this
1855
        // function, it's ok to destroy it.
1856
        auto stream_ref = shared_from_this();
1857

1858
        server_.body_handler_(request_, error::NoError);
660✔
1859

1860
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1861
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1862
        // request has not been handled correctly.
1863
        auto response = maybe_response_.lock();
220✔
1864
        if (!response) {
220✔
1865
                logger_.Error("Handler produced no response. Closing stream prematurely.");
6✔
1866
                *cancelled_ = true;
3✔
1867
                cancelled_ = make_shared<bool>(true);
3✔
1868
                server_.RemoveStream(shared_from_this());
9✔
1869
        }
1870
}
220✔
1871

1872
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
240✔
1873
        event_loop_ {event_loop},
1874
        acceptor_(GetAsioIoContext(event_loop_)) {
433✔
1875
}
240✔
1876

1877
Server::~Server() {
480✔
1878
        Cancel();
240✔
1879
}
240✔
1880

1881
error::Error Server::AsyncServeUrl(
205✔
1882
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
1883
        return AsyncServeUrl(
1884
                url, header_handler, [body_handler](IncomingRequestPtr req, error::Error err) {
834✔
1885
                        if (err != error::NoError) {
215✔
1886
                                body_handler(expected::unexpected(err));
12✔
1887
                        } else {
1888
                                body_handler(req);
418✔
1889
                        }
1890
                });
625✔
1891
}
1892

1893
error::Error Server::AsyncServeUrl(
220✔
1894
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
1895
        auto err = BreakDownUrl(url, address_);
220✔
1896
        if (error::NoError != err) {
220✔
1897
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
×
1898
        }
1899

1900
        if (address_.protocol != "http") {
220✔
1901
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
×
1902
        }
1903

1904
        if (address_.path.size() > 0 && address_.path != "/") {
220✔
1905
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
2✔
1906
        }
1907

1908
        boost::system::error_code ec;
219✔
1909
        auto address = asio::ip::make_address(address_.host, ec);
219✔
1910
        if (ec) {
219✔
1911
                return error::Error(
1912
                        ec.default_error_condition(),
×
1913
                        "Could not construct endpoint from address " + address_.host);
×
1914
        }
1915

1916
        asio::ip::tcp::endpoint endpoint(address, address_.port);
219✔
1917

1918
        ec.clear();
1919
        acceptor_.open(endpoint.protocol(), ec);
219✔
1920
        if (ec) {
219✔
1921
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
×
1922
        }
1923

1924
        // Allow address reuse, otherwise we can't re-bind later.
1925
        ec.clear();
1926
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
219✔
1927
        if (ec) {
219✔
1928
                return error::Error(ec.default_error_condition(), "Could not set socket options");
×
1929
        }
1930

1931
        ec.clear();
1932
        acceptor_.bind(endpoint, ec);
219✔
1933
        if (ec) {
219✔
1934
                return error::Error(ec.default_error_condition(), "Could not bind socket");
×
1935
        }
1936

1937
        ec.clear();
1938
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
219✔
1939
        if (ec) {
219✔
1940
                return error::Error(ec.default_error_condition(), "Could not start listening");
×
1941
        }
1942

1943
        header_handler_ = header_handler;
219✔
1944
        body_handler_ = body_handler;
219✔
1945

1946
        PrepareNewStream();
219✔
1947

1948
        return error::NoError;
219✔
1949
}
1950

1951
void Server::Cancel() {
260✔
1952
        if (acceptor_.is_open()) {
260✔
1953
                acceptor_.cancel();
219✔
1954
                acceptor_.close();
219✔
1955
        }
1956
        streams_.clear();
1957
}
260✔
1958

1959
uint16_t Server::GetPort() const {
17✔
1960
        return acceptor_.local_endpoint().port();
17✔
1961
}
1962

1963
string Server::GetUrl() const {
16✔
1964
        return "http://127.0.0.1:" + to_string(GetPort());
32✔
1965
}
1966

1967
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
219✔
1968
        if (*req->cancelled_) {
219✔
1969
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
×
1970
        }
1971
        OutgoingResponsePtr response {new OutgoingResponse(req->stream_, req->cancelled_)};
438✔
1972
        req->stream_.maybe_response_ = response;
219✔
1973
        return response;
219✔
1974
}
1975

1976
error::Error Server::AsyncReply(
205✔
1977
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
1978
        if (*resp->cancelled_) {
205✔
1979
                return MakeError(StreamCancelledError, "Cannot send response");
×
1980
        }
1981

1982
        resp->stream_.AsyncReply(reply_finished_handler);
205✔
1983
        return error::NoError;
205✔
1984
}
1985

1986
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
58✔
1987
        if (*req->cancelled_) {
58✔
1988
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
×
1989
        }
1990

1991
        auto &stream = req->stream_;
58✔
1992
        if (stream.status_ != TransactionStatus::HeaderHandlerCalled) {
58✔
1993
                return expected::unexpected(error::Error(
1✔
1994
                        make_error_condition(errc::operation_in_progress),
2✔
1995
                        "MakeBodyAsyncReader called while reading is in progress"));
3✔
1996
        }
1997

1998
        if (GetContentLength(*stream.request_data_.http_request_parser_) == 0
57✔
1999
                && !stream.request_data_.http_request_parser_->chunked()) {
57✔
2000
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
54✔
2001
        }
2002

2003
        stream.status_ = TransactionStatus::ReaderCreated;
39✔
2004
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
78✔
2005
}
2006

2007
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
9✔
2008
        return resp->stream_.AsyncSwitchProtocol(handler);
18✔
2009
}
2010

2011
void Server::PrepareNewStream() {
447✔
2012
        StreamPtr new_stream {new Stream(*this)};
447✔
2013
        streams_.insert(new_stream);
2014
        AsyncAccept(new_stream);
894✔
2015
}
447✔
2016

2017
void Server::AsyncAccept(StreamPtr stream) {
447✔
2018
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
678✔
2019
                if (ec) {
231✔
2020
                        if (ec != errc::operation_canceled) {
3✔
2021
                                log::Error("Could not accept connection: " + ec.message());
×
2022
                        }
2023
                        return;
3✔
2024
                }
2025

2026
                stream->AcceptHandler(ec);
228✔
2027

2028
                this->PrepareNewStream();
228✔
2029
        });
2030
}
447✔
2031

2032
void Server::RemoveStream(StreamPtr stream) {
188✔
2033
        streams_.erase(stream);
188✔
2034

2035
        stream->DoCancel();
188✔
2036
}
188✔
2037

2038
} // namespace http
2039
} // namespace common
2040
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc