• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2075

27 Feb 2024 04:12PM UTC coverage: 90.97% (+0.05%) from 90.925%
2075

push

Evergreen

web-flow
Eliminate copies when accessing values from Bson types (#7377)

Returning things by value performs a deep copy, which is very expensive when
those things are also bson containers.

Re-align the naming with the convention names for the functions rather than
being weird and different.

93914 of 173104 branches covered (54.25%)

82 of 82 new or added lines in 9 files covered. (100.0%)

66 existing lines in 16 files now uncovered.

238508 of 262184 relevant lines covered (90.97%)

5724419.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.63
/src/realm/sync/noinst/protocol_codec.hpp
1
#ifndef REALM_NOINST_PROTOCOL_CODEC_HPP
2
#define REALM_NOINST_PROTOCOL_CODEC_HPP
3

4
#include <cstdint>
5
#include <algorithm>
6
#include <memory>
7
#include <vector>
8
#include <string>
9

10
#include <realm/util/buffer_stream.hpp>
11
#include <realm/util/compression.hpp>
12
#include <realm/util/from_chars.hpp>
13
#include <realm/util/logger.hpp>
14
#include <realm/util/memory_stream.hpp>
15
#include <realm/util/optional.hpp>
16
#include <realm/binary_data.hpp>
17
#include <realm/chunked_binary.hpp>
18
#include <realm/sync/changeset_parser.hpp>
19
#include <realm/sync/history.hpp>
20
#include <realm/sync/impl/clamped_hex_dump.hpp>
21
#include <realm/sync/noinst/integer_codec.hpp>
22
#include <realm/sync/protocol.hpp>
23
#include <realm/sync/transform.hpp>
24

25
#include <external/json/json.hpp>
26

27
namespace realm::_impl {
28
/// Exception type thrown by the header-line parsing code in this file when a wire protocol
/// header line is malformed. Inherits every std::runtime_error constructor unchanged.
struct ProtocolCodecException : std::runtime_error {
    using std::runtime_error::runtime_error;
};
31
class HeaderLineParser {
32
public:
33
    explicit HeaderLineParser(std::string_view line)
34
        : m_sv(line)
35
    {
150,770✔
36
    }
150,770✔
37

38
    template <typename T>
39
    T read_next(char expected_terminator = ' ')
40
    {
1,534,694✔
41
        const auto [tok, rest] = peek_token_impl<T>();
1,534,694✔
42
        if (rest.empty()) {
1,534,694✔
43
            throw ProtocolCodecException("header line ended prematurely without terminator");
×
44
        }
×
45
        if (rest.front() != expected_terminator) {
1,534,694✔
46
            throw ProtocolCodecException(util::format(
×
47
                "expected to find delimeter '%1' in header line, but found '%2'", expected_terminator, rest.front()));
×
48
        }
×
49
        m_sv = rest.substr(1);
1,534,694✔
50
        return tok;
1,534,694✔
51
    }
1,534,694✔
52

53
    template <typename T>
54
    T read_sized_data(size_t size)
55
    {
101,276✔
56
        auto ret = m_sv;
101,276✔
57
        advance(size);
101,276✔
58
        return T(ret.data(), size);
101,276✔
59
    }
101,276✔
60

61
    size_t bytes_remaining() const noexcept
62
    {
81,278✔
63
        return m_sv.size();
81,278✔
64
    }
81,278✔
65

66
    std::string_view remaining() const noexcept
67
    {
93,480✔
68
        return m_sv;
93,480✔
69
    }
93,480✔
70

71
    bool at_end() const noexcept
72
    {
1,774,816✔
73
        return m_sv.empty();
1,774,816✔
74
    }
1,774,816✔
75

76
    void advance(size_t size)
77
    {
101,260✔
78
        if (size > m_sv.size()) {
101,260✔
79
            throw ProtocolCodecException(
×
80
                util::format("cannot advance header by %1 characters, only %2 characters left", size, m_sv.size()));
×
81
        }
×
82
        m_sv.remove_prefix(size);
101,260✔
83
    }
101,260✔
84

85
private:
86
    template <typename T>
87
    std::pair<T, std::string_view> peek_token_impl() const
88
    {
1,534,606✔
89
        // We currently only support numeric, string, and boolean values in header lines.
792,744✔
90
        static_assert(std::is_integral_v<T> || is_any_v<T, std::string_view, std::string>);
1,534,606✔
91
        if (at_end()) {
1,534,606✔
92
            throw ProtocolCodecException("reached end of header line prematurely");
×
93
        }
×
94
        if constexpr (is_any_v<T, std::string_view, std::string>) {
1,534,606✔
95
            // Currently all string fields in wire protocol header lines appear at the beginning of the line and
719,438✔
96
            // should be delimited by a space.
719,438✔
97
            auto delim_at = m_sv.find(' ');
1,392,696✔
98
            if (delim_at == std::string_view::npos) {
788,030✔
99
                throw ProtocolCodecException("reached end of header line prematurely");
×
100
            }
×
101

73,290✔
102
            return {m_sv.substr(0, delim_at), m_sv.substr(delim_at)};
141,882✔
103
        }
141,882✔
104
        else if constexpr (std::is_integral_v<T> && !std::is_same_v<T, bool>) {
1,392,696✔
105
            T cur_arg = {};
101,430✔
106
            auto parse_res = util::from_chars(m_sv.data(), m_sv.data() + m_sv.size(), cur_arg, 10);
101,430✔
107
            if (parse_res.ec != std::errc{}) {
1,291,110✔
108
                throw ProtocolCodecException(util::format("error parsing integer in header line: %1",
×
109
                                                          std::make_error_code(parse_res.ec).message()));
×
110
            }
×
111

666,100✔
112
            return {cur_arg, m_sv.substr(parse_res.ptr - m_sv.data())};
1,291,110✔
113
        }
1,291,110✔
114
        else if constexpr (std::is_same_v<T, bool>) {
101,440✔
115
            int cur_arg;
101,436✔
116
            auto parse_res = util::from_chars(m_sv.data(), m_sv.data() + m_sv.size(), cur_arg, 10);
101,436✔
117
            if (parse_res.ec != std::errc{}) {
101,436✔
118
                throw ProtocolCodecException(util::format("error parsing boolean in header line: %1",
×
119
                                                          std::make_error_code(parse_res.ec).message()));
×
120
            }
×
121

53,192✔
122
            return {(cur_arg != 0), m_sv.substr(parse_res.ptr - m_sv.data())};
101,436✔
123
        }
101,436✔
124
    }
1,534,606✔
125

126
    std::string_view m_sv;
127
};
128

129
class ClientProtocol {
130
public:
131
    // clang-format off
132
    using file_ident_type    = sync::file_ident_type;
133
    using version_type       = sync::version_type;
134
    using salt_type          = sync::salt_type;
135
    using timestamp_type     = sync::timestamp_type;
136
    using session_ident_type = sync::session_ident_type;
137
    using request_ident_type = sync::request_ident_type;
138
    using milliseconds_type  = sync::milliseconds_type;
139
    using SaltedFileIdent    = sync::SaltedFileIdent;
140
    using SaltedVersion      = sync::SaltedVersion;
141
    using DownloadCursor     = sync::DownloadCursor;
142
    using UploadCursor       = sync::UploadCursor;
143
    using SyncProgress       = sync::SyncProgress;
144
    // clang-format on
145

146
    using OutputBuffer = util::ResettableExpandableBufferOutputStream;
147
    using RemoteChangeset = sync::RemoteChangeset;
148
    using ReceivedChangesets = std::vector<RemoteChangeset>;
149

150
    /// Messages sent by the client.
151

152
    void make_pbs_bind_message(int protocol_version, OutputBuffer&, session_ident_type session_ident,
153
                               const std::string& server_path, const std::string& signed_user_token,
154
                               bool need_client_file_ident, bool is_subserver);
155

156
    void make_flx_bind_message(int protocol_version, OutputBuffer& out, session_ident_type session_ident,
157
                               const nlohmann::json& json_data, const std::string& signed_user_token,
158
                               bool need_client_file_ident, bool is_subserver);
159

160
    void make_pbs_ident_message(OutputBuffer&, session_ident_type session_ident, SaltedFileIdent client_file_ident,
161
                                const SyncProgress& progress);
162

163
    void make_flx_ident_message(OutputBuffer&, session_ident_type session_ident, SaltedFileIdent client_file_ident,
164
                                const SyncProgress& progress, int64_t query_version, std::string_view query_body);
165

166
    void make_query_change_message(OutputBuffer&, session_ident_type, int64_t version, std::string_view query_body);
167

168
    void make_json_error_message(OutputBuffer&, session_ident_type, int error_code, std::string_view error_body);
169

170
    void make_test_command_message(OutputBuffer&, session_ident_type session, request_ident_type request_ident,
171
                                   std::string_view body);
172

173
    // Helper for assembling an upload message: changesets are added one at a time with
    // add_changeset() and the message is then produced with make_upload_message().
    // Only the declarations are visible here; the member functions are defined elsewhere.
    class UploadMessageBuilder {
174
    public:
175
        // All three arguments are taken by reference and kept as reference members below, so they
        // must outlive the builder.
        UploadMessageBuilder(OutputBuffer& body_buffer, std::vector<char>& compression_buffer,
176
                             util::compression::CompressMemoryArena& compress_memory_arena);
177

178
        // Adds one changeset, with its versions, origin timestamp and origin file identifier.
        // NOTE(review): presumably appends to the body buffer and bumps m_num_changesets - confirm
        // against the definition.
        void add_changeset(version_type client_version, version_type server_version, timestamp_type origin_timestamp,
179
                           file_ident_type origin_file_ident, ChunkedBinaryData changeset);
180

181
        // Produces the upload message for `session_ident` into the given OutputBuffer.
        void make_upload_message(int protocol_version, OutputBuffer&, session_ident_type session_ident,
182
                                 version_type progress_client_version, version_type progress_server_version,
183
                                 version_type locked_server_version);
184

185
    private:
186
        // Starts at zero for a freshly constructed builder.
        std::size_t m_num_changesets = 0;
187
        OutputBuffer& m_body_buffer;
188
        std::vector<char>& m_compression_buffer;
189
        util::compression::CompressMemoryArena& m_compress_memory_arena;
190
    };
191

192
    UploadMessageBuilder make_upload_message_builder();
193

194
    void make_unbind_message(OutputBuffer&, session_ident_type session_ident);
195

196
    void make_mark_message(OutputBuffer&, session_ident_type session_ident, request_ident_type request_ident);
197

198
    void make_ping(OutputBuffer&, milliseconds_type timestamp, milliseconds_type rtt);
199

200
    std::string compressed_hex_dump(BinaryData blob);
201

202
    // Messages received by the client.
203

204
    // parse_message_received takes a (WebSocket) message and parses it.
205
    // The result of the parsing is handled by an object of type Connection.
206
    // Typically, Connection would be the Connection class from client.cpp
207
    template <class Connection>
208
    void parse_message_received(Connection& connection, std::string_view msg_data)
209
    {
74,446✔
210
        util::Logger& logger = connection.logger;
74,446✔
211
        auto report_error = [&](const auto fmt, auto&&... args) {
39,044✔
212
            auto msg = util::format(fmt, std::forward<decltype(args)>(args)...);
×
213
            connection.handle_protocol_error(Status{ErrorCodes::SyncProtocolInvariantFailed, std::move(msg)});
×
214
        };
×
215

39,044✔
216
        HeaderLineParser msg(msg_data);
74,446✔
217
        std::string_view message_type;
74,446✔
218
        try {
74,446✔
219
            message_type = msg.read_next<std::string_view>();
74,446✔
220
        }
74,446✔
221
        catch (const ProtocolCodecException& e) {
39,044✔
222
            return report_error("Could not find message type in message: %1", e.what());
×
223
        }
×
224

39,048✔
225
        try {
74,450✔
226
            if (message_type == "download") {
74,450✔
227
                parse_download_message(connection, msg);
44,644✔
228
            }
44,644✔
229
            else if (message_type == "pong") {
29,806✔
230
                auto timestamp = msg.read_next<milliseconds_type>('\n');
184✔
231
                connection.receive_pong(timestamp);
184✔
232
            }
184✔
233
            else if (message_type == "unbound") {
29,622✔
234
                auto session_ident = msg.read_next<session_ident_type>('\n');
3,642✔
235
                connection.receive_unbound_message(session_ident); // Throws
3,642✔
236
            }
3,642✔
237
            else if (message_type == "error") {
25,980✔
238
                auto error_code = msg.read_next<int>();
86✔
239
                auto message_size = msg.read_next<size_t>();
86✔
240
                auto is_fatal = sync::IsFatal{!msg.read_next<bool>()};
86✔
241
                auto session_ident = msg.read_next<session_ident_type>('\n');
86✔
242
                auto message = msg.read_sized_data<StringData>(message_size);
86✔
243

42✔
244
                connection.receive_error_message(sync::ProtocolErrorInfo{error_code, message, is_fatal},
86✔
245
                                                 session_ident); // Throws
86✔
246
            }
86✔
247
            else if (message_type == "log_message") { // introduced in protocol version 10
25,894✔
248
                parse_log_message(connection, msg);
5,490✔
249
            }
5,490✔
250
            else if (message_type == "json_error") { // introduced in protocol 4
20,404✔
251
                sync::ProtocolErrorInfo info{};
676✔
252
                info.raw_error_code = msg.read_next<int>();
676✔
253
                auto message_size = msg.read_next<size_t>();
676✔
254
                auto session_ident = msg.read_next<session_ident_type>('\n');
676✔
255
                auto json_raw = msg.read_sized_data<std::string_view>(message_size);
676✔
256
                try {
676✔
257
                    auto json = nlohmann::json::parse(json_raw);
676✔
258
                    logger.trace(util::LogCategory::session, "Error message encoded as json: %1", json_raw);
676✔
259
                    info.client_reset_recovery_is_disabled = json["isRecoveryModeDisabled"];
676✔
260
                    info.is_fatal = sync::IsFatal{!json["tryAgain"]};
676✔
261
                    info.message = json["message"];
676✔
262
                    info.log_url = std::make_optional<std::string>(json["logURL"]);
676✔
263
                    info.should_client_reset = std::make_optional<bool>(json["shouldClientReset"]);
676✔
264
                    info.server_requests_action = string_to_action(json["action"]); // Throws
676✔
265

332✔
266
                    if (auto backoff_interval = json.find("backoffIntervalSec"); backoff_interval != json.end()) {
676✔
267
                        info.resumption_delay_interval.emplace();
540✔
268
                        info.resumption_delay_interval->resumption_delay_interval =
540✔
269
                            std::chrono::seconds{backoff_interval->get<int>()};
540✔
270
                        info.resumption_delay_interval->max_resumption_delay_interval =
540✔
271
                            std::chrono::seconds{json.at("backoffMaxDelaySec").get<int>()};
540✔
272
                        info.resumption_delay_interval->resumption_delay_backoff_multiplier =
540✔
273
                            json.at("backoffMultiplier").get<int>();
540✔
274
                    }
540✔
275

332✔
276
                    if (info.raw_error_code == static_cast<int>(sync::ProtocolError::migrate_to_flx)) {
676✔
277
                        auto query_string = json.find("partitionQuery");
36✔
278
                        if (query_string == json.end() || !query_string->is_string() ||
36✔
279
                            query_string->get<std::string_view>().empty()) {
36✔
280
                            return report_error(
×
281
                                "Missing/invalid partition query string in migrate to flexible sync error response");
×
282
                        }
×
283

18✔
284
                        info.migration_query_string.emplace(query_string->get<std::string_view>());
36✔
285
                    }
36✔
286

332✔
287
                    if (info.raw_error_code == static_cast<int>(sync::ProtocolError::schema_version_changed)) {
676✔
288
                        auto schema_version = json.find("previousSchemaVersion");
54✔
289
                        if (schema_version == json.end() || !schema_version->is_number_unsigned()) {
54✔
290
                            return report_error(
×
291
                                "Missing/invalid previous schema version in schema migration error response");
×
292
                        }
×
293

26✔
294
                        info.previous_schema_version.emplace(schema_version->get<uint64_t>());
54✔
295
                    }
54✔
296

332✔
297
                    if (auto rejected_updates = json.find("rejectedUpdates"); rejected_updates != json.end()) {
676✔
298
                        if (!rejected_updates->is_array()) {
52✔
299
                            return report_error(
×
300
                                "Compensating writes error list is not stored in an array as expected");
×
301
                        }
×
302

26✔
303
                        for (const auto& rejected_update : *rejected_updates) {
60✔
304
                            if (!rejected_update.is_object()) {
60✔
305
                                return report_error(
×
306
                                    "Compensating write error information is not stored in an object as expected");
×
307
                            }
×
308

30✔
309
                            sync::CompensatingWriteErrorInfo cwei;
60✔
310
                            cwei.reason = rejected_update["reason"];
60✔
311
                            cwei.object_name = rejected_update["table"];
60✔
312
                            std::string_view pk = rejected_update["pk"].get<std::string_view>();
60✔
313
                            cwei.primary_key = sync::parse_base64_encoded_primary_key(pk);
60✔
314
                            info.compensating_writes.push_back(std::move(cwei));
60✔
315
                        }
60✔
316

26✔
317
                        // Not provided when 'write_not_allowed' (230) error is received from the server.
26✔
318
                        if (auto server_version = json.find("compensatingWriteServerVersion");
52✔
319
                            server_version != json.end()) {
52✔
320
                            info.compensating_write_server_version =
48✔
321
                                std::make_optional<version_type>(server_version->get<int64_t>());
48✔
322
                        }
48✔
323
                        info.compensating_write_rejected_client_version =
52✔
324
                            json.at("rejectedClientVersion").get<int64_t>();
52✔
325
                    }
52✔
326
                }
676✔
327
                catch (const nlohmann::json::exception& e) {
332✔
328
                    // If any of the above json fields are not present, this is a fatal error
329
                    // however, additional optional fields may be added in the future.
330
                    return report_error("Failed to parse 'json_error' with error_code %1: '%2'", info.raw_error_code,
×
331
                                        e.what());
×
332
                }
×
333
                connection.receive_error_message(info, session_ident); // Throws
676✔
334
            }
676✔
335
            else if (message_type == "query_error") {
19,728✔
336
                auto error_code = msg.read_next<int>();
20✔
337
                auto message_size = msg.read_next<size_t>();
20✔
338
                auto session_ident = msg.read_next<session_ident_type>();
20✔
339
                auto query_version = msg.read_next<int64_t>('\n');
20✔
340

10✔
341
                auto message = msg.read_sized_data<std::string_view>(message_size);
20✔
342

10✔
343
                connection.receive_query_error_message(error_code, message, query_version, session_ident); // throws
20✔
344
            }
20✔
345
            else if (message_type == "mark") {
19,708✔
346
                auto session_ident = msg.read_next<session_ident_type>();
16,148✔
347
                auto request_ident = msg.read_next<request_ident_type>('\n');
16,148✔
348

7,990✔
349
                connection.receive_mark_message(session_ident, request_ident); // Throws
16,148✔
350
            }
16,148✔
351
            else if (message_type == "ident") {
3,560✔
352
                session_ident_type session_ident = msg.read_next<session_ident_type>();
3,508✔
353
                SaltedFileIdent client_file_ident;
3,508✔
354
                client_file_ident.ident = msg.read_next<file_ident_type>();
3,508✔
355
                client_file_ident.salt = msg.read_next<salt_type>('\n');
3,508✔
356

1,704✔
357
                connection.receive_ident_message(session_ident, client_file_ident); // Throws
3,508✔
358
            }
3,508✔
359
            else if (message_type == "test_command") {
52✔
360
                session_ident_type session_ident = msg.read_next<session_ident_type>();
52✔
361
                request_ident_type request_ident = msg.read_next<request_ident_type>();
52✔
362
                auto body_size = msg.read_next<size_t>('\n');
52✔
363
                auto body = msg.read_sized_data<std::string_view>(body_size);
52✔
364

26✔
365
                connection.receive_test_command_response(session_ident, request_ident, body);
52✔
366
            }
52✔
UNCOV
367
            else {
×
UNCOV
368
                return report_error("Unknown input message type '%1'", msg_data);
×
UNCOV
369
            }
×
370
        }
×
371
        catch (const ProtocolCodecException& e) {
×
372
            return report_error("Bad syntax in %1 message: %2", message_type, e.what());
×
373
        }
×
374
        if (!msg.at_end()) {
74,448✔
375
            return report_error("wire protocol message had leftover data after being parsed");
×
376
        }
×
377
    }
74,448✔
378

379
private:
380
    template <typename Connection>
381
    void parse_download_message(Connection& connection, HeaderLineParser& msg)
382
    {
44,644✔
383
        util::Logger& logger = connection.logger;
44,644✔
384
        auto report_error = [&](ErrorCodes::Error code, const auto fmt, auto&&... args) {
24,156✔
385
            auto msg = util::format(fmt, std::forward<decltype(args)>(args)...);
×
386
            connection.handle_protocol_error(Status{code, std::move(msg)});
×
387
        };
×
388

24,156✔
389
        auto msg_with_header = msg.remaining();
44,644✔
390
        auto session_ident = msg.read_next<session_ident_type>();
44,644✔
391
        SyncProgress progress;
44,644✔
392
        progress.download.server_version = msg.read_next<version_type>();
44,644✔
393
        progress.download.last_integrated_client_version = msg.read_next<version_type>();
44,644✔
394
        progress.latest_server_version.version = msg.read_next<version_type>();
44,644✔
395
        progress.latest_server_version.salt = msg.read_next<salt_type>();
44,644✔
396
        progress.upload.client_version = msg.read_next<version_type>();
44,644✔
397
        progress.upload.last_integrated_server_version = msg.read_next<version_type>();
44,644✔
398
        auto query_version = connection.is_flx_sync_connection() ? msg.read_next<int64_t>() : 0;
43,138✔
399

24,156✔
400
        // If this is a PBS connection, then every download message is its own complete batch.
24,156✔
401
        auto last_in_batch = connection.is_flx_sync_connection() ? msg.read_next<bool>() : true;
43,138✔
402
        auto downloadable_bytes = msg.read_next<int64_t>();
44,644✔
403
        auto is_body_compressed = msg.read_next<bool>();
44,644✔
404
        auto uncompressed_body_size = msg.read_next<size_t>();
44,644✔
405
        auto compressed_body_size = msg.read_next<size_t>('\n');
44,644✔
406

24,156✔
407
        if (uncompressed_body_size > s_max_body_size) {
44,644✔
408
            auto header = msg_with_header.substr(0, msg_with_header.size() - msg.remaining().size());
×
409
            return report_error(ErrorCodes::LimitExceeded, "Limits exceeded in input message '%1'", header);
×
410
        }
×
411

24,156✔
412
        std::unique_ptr<char[]> uncompressed_body_buffer;
44,644✔
413
        // if is_body_compressed == true, we must decompress the received body.
24,156✔
414
        if (is_body_compressed) {
44,644✔
415
            uncompressed_body_buffer = std::make_unique<char[]>(uncompressed_body_size);
4,398✔
416
            std::error_code ec =
4,398✔
417
                util::compression::decompress({msg.remaining().data(), compressed_body_size},
4,398✔
418
                                              {uncompressed_body_buffer.get(), uncompressed_body_size});
4,398✔
419

2,184✔
420
            if (ec) {
4,398✔
421
                return report_error(ErrorCodes::RuntimeError, "compression::inflate: %1", ec.message());
×
422
            }
×
423

2,184✔
424
            msg = HeaderLineParser(std::string_view(uncompressed_body_buffer.get(), uncompressed_body_size));
4,398✔
425
        }
4,398✔
426

24,156✔
427
        logger.debug(util::LogCategory::changeset,
44,644✔
428
                     "Download message compression: session_ident=%1, is_body_compressed=%2, "
44,644✔
429
                     "compressed_body_size=%3, uncompressed_body_size=%4",
44,644✔
430
                     session_ident, is_body_compressed, compressed_body_size, uncompressed_body_size);
44,644✔
431

24,156✔
432
        ReceivedChangesets received_changesets;
44,644✔
433

24,156✔
434
        // Loop through the body and find the changesets.
24,156✔
435
        while (!msg.at_end()) {
89,426✔
436
            RemoteChangeset cur_changeset;
44,782✔
437
            cur_changeset.remote_version = msg.read_next<version_type>();
44,782✔
438
            cur_changeset.last_integrated_local_version = msg.read_next<version_type>();
44,782✔
439
            cur_changeset.origin_timestamp = msg.read_next<timestamp_type>();
44,782✔
440
            cur_changeset.origin_file_ident = msg.read_next<file_ident_type>();
44,782✔
441
            cur_changeset.original_changeset_size = msg.read_next<size_t>();
44,782✔
442
            auto changeset_size = msg.read_next<size_t>();
44,782✔
443

23,182✔
444
            if (changeset_size > msg.bytes_remaining()) {
44,782✔
445
                return report_error(ErrorCodes::SyncProtocolInvariantFailed, "Bad changeset size %1 > %2",
×
446
                                    changeset_size, msg.bytes_remaining());
×
447
            }
×
448
            if (cur_changeset.remote_version == 0) {
44,782✔
449
                return report_error(ErrorCodes::SyncProtocolInvariantFailed,
×
450
                                    "Server version in downloaded changeset cannot be zero");
×
451
            }
×
452
            auto changeset_data = msg.read_sized_data<BinaryData>(changeset_size);
44,782✔
453
            logger.debug(util::LogCategory::changeset,
44,782✔
454
                         "Received: DOWNLOAD CHANGESET(session_ident=%1, server_version=%2, "
44,782✔
455
                         "client_version=%3, origin_timestamp=%4, origin_file_ident=%5, "
44,782✔
456
                         "original_changeset_size=%6, changeset_size=%7)",
44,782✔
457
                         session_ident, cur_changeset.remote_version, cur_changeset.last_integrated_local_version,
44,782✔
458
                         cur_changeset.origin_timestamp, cur_changeset.origin_file_ident,
44,782✔
459
                         cur_changeset.original_changeset_size, changeset_size); // Throws
44,782✔
460
            if (logger.would_log(util::LogCategory::changeset, util::Logger::Level::trace)) {
44,782✔
461
                if (changeset_data.size() < 1056) {
×
462
                    logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
463
                                 clamped_hex_dump(changeset_data)); // Throws
×
464
                }
×
465
                else {
×
466
                    logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
467
                                 compressed_hex_dump(changeset_data)); // Throws
×
468
                }
×
469
#if REALM_DEBUG
×
470
                ChunkedBinaryInputStream in{changeset_data};
×
471
                sync::Changeset log;
×
472
                sync::parse_changeset(in, log);
×
473
                std::stringstream ss;
×
474
                log.print(ss);
×
475
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
476
#endif
×
477
            }
×
478

23,182✔
479
            cur_changeset.data = changeset_data;
44,782✔
480
            received_changesets.push_back(std::move(cur_changeset)); // Throws
44,782✔
481
        }
44,782✔
482

24,156✔
483
        auto batch_state =
44,644✔
484
            last_in_batch ? sync::DownloadBatchState::LastInBatch : sync::DownloadBatchState::MoreToCome;
44,554✔
485
        connection.receive_download_message(session_ident, progress, downloadable_bytes, query_version, batch_state,
44,644✔
486
                                            received_changesets); // Throws
44,644✔
487
    }
44,644✔
488

489
    /// Maps the action name carried in a 'json_error' message to the corresponding
    /// sync::ProtocolErrorInfo::Action value. Names that are not in the table map to
    /// Action::ApplicationBug.
    static sync::ProtocolErrorInfo::Action string_to_action(const std::string& action_string)
    {
        using Action = sync::ProtocolErrorInfo::Action;
        // Built once on first use and shared by all subsequent calls.
        static const std::unordered_map<std::string, Action> known_actions{
            {"ProtocolViolation", Action::ProtocolViolation},
            {"ApplicationBug", Action::ApplicationBug},
            {"Warning", Action::Warning},
            {"Transient", Action::Transient},
            {"DeleteRealm", Action::DeleteRealm},
            {"ClientReset", Action::ClientReset},
            {"ClientResetNoRecovery", Action::ClientResetNoRecovery},
            {"MigrateToFLX", Action::MigrateToFLX},
            {"RevertToPBS", Action::RevertToPBS},
            {"RefreshUser", Action::RefreshUser},
            {"RefreshLocation", Action::RefreshLocation},
            {"LogOutUser", Action::LogOutUser},
            {"MigrateSchema", Action::MigrateSchema},
        };

        const auto found = known_actions.find(action_string);
        return found == known_actions.end() ? Action::ApplicationBug : found->second;
    }
8✔
513

514
    template <typename Connection>
515
    void parse_log_message(Connection& connection, HeaderLineParser& msg)
516
    {
5,490✔
517
        auto report_error = [&](const auto fmt, auto&&... args) {
2,750✔
518
            auto msg = util::format(fmt, std::forward<decltype(args)>(args)...);
×
519
            connection.handle_protocol_error(Status{ErrorCodes::SyncProtocolInvariantFailed, std::move(msg)});
×
520
        };
×
521

2,750✔
522
        auto session_ident = msg.read_next<session_ident_type>();
5,490✔
523
        auto message_length = msg.read_next<size_t>('\n');
5,490✔
524
        auto message_body_str = msg.read_sized_data<std::string_view>(message_length);
5,490✔
525
        nlohmann::json message_body;
5,490✔
526
        try {
5,490✔
527
            message_body = nlohmann::json::parse(message_body_str);
5,490✔
528
        }
5,490✔
529
        catch (const nlohmann::json::exception& e) {
2,750✔
530
            return report_error("Malformed json in log_message message: \"%1\": %2", message_body_str, e.what());
×
531
        }
×
532
        static const std::unordered_map<std::string_view, util::Logger::Level> name_to_level = {
5,488✔
533
            {"fatal", util::Logger::Level::fatal},   {"error", util::Logger::Level::error},
5,488✔
534
            {"warn", util::Logger::Level::warn},     {"info", util::Logger::Level::info},
5,488✔
535
            {"detail", util::Logger::Level::detail}, {"debug", util::Logger::Level::debug},
5,488✔
536
            {"trace", util::Logger::Level::trace},
5,488✔
537
        };
5,488✔
538

2,748✔
539
        // See if the log_message contains the appservices_request_id
2,748✔
540
        if (auto it = message_body.find("co_id"); it != message_body.end() && it->is_string()) {
5,488✔
541
            connection.receive_appservices_request_id(it->get<std::string_view>());
1,746✔
542
        }
1,746✔
543

2,748✔
544
        std::string_view log_level;
5,488✔
545
        bool has_level = false;
5,488✔
546
        if (auto it = message_body.find("level"); it != message_body.end() && it->is_string()) {
5,490✔
547
            log_level = it->get<std::string_view>();
5,488✔
548
            has_level = !log_level.empty();
5,488✔
549
        }
5,488✔
550

2,748✔
551
        std::string_view msg_text;
5,488✔
552
        if (auto it = message_body.find("msg"); it != message_body.end() && it->is_string()) {
5,490✔
553
            msg_text = it->get<std::string_view>();
5,490✔
554
        }
5,490✔
555

2,748✔
556
        // If there is no message text, then we're done
2,748✔
557
        if (msg_text.empty()) {
5,488✔
558
            return;
×
559
        }
×
560

2,748✔
561
        // If a log level wasn't provided, default to debug
2,748✔
562
        util::Logger::Level parsed_level = util::Logger::Level::debug;
5,488✔
563
        if (has_level) {
5,490✔
564
            if (auto it = name_to_level.find(log_level); it != name_to_level.end()) {
5,490✔
565
                parsed_level = it->second;
5,488✔
566
            }
5,488✔
567
            else {
2✔
568
                return report_error("Unknown log level found in log_message: \"%1\"", log_level);
2✔
569
            }
2✔
570
        }
5,486✔
571
        connection.receive_server_log_message(session_ident, parsed_level, msg_text);
5,486✔
572
    }
5,486✔
573

574
    static constexpr std::size_t s_max_body_size = std::numeric_limits<std::size_t>::max();
575

576
    // Permanent buffer to use for building messages.
577
    OutputBuffer m_output_buffer;
578

579
    // Permanent buffers to use for internal purposes such as compression.
580
    std::vector<char> m_buffer;
581

582
    util::compression::CompressMemoryArena m_compress_memory_arena;
583
};
584

585

586
class ServerProtocol {
587
public:
588
    // clang-format off
589
    using file_ident_type    = sync::file_ident_type;
590
    using version_type       = sync::version_type;
591
    using salt_type          = sync::salt_type;
592
    using timestamp_type     = sync::timestamp_type;
593
    using session_ident_type = sync::session_ident_type;
594
    using request_ident_type = sync::request_ident_type;
595
    using SaltedFileIdent    = sync::SaltedFileIdent;
596
    using SaltedVersion      = sync::SaltedVersion;
597
    using milliseconds_type  = sync::milliseconds_type;
598
    using UploadCursor       = sync::UploadCursor;
599
    // clang-format on
600

601
    using OutputBuffer = util::ResettableExpandableBufferOutputStream;
602

603
    // Messages sent by the server to the client
604

605
    void make_ident_message(int protocol_version, OutputBuffer&, session_ident_type session_ident,
606
                            file_ident_type client_file_ident, salt_type client_file_ident_salt);
607

608
    void make_alloc_message(OutputBuffer&, session_ident_type session_ident, file_ident_type file_ident);
609

610
    void make_unbound_message(OutputBuffer&, session_ident_type session_ident);
611

612

613
    struct ChangesetInfo {
614
        version_type server_version;
615
        version_type client_version;
616
        sync::HistoryEntry entry;
617
        std::size_t original_size;
618
    };
619

620
    void make_download_message(int protocol_version, OutputBuffer&, session_ident_type session_ident,
621
                               version_type download_server_version, version_type download_client_version,
622
                               version_type latest_server_version, salt_type latest_server_version_salt,
623
                               version_type upload_client_version, version_type upload_server_version,
624
                               std::uint_fast64_t downloadable_bytes, std::size_t num_changesets, const char* body,
625
                               std::size_t uncompressed_body_size, std::size_t compressed_body_size,
626
                               bool body_is_compressed, util::Logger&);
627

628
    void make_mark_message(OutputBuffer&, session_ident_type session_ident, request_ident_type request_ident);
629

630
    void make_error_message(int protocol_version, OutputBuffer&, sync::ProtocolError error_code, const char* message,
631
                            std::size_t message_size, bool try_again, session_ident_type session_ident);
632

633
    void make_pong(OutputBuffer&, milliseconds_type timestamp);
634

635
    void make_log_message(OutputBuffer& out, util::Logger::Level level, std::string message,
636
                          session_ident_type sess_id = 0, std::optional<std::string> co_id = std::nullopt);
637

638
    // Messages received by the server.
639

640
    // parse_ping_received takes a (WebSocket) ping and parses it.
641
    // The result of the parsing is handled by an object of type Connection.
642
    // Typically, Connection would be the Connection class from server.cpp
643
    template <typename Connection>
644
    void parse_ping_received(Connection& connection, std::string_view msg_data)
645
    {
×
646
        try {
×
647
            HeaderLineParser msg(msg_data);
×
648
            auto timestamp = msg.read_next<milliseconds_type>();
×
649
            auto rtt = msg.read_next<milliseconds_type>('\n');
×
650

651
            connection.receive_ping(timestamp, rtt);
×
652
        }
×
653
        catch (const ProtocolCodecException& e) {
×
654
            connection.handle_protocol_error(Status{ErrorCodes::SyncProtocolInvariantFailed,
×
655
                                                    util::format("Bad syntax in PING message: %1", e.what())});
×
656
        }
×
657
    }
×
658

659
    // UploadChangeset is used to store received changesets in
660
    // the UPLOAD message.
661
    struct UploadChangeset {
662
        UploadCursor upload_cursor;
663
        timestamp_type origin_timestamp;
664
        file_ident_type origin_file_ident; // Zero when originating from connected client file
665
        BinaryData changeset;
666
    };
667

668
    // parse_message_received takes a (WebSocket) message and parses it.
669
    // The result of the parsing is handled by an object of type Connection.
670
    // Typically, Connection would be the Connection class from server.cpp
671
    template <class Connection>
672
    void parse_message_received(Connection& connection, std::string_view msg_data)
673
    {
67,478✔
674
        auto& logger = connection.logger;
67,478✔
675

34,274✔
676
        auto report_error = [&](ErrorCodes::Error err, const auto fmt, auto&&... args) {
34,274✔
677
            auto msg = util::format(fmt, std::forward<decltype(args)>(args)...);
×
678
            connection.handle_protocol_error(Status{err, std::move(msg)});
×
679
        };
×
680

34,274✔
681
        HeaderLineParser msg(msg_data);
67,478✔
682
        std::string_view message_type;
67,478✔
683
        try {
67,478✔
684
            message_type = msg.read_next<std::string_view>();
67,478✔
685
        }
67,478✔
686
        catch (const ProtocolCodecException& e) {
34,274✔
687
            return report_error(ErrorCodes::SyncProtocolInvariantFailed, "Could not find message type in message: %1",
×
688
                                e.what());
×
689
        }
×
690

34,290✔
691
        try {
67,494✔
692
            if (message_type == "upload") {
67,494✔
693
                auto msg_with_header = msg.remaining();
44,500✔
694
                auto session_ident = msg.read_next<session_ident_type>();
44,500✔
695
                auto is_body_compressed = msg.read_next<bool>();
44,500✔
696
                auto uncompressed_body_size = msg.read_next<size_t>();
44,500✔
697
                auto compressed_body_size = msg.read_next<size_t>();
44,500✔
698
                auto progress_client_version = msg.read_next<version_type>();
44,500✔
699
                auto progress_server_version = msg.read_next<version_type>();
44,500✔
700
                auto locked_server_version = msg.read_next<version_type>('\n');
44,500✔
701

22,486✔
702
                std::size_t body_size = (is_body_compressed ? compressed_body_size : uncompressed_body_size);
42,154✔
703
                if (body_size > s_max_body_size) {
44,500✔
704
                    auto header = msg_with_header.substr(0, msg_with_header.size() - msg.bytes_remaining());
×
705

706
                    return report_error(ErrorCodes::LimitExceeded,
×
707
                                        "Body size of upload message is too large. Raw header: %1", header);
×
708
                }
×
709

22,486✔
710

22,486✔
711
                std::unique_ptr<char[]> uncompressed_body_buffer;
44,500✔
712
                // if is_body_compressed == true, we must decompress the received body.
22,486✔
713
                if (is_body_compressed) {
44,500✔
714
                    uncompressed_body_buffer = std::make_unique<char[]>(uncompressed_body_size);
4,440✔
715
                    auto compressed_body = msg.read_sized_data<BinaryData>(compressed_body_size);
4,440✔
716

2,094✔
717
                    std::error_code ec = util::compression::decompress(
4,440✔
718
                        compressed_body, {uncompressed_body_buffer.get(), uncompressed_body_size});
4,440✔
719

2,094✔
720
                    if (ec) {
4,440✔
721
                        return report_error(ErrorCodes::RuntimeError, "compression::inflate: %1", ec.message());
×
722
                    }
×
723

2,094✔
724
                    msg = HeaderLineParser(std::string_view(uncompressed_body_buffer.get(), uncompressed_body_size));
4,440✔
725
                }
4,440✔
726

22,486✔
727
                logger.debug(util::LogCategory::changeset,
44,500✔
728
                             "Upload message compression: is_body_compressed = %1, "
44,500✔
729
                             "compressed_body_size=%2, uncompressed_body_size=%3, "
44,500✔
730
                             "progress_client_version=%4, progress_server_version=%5, "
44,500✔
731
                             "locked_server_version=%6",
44,500✔
732
                             is_body_compressed, compressed_body_size, uncompressed_body_size,
44,500✔
733
                             progress_client_version, progress_server_version, locked_server_version); // Throws
44,500✔
734

22,486✔
735

22,486✔
736
                std::vector<UploadChangeset> upload_changesets;
44,500✔
737

22,486✔
738
                // Loop through the body and find the changesets.
22,486✔
739
                while (!msg.at_end()) {
80,962✔
740
                    UploadChangeset upload_changeset;
36,462✔
741
                    size_t changeset_size;
36,462✔
742
                    try {
36,462✔
743
                        upload_changeset.upload_cursor.client_version = msg.read_next<version_type>();
36,462✔
744
                        upload_changeset.upload_cursor.last_integrated_server_version = msg.read_next<version_type>();
36,462✔
745
                        upload_changeset.origin_timestamp = msg.read_next<timestamp_type>();
36,462✔
746
                        upload_changeset.origin_file_ident = msg.read_next<file_ident_type>();
36,462✔
747
                        changeset_size = msg.read_next<size_t>();
36,462✔
748
                    }
36,462✔
749
                    catch (const ProtocolCodecException& e) {
17,686✔
750
                        return report_error(ErrorCodes::SyncProtocolInvariantFailed,
×
751
                                            "Bad changeset header syntax: %1", e.what());
×
752
                    }
×
753

17,684✔
754
                    if (changeset_size > msg.bytes_remaining()) {
36,462✔
755
                        return report_error(ErrorCodes::SyncProtocolInvariantFailed, "Bad changeset size");
×
756
                    }
×
757

17,684✔
758
                    upload_changeset.changeset = msg.read_sized_data<BinaryData>(changeset_size);
36,462✔
759

17,684✔
760
                    if (logger.would_log(util::Logger::Level::trace)) {
36,462✔
761
                        logger.trace(util::LogCategory::changeset,
×
762
                                     "Received: UPLOAD CHANGESET(client_version=%1, server_version=%2, "
×
763
                                     "origin_timestamp=%3, origin_file_ident=%4, changeset_size=%5)",
×
764
                                     upload_changeset.upload_cursor.client_version,
×
765
                                     upload_changeset.upload_cursor.last_integrated_server_version,
×
766
                                     upload_changeset.origin_timestamp, upload_changeset.origin_file_ident,
×
767
                                     changeset_size); // Throws
×
768
                        logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
769
                                     clamped_hex_dump(upload_changeset.changeset)); // Throws
×
770
                    }
×
771
                    upload_changesets.push_back(std::move(upload_changeset)); // Throws
36,462✔
772
                }
36,462✔
773

22,486✔
774
                connection.receive_upload_message(session_ident, progress_client_version, progress_server_version,
44,502✔
775
                                                  locked_server_version,
44,500✔
776
                                                  upload_changesets); // Throws
44,500✔
777
            }
44,500✔
778
            else if (message_type == "mark") {
22,994✔
779
                auto session_ident = msg.read_next<session_ident_type>();
12,112✔
780
                auto request_ident = msg.read_next<request_ident_type>('\n');
12,112✔
781

5,988✔
782
                connection.receive_mark_message(session_ident, request_ident); // Throws
12,112✔
783
            }
12,112✔
784
            else if (message_type == "ping") {
10,882✔
785
                auto timestamp = msg.read_next<milliseconds_type>();
152✔
786
                auto rtt = msg.read_next<milliseconds_type>('\n');
152✔
787

52✔
788
                connection.receive_ping(timestamp, rtt);
152✔
789
            }
152✔
790
            else if (message_type == "bind") {
10,730✔
791
                auto session_ident = msg.read_next<session_ident_type>();
4,616✔
792
                auto path_size = msg.read_next<size_t>();
4,616✔
793
                auto signed_user_token_size = msg.read_next<size_t>();
4,616✔
794
                auto need_client_file_ident = msg.read_next<bool>();
4,616✔
795
                auto is_subserver = msg.read_next<bool>('\n');
4,616✔
796

2,516✔
797
                if (path_size == 0) {
4,616✔
798
                    return report_error(ErrorCodes::SyncProtocolInvariantFailed, "Path size in BIND message is zero");
×
799
                }
×
800
                if (path_size > s_max_path_size) {
4,616✔
801
                    return report_error(ErrorCodes::SyncProtocolInvariantFailed,
×
802
                                        "Path size in BIND message is too large");
×
803
                }
×
804
                if (signed_user_token_size > s_max_signed_user_token_size) {
4,616✔
805
                    return report_error(ErrorCodes::SyncProtocolInvariantFailed,
×
806
                                        "Signed user token size in BIND message is too large");
×
807
                }
×
808

2,516✔
809
                auto path = msg.read_sized_data<std::string>(path_size);
4,616✔
810
                auto signed_user_token = msg.read_sized_data<std::string>(signed_user_token_size);
4,616✔
811

2,516✔
812
                connection.receive_bind_message(session_ident, std::move(path), std::move(signed_user_token),
4,616✔
813
                                                need_client_file_ident, is_subserver); // Throws
4,616✔
814
            }
4,616✔
815
            else if (message_type == "ident") {
6,114✔
816
                auto session_ident = msg.read_next<session_ident_type>();
3,894✔
817
                auto client_file_ident = msg.read_next<file_ident_type>();
3,894✔
818
                auto client_file_ident_salt = msg.read_next<salt_type>();
3,894✔
819
                auto scan_server_version = msg.read_next<version_type>();
3,894✔
820
                auto scan_client_version = msg.read_next<version_type>();
3,894✔
821
                auto latest_server_version = msg.read_next<version_type>();
3,894✔
822
                auto latest_server_version_salt = msg.read_next<salt_type>('\n');
3,894✔
823

1,956✔
824
                connection.receive_ident_message(session_ident, client_file_ident, client_file_ident_salt,
3,894✔
825
                                                 scan_server_version, scan_client_version, latest_server_version,
3,894✔
826
                                                 latest_server_version_salt); // Throws
3,894✔
827
            }
3,894✔
828
            else if (message_type == "unbind") {
2,224✔
829
                auto session_ident = msg.read_next<session_ident_type>('\n');
2,200✔
830

1,268✔
831
                connection.receive_unbind_message(session_ident); // Throws
2,200✔
832
            }
2,200✔
833
            else if (message_type == "json_error") {
2,147,483,671✔
834
                auto error_code = msg.read_next<int>();
×
835
                auto message_size = msg.read_next<size_t>();
×
836
                auto session_ident = msg.read_next<session_ident_type>('\n');
×
837
                auto json_raw = msg.read_sized_data<std::string_view>(message_size);
×
838

839
                connection.receive_error_message(session_ident, error_code, json_raw);
×
840
            }
×
841
            else {
2,147,483,671✔
842
                return report_error(ErrorCodes::SyncProtocolInvariantFailed, "unknown message type %1", message_type);
2,147,483,671✔
843
            }
2,147,483,671✔
844
        }
×
845
        catch (const ProtocolCodecException& e) {
×
846
            return report_error(ErrorCodes::SyncProtocolInvariantFailed, "bad syntax in %1 message: %2", message_type,
×
847
                                e.what());
×
848
        }
×
849
    }
67,494✔
850

851
    void insert_single_changeset_download_message(OutputBuffer&, const ChangesetInfo&, util::Logger&);
852

853
private:
854
    // clang-format off
855
    static constexpr std::size_t s_max_head_size              =  256;
856
    static constexpr std::size_t s_max_signed_user_token_size = 2048;
857
    static constexpr std::size_t s_max_client_info_size       = 1024;
858
    static constexpr std::size_t s_max_path_size              = 1024;
859
    static constexpr std::size_t s_max_changeset_size         = std::numeric_limits<std::size_t>::max(); // FIXME: What is a reasonable value here?
860
    static constexpr std::size_t s_max_body_size              = std::numeric_limits<std::size_t>::max();
861
    // clang-format on
862
};
863

864
// make_authorization_header() makes the value of the Authorization header used in the
// sync WebSocket handshake.
866
std::string make_authorization_header(const std::string& signed_user_token);
867

868
// parse_authorization_header() parses the value of the Authorization header and returns
869
// the signed_user_token. None is returned in case of syntax error.
870
util::Optional<StringData> parse_authorization_header(const std::string& authorization_header);
871

872
} // namespace realm::_impl
873

874
#endif // REALM_NOINST_PROTOCOL_CODEC_HPP
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc