• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / github_pull_request_301264

30 Jul 2024 07:11PM UTC coverage: 91.111% (+0.009%) from 91.102%
github_pull_request_301264

Pull #7936

Evergreen

web-flow
Add support for multi-process subscription state change notifications (#7862)

As with the other multi-process notifications, the core idea here is to
eliminate the in-memory state and produce notifications based entirely on the
current state of the Realm file.

SubscriptionStore::update_state() has been replaced with separate functions for
the specific legal state transitions, which also take a write transaction as a
parameter. These functions are called by PendingBootstrapStore inside the same
write transaction as the bootstrap updates which changed the subscription
state. This is both a minor performance optimization (due to fewer writes) and
eliminates a brief window between the two writes where the Realm file was in an
inconsistent state.

There's a minor functional change here: previously old subscription sets were
superseded when the new one reached the Completed state, and now they are
superseded on AwaitingMark. This aligns it with when the new subscription set
becomes the one which is returned by get_active().
Pull Request #7936: Fix connection callback crashes when reloading with React Native

102800 of 181570 branches covered (56.62%)

216840 of 237996 relevant lines covered (91.11%)

5918493.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.27
/src/realm/sync/network/http.hpp
1
/*************************************************************************
2
 *
3
 * Copyright 2022 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#pragma once
20

21
#include <cstdint>
22
#include <cstdlib>
23
#include <type_traits>
24
#include <map>
25
#include <system_error>
26
#include <iosfwd>
27
#include <locale>
28
#include <sstream>
29

30
#include <realm/util/optional.hpp>
31
#include <realm/util/basic_system_errors.hpp>
32
#include <realm/util/logger.hpp>
33
#include <realm/string_data.hpp>
34

35
namespace realm::sync {
36
enum class HTTPParserError {
37
    None = 0,
38
    ContentTooLong,
39
    HeaderLineTooLong,
40
    MalformedResponse,
41
    MalformedRequest,
42
    BadRequest,
43
};
44
std::error_code make_error_code(HTTPParserError);
45
} // namespace realm::sync
46

47
namespace std {
48
template <>
49
struct is_error_code_enum<realm::sync::HTTPParserError> : std::true_type {};
50
} // namespace std
51

52
namespace realm::sync {
53

54
/// See: https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
55
///
56
/// It is guaranteed that the backing integer value of this enum corresponds
57
/// to the numerical code representing the status.
58
enum class HTTPStatus {
59
    Unknown = 0,
60

61
    Continue = 100,
62
    SwitchingProtocols = 101,
63

64
    Ok = 200,
65
    Created = 201,
66
    Accepted = 202,
67
    NonAuthoritative = 203,
68
    NoContent = 204,
69
    ResetContent = 205,
70
    PartialContent = 206,
71

72
    MultipleChoices = 300,
73
    MovedPermanently = 301,
74
    Found = 302,
75
    SeeOther = 303,
76
    NotModified = 304,
77
    UseProxy = 305,
78
    SwitchProxy = 306,
79
    TemporaryRedirect = 307,
80
    PermanentRedirect = 308,
81

82
    BadRequest = 400,
83
    Unauthorized = 401,
84
    PaymentRequired = 402,
85
    Forbidden = 403,
86
    NotFound = 404,
87
    MethodNotAllowed = 405,
88
    NotAcceptable = 406,
89
    ProxyAuthenticationRequired = 407,
90
    RequestTimeout = 408,
91
    Conflict = 409,
92
    Gone = 410,
93
    LengthRequired = 411,
94
    PreconditionFailed = 412,
95
    PayloadTooLarge = 413,
96
    UriTooLong = 414,
97
    UnsupportedMediaType = 415,
98
    RangeNotSatisfiable = 416,
99
    ExpectationFailed = 417,
100
    ImATeapot = 418,
101
    MisdirectedRequest = 421,
102
    UpgradeRequired = 426,
103
    PreconditionRequired = 428,
104
    TooManyRequests = 429,
105
    RequestHeaderFieldsTooLarge = 431,
106
    UnavailableForLegalReasons = 451,
107

108
    InternalServerError = 500,
109
    NotImplemented = 501,
110
    BadGateway = 502,
111
    ServiceUnavailable = 503,
112
    GatewayTimeout = 504,
113
    HttpVersionNotSupported = 505,
114
    VariantAlsoNegotiates = 506,
115
    NotExtended = 510,
116
    NetworkAuthenticationRequired = 511,
117
};
118

119
bool valid_http_status_code(unsigned int code);
120

121
/// See: https://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html
122
enum class HTTPMethod {
123
    Options,
124
    Get,
125
    Head,
126
    Post,
127
    Put,
128
    Patch,
129
    Delete,
130
    Trace,
131
    Connect,
132
};
133

134
struct HTTPAuthorization {
135
    std::string scheme;
136
    std::map<std::string, std::string> values;
137
};
138

139
HTTPAuthorization parse_authorization(const std::string&);
140

141
class HeterogeneousCaseInsensitiveCompare {
142
public:
143
    using is_transparent = std::true_type;
144
    template <class A, class B>
145
    bool operator()(const A& a, const B& b) const noexcept
146
    {
472,640✔
147
        return comp(std::string_view(a), std::string_view(b));
472,640✔
148
    }
472,640✔
149

150
private:
151
    bool comp(std::string_view a, std::string_view b) const noexcept
152
    {
472,552✔
153
        auto cmp = [](char lhs, char rhs) {
4,174,780✔
154
            return std::tolower(lhs, std::locale::classic()) < std::tolower(rhs, std::locale::classic());
4,174,780✔
155
        };
4,174,780✔
156
        return std::lexicographical_compare(begin(a), end(a), begin(b), end(b), cmp);
472,552✔
157
    }
472,552✔
158
};
159

160
/// Case-insensitive map suitable for storing HTTP headers.
161
using HTTPHeaders = std::map<std::string, std::string, HeterogeneousCaseInsensitiveCompare>;
162

163
struct HTTPRequest {
164
    HTTPMethod method = HTTPMethod::Get;
165
    HTTPHeaders headers;
166
    std::string path;
167

168
    /// If the request object has a body, the Content-Length header MUST be
169
    /// set to a string representation of the number of bytes in the body.
170
    /// FIXME: Relax this restriction, and also support Transfer-Encoding
171
    /// and other HTTP/1.1 features.
172
    util::Optional<std::string> body;
173
};
174

175
struct HTTPResponse {
176
    HTTPStatus status = HTTPStatus::Unknown;
177
    std::string reason;
178
    HTTPHeaders headers;
179

180
    // A body is only read from the response stream if the server sent the
181
    // Content-Length header.
182
    // FIXME: Support other transfer methods, including Transfer-Encoding and
183
    // HTTP/1.1 features.
184
    util::Optional<std::string> body;
185
};
186

187

188
/// Serialize HTTP request to output stream.
189
std::ostream& operator<<(std::ostream&, const HTTPRequest&);
190
/// Serialize HTTP response to output stream.
191
std::ostream& operator<<(std::ostream&, const HTTPResponse&);
192
/// Serialize HTTP method to output stream ("GET", "POST", etc.).
193
std::ostream& operator<<(std::ostream&, HTTPMethod);
194
/// Serialize HTTP status to output stream, include reason string ("200 OK" etc.)
195
std::ostream& operator<<(std::ostream&, HTTPStatus);
196

197

198
struct HTTPParserBase {
199
    const std::shared_ptr<util::Logger> logger_ptr;
200
    util::Logger& network_logger;
201

202
    // FIXME: Generally useful?
203
    struct CallocDeleter {
204
        void operator()(void* ptr)
205
        {
7,728✔
206
            std::free(ptr);
7,728✔
207
        }
7,728✔
208
    };
209

210
    HTTPParserBase(const std::shared_ptr<util::Logger>& logger_ptr)
211
        : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::network, logger_ptr)}
3,908✔
212
        , network_logger{*logger_ptr}
3,908✔
213
    {
7,712✔
214
        // Allocating read buffer with calloc to avoid accidentally spilling
215
        // data from other sessions in case of a buffer overflow exploit.
216
        m_read_buffer.reset(static_cast<char*>(std::calloc(read_buffer_size, 1)));
7,712✔
217
    }
7,712✔
218
    virtual ~HTTPParserBase() {}
7,714✔
219

220
    std::string m_write_buffer;
221
    std::unique_ptr<char[], CallocDeleter> m_read_buffer;
222
    util::Optional<size_t> m_found_content_length;
223
    bool m_has_chunked_encoding = false;
224
    std::optional<std::stringstream> m_chunked_encoding_ss;
225
    static const size_t read_buffer_size = 8192;
226
    static const size_t max_header_line_length = read_buffer_size;
227

228
    /// Parses the contents of m_read_buffer as a HTTP header line,
229
    /// and calls on_header() as appropriate. on_header() will be called at
230
    /// most once per invocation.
231
    /// Returns false if the contents of m_read_buffer is not a valid HTTP
232
    /// header line.
233
    bool parse_header_line(size_t len);
234

235
    virtual std::error_code on_first_line(StringData line) = 0;
236
    virtual void on_header(StringData key, StringData value) = 0;
237
    virtual void on_body(StringData body) = 0;
238
    virtual void on_complete(std::error_code = std::error_code{}) = 0;
239

240
    /// If the input matches a known HTTP method string, return the appropriate
241
    /// HTTPMethod enum value. Otherwise, returns none.
242
    static util::Optional<HTTPMethod> parse_method_string(StringData method);
243

244
    /// Interpret line as the first line of an HTTP request. If the return value
245
    /// is true, out_method and out_uri have been assigned the appropriate
246
    /// values found in the request line.
247
    static bool parse_first_line_of_request(StringData line, HTTPMethod& out_method, StringData& out_uri);
248

249
    /// Interpret line as the first line of an HTTP response. If the return
250
    /// value is true, out_status and out_reason have been assigned the
251
    /// appropriate values found in the response line.
252
    static bool parse_first_line_of_response(StringData line, HTTPStatus& out_status, StringData& out_reason,
253
                                             util::Logger& logger);
254

255
    void set_write_buffer(const HTTPRequest&);
256
    void set_write_buffer(const HTTPResponse&);
257
};
258

259

260
template <class Socket>
261
struct HTTPParser : protected HTTPParserBase {
262
    explicit HTTPParser(Socket& socket, const std::shared_ptr<util::Logger>& logger_ptr)
263
        : HTTPParserBase(logger_ptr)
3,633✔
264
        , m_socket(socket)
3,633✔
265
    {
7,428✔
266
    }
7,428✔
267

268
    void read_first_line()
269
    {
6,197✔
270
        auto handler = [this](std::error_code ec, size_t n) {
6,197✔
271
            if (ec == util::error::operation_aborted) {
6,197✔
272
                return;
54✔
273
            }
54✔
274
            if (ec) {
6,143✔
275
                on_complete(ec);
12✔
276
                return;
12✔
277
            }
12✔
278
            ec = on_first_line(StringData(m_read_buffer.get(), n));
6,131✔
279
            if (ec) {
6,131✔
280
                on_complete(ec);
×
281
                return;
×
282
            }
×
283
            read_headers();
6,131✔
284
        };
6,131✔
285
        m_socket.async_read_until(m_read_buffer.get(), max_header_line_length, '\n', std::move(handler));
6,197✔
286
    }
6,197✔
287

288
    void read_headers()
289
    {
48,100✔
290
        auto handler = [this](std::error_code ec, size_t n) {
48,100✔
291
            if (ec == util::error::operation_aborted) {
48,100✔
292
                return;
×
293
            }
×
294
            if (ec) {
48,100✔
295
                on_complete(ec);
×
296
                return;
×
297
            }
×
298
            if (n <= 2) {
48,100✔
299
                read_body();
6,123✔
300
                return;
6,123✔
301
            }
6,123✔
302
            if (!parse_header_line(n)) {
41,977✔
303
                on_complete(HTTPParserError::BadRequest);
8✔
304
                return;
8✔
305
            }
8✔
306

307
            // FIXME: Limit the total size of headers. Apache uses 8K.
308
            read_headers();
41,969✔
309
        };
41,969✔
310
        m_socket.async_read_until(m_read_buffer.get(), max_header_line_length, '\n', std::move(handler));
48,100✔
311
    }
48,100✔
312

313
    void read_body()
314
    {
6,143✔
315
        if (m_found_content_length) {
6,143✔
316
            // FIXME: Support longer bodies.
317
            // FIXME: Support multipart and other body types (no body shaming).
318
            if (*m_found_content_length > read_buffer_size) {
12!
319
                on_complete(HTTPParserError::ContentTooLong);
×
320
                return;
×
321
            }
×
322

323
            auto handler = [this](std::error_code ec, size_t n) {
12✔
324
                if (ec == util::error::operation_aborted) {
12!
325
                    return;
×
326
                }
×
327
                if (!ec) {
12!
328
                    on_body(std::string_view(m_read_buffer.get(), n));
12✔
329
                }
12✔
330
                on_complete(ec);
12✔
331
            };
12✔
332
            m_socket.async_read(m_read_buffer.get(), *m_found_content_length, std::move(handler));
12✔
333
        }
12✔
334
        else if (m_has_chunked_encoding) {
6,131✔
335
            auto content_length_handler = [this](std::error_code ec, size_t chunk_start_index) {
20✔
336
                if (ec == util::error::operation_aborted) {
20!
337
                    on_complete(ec);
×
338
                    return;
×
339
                }
×
340

341
                auto content_length =
20✔
342
                    std::strtoul(std::string(m_read_buffer.get(), chunk_start_index - 2).c_str(), nullptr, 16);
20✔
343

344
                if (content_length == 0) {
20!
345
                    on_body(m_chunked_encoding_ss->str());
8✔
346
                    on_complete(ec);
8✔
347
                    return;
8✔
348
                }
8✔
349

350
                auto handler = [this](std::error_code ec, size_t n) {
12✔
351
                    if (ec == util::error::operation_aborted) {
12!
352
                        on_complete(ec);
×
353
                        return;
×
354
                    }
×
355
                    auto chunk_data = std::string_view(m_read_buffer.get(), n - 2); // -2 to strip \r\n
12✔
356
                    *m_chunked_encoding_ss << chunk_data;
12✔
357
                    read_body();
12✔
358
                };
12✔
359
                m_socket.async_read(m_read_buffer.get(), content_length + 2,
12✔
360
                                    std::move(handler)); // +2 to account for \r\n
12✔
361
            };
12✔
362

363
            // First get the content-length
364
            m_socket.async_read_until(m_read_buffer.get(), 8, '\n',
20✔
365
                                      content_length_handler); // buffer of 8 is enough to read hex value
20✔
366
        }
20✔
367
        else {
6,111✔
368
            // No body, just finish.
369
            on_complete();
6,111✔
370
        }
6,111✔
371
    }
6,143✔
372

373
    void write_buffer(util::UniqueFunction<void(std::error_code, size_t)> handler)
374
    {
6,185✔
375
        m_socket.async_write(m_write_buffer.data(), m_write_buffer.size(), std::move(handler));
6,185✔
376
    }
6,185✔
377

378
    Socket& m_socket;
379
};
380

381

382
template <class Socket>
383
struct HTTPClient : protected HTTPParser<Socket> {
384
    using Handler = void(HTTPResponse, std::error_code);
385

386
    explicit HTTPClient(Socket& socket, const std::shared_ptr<util::Logger>& logger_ptr)
387
        : HTTPParser<Socket>(socket, logger_ptr)
1,802✔
388
    {
3,784✔
389
    }
3,784✔
390

391
    /// Serialize and send \a request over the connected socket asynchronously.
392
    ///
393
    /// When the response has been received, or an error occurs, \a handler will
394
    /// be invoked with the appropriate parameters. The HTTPResponse object
395
    /// passed to \a handler will only be complete in non-error conditions, but
396
    /// may be partially populated.
397
    ///
398
    /// It is an error to start a request before the \a handler of a previous
399
    /// request has been invoked. It is permitted to call async_request() from
400
    /// the handler, unless an error has been reported representing a condition
401
    /// where the underlying socket is no longer able to communicate (for
402
    /// example, if it has been closed).
403
    ///
404
    /// If a request is already in progress, an exception will be thrown.
405
    ///
406
    /// This method is *NOT* thread-safe.
407
    void async_request(const HTTPRequest& request, util::UniqueFunction<Handler> handler)
408
    {
3,772✔
409
        if (REALM_UNLIKELY(m_handler)) {
3,772✔
410
            throw LogicError(ErrorCodes::LogicError, "Request already in progress.");
×
411
        }
×
412
        this->set_write_buffer(request);
3,772✔
413
        m_handler = std::move(handler);
3,772✔
414
        this->write_buffer([this](std::error_code ec, size_t bytes_written) {
3,772✔
415
            static_cast<void>(bytes_written);
3,772✔
416
            if (ec == util::error::operation_aborted) {
3,772✔
417
                return;
×
418
            }
×
419
            if (ec) {
3,772✔
420
                this->on_complete(ec);
×
421
                return;
×
422
            }
×
423
            this->read_first_line();
3,772✔
424
        });
3,772✔
425
    }
3,772✔
426

427
private:
428
    util::UniqueFunction<Handler> m_handler;
429
    HTTPResponse m_response;
430

431
    std::error_code on_first_line(StringData line) override final
432
    {
3,718✔
433
        HTTPStatus status;
3,718✔
434
        StringData reason;
3,718✔
435
        if (this->parse_first_line_of_response(line, status, reason, this->network_logger)) {
3,718✔
436
            m_response.status = status;
3,718✔
437
            m_response.reason = reason;
3,718✔
438
            return std::error_code{};
3,718✔
439
        }
3,718✔
440
        return HTTPParserError::MalformedResponse;
×
441
    }
3,718✔
442

443
    void on_header(StringData key, StringData value) override final
444
    {
26,550✔
445
        // FIXME: Multiple headers with the same key should show up as a
446
        // comma-separated list of their values, rather than overwriting.
447
        m_response.headers[std::string(key)] = std::string(value);
26,550✔
448
    }
26,550✔
449

450
    void on_body(StringData body) override final
451
    {
4✔
452
        m_response.body = std::string(body);
4✔
453
    }
4✔
454

455
    void on_complete(std::error_code ec) override final
456
    {
3,718✔
457
        auto handler = std::move(m_handler);
3,718✔
458
        m_handler = nullptr;
3,718✔
459
        handler(std::move(m_response), ec);
3,718✔
460
    }
3,718✔
461
};
462

463

464
template <class Socket>
465
struct HTTPServer : protected HTTPParser<Socket> {
466
    using RequestHandler = void(HTTPRequest, std::error_code);
467
    using RespondHandler = void(std::error_code);
468

469
    explicit HTTPServer(Socket& socket, const std::shared_ptr<util::Logger>& logger_ptr)
470
        : HTTPParser<Socket>(socket, logger_ptr)
1,827✔
471
    {
3,636✔
472
    }
3,636✔
473

474
    /// Receive a request on the underlying socket asynchronously.
475
    ///
476
    /// This function starts an asynchronous read operation and keeps reading
477
    /// until an HTTP request has been received. \a handler is invoked when a
478
    /// request has been received, or an error occurs.
479
    ///
480
    /// After a request is received, callers MUST invoke async_send_response()
481
    /// to provide the client with a valid HTTP response, unless the error
482
    /// passed to the handler represents a condition where the underlying socket
483
    /// is no longer able to communicate (for example, if it has been closed).
484
    ///
485
    /// It is an error to attempt to receive a request before any previous
486
    /// requests have been fully responded to, i.e. the \a handler argument of
487
    /// async_send_response() must have been invoked before attempting to
488
    /// receive the next request.
489
    ///
490
    /// This function is *NOT* thread-safe.
491
    void async_receive_request(util::UniqueFunction<RequestHandler> handler)
492
    {
2,425✔
493
        if (REALM_UNLIKELY(m_request_handler)) {
2,425✔
494
            throw LogicError(ErrorCodes::LogicError, "Request already in progress");
×
495
        }
×
496
        m_request_handler = std::move(handler);
2,425✔
497
        this->read_first_line();
2,425✔
498
    }
2,425✔
499

500
    /// Send an HTTP response to a client asynchronously.
501
    ///
502
    /// This function starts an asynchronous write operation on the underlying
503
    /// socket. \a handler is invoked when the response has been written to the
504
    /// socket, or an error occurs.
505
    ///
506
    /// It is an error to call async_receive_request() again before \a handler
507
    /// has been invoked, and it is an error to call async_send_response()
508
    /// before the \a handler of a previous invocation has been invoked.
509
    ///
510
    /// This function is *NOT* thread-safe.
511
    void async_send_response(const HTTPResponse& response, util::UniqueFunction<RespondHandler> handler)
512
    {
2,413✔
513
        if (REALM_UNLIKELY(!m_request_handler)) {
2,413✔
514
            throw LogicError(ErrorCodes::LogicError, "No request in progress");
×
515
        }
×
516
        if (m_respond_handler) {
2,413✔
517
            // FIXME: Proper exception type.
518
            throw LogicError(ErrorCodes::LogicError, "Already responding to request");
×
519
        }
×
520
        m_respond_handler = std::move(handler);
2,413✔
521
        this->set_write_buffer(response);
2,413✔
522
        this->write_buffer([this](std::error_code ec, size_t) {
2,413✔
523
            if (ec == util::error::operation_aborted) {
2,413✔
524
                return;
×
525
            }
×
526
            m_request_handler = nullptr;
2,413✔
527
            auto handler = std::move(m_respond_handler);
2,413✔
528
            handler(ec);
2,413✔
529
        });
2,413✔
530
        ;
2,413✔
531
    }
2,413✔
532

533
private:
534
    util::UniqueFunction<RequestHandler> m_request_handler;
535
    util::UniqueFunction<RespondHandler> m_respond_handler;
536
    HTTPRequest m_request;
537

538
    std::error_code on_first_line(StringData line) override final
539
    {
2,413✔
540
        HTTPMethod method;
2,413✔
541
        StringData uri;
2,413✔
542
        if (this->parse_first_line_of_request(line, method, uri)) {
2,413✔
543
            m_request.method = method;
2,413✔
544
            m_request.path = uri;
2,413✔
545
            return std::error_code{};
2,413✔
546
        }
2,413✔
547
        return HTTPParserError::MalformedRequest;
×
548
    }
2,413✔
549

550
    void on_header(StringData key, StringData value) override final
551
    {
15,419✔
552
        // FIXME: Multiple headers with the same key should show up as a
553
        // comma-separated list of their values, rather than overwriting.
554
        m_request.headers[std::string(key)] = std::string(value);
15,419✔
555
    }
15,419✔
556

557
    void on_body(StringData body) override final
558
    {
8✔
559
        m_request.body = std::string(body);
8✔
560
    }
8✔
561

562
    void on_complete(std::error_code ec) override final
563
    {
2,425✔
564
        // Deliberately not nullifying m_request_handler so that we can
565
        // check for invariants in async_send_response.
566
        m_request_handler(std::move(m_request), ec);
2,425✔
567
    }
2,425✔
568
};
569

570
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc