• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1853

20 Nov 2023 07:46PM UTC coverage: 91.683% (-0.02%) from 91.699%
1853

push

Evergreen

web-flow
Fix client reset cycle detection for PBS recovery errors (#7149)

Tracking that a client reset was in progress was done in the same write
transaction as the recovery operation, so if recovery failed the tracking was
rolled back too. This worked for FLX due to that codepath committing before
beginning recovery.

92262 of 169120 branches covered (0.0%)

31 of 31 new or added lines in 3 files covered. (100.0%)

143 existing lines in 15 files now uncovered.

231277 of 252257 relevant lines covered (91.68%)

6495482.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.21
/src/realm/sync/noinst/protocol_codec.hpp
1
#ifndef REALM_NOINST_PROTOCOL_CODEC_HPP
2
#define REALM_NOINST_PROTOCOL_CODEC_HPP
3

4
#include <cstdint>
5
#include <algorithm>
6
#include <memory>
7
#include <vector>
8
#include <string>
9

10
#include <realm/util/buffer_stream.hpp>
11
#include <realm/util/compression.hpp>
12
#include <realm/util/from_chars.hpp>
13
#include <realm/util/logger.hpp>
14
#include <realm/util/memory_stream.hpp>
15
#include <realm/util/optional.hpp>
16
#include <realm/binary_data.hpp>
17
#include <realm/chunked_binary.hpp>
18
#include <realm/sync/changeset_parser.hpp>
19
#include <realm/sync/history.hpp>
20
#include <realm/sync/impl/clamped_hex_dump.hpp>
21
#include <realm/sync/noinst/integer_codec.hpp>
22
#include <realm/sync/protocol.hpp>
23
#include <realm/sync/transform.hpp>
24

25
#include <external/json/json.hpp>
26

27
namespace realm::_impl {
28
struct ProtocolCodecException : public std::runtime_error {
29
    using std::runtime_error::runtime_error;
30
};
31
class HeaderLineParser {
32
public:
33
    explicit HeaderLineParser(std::string_view line)
34
        : m_sv(line)
35
    {
159,468✔
36
    }
159,468✔
37

38
    template <typename T>
39
    T read_next(char expected_terminator = ' ')
40
    {
1,560,734✔
41
        const auto [tok, rest] = peek_token_impl<T>();
1,560,734✔
42
        if (rest.empty()) {
1,560,734✔
43
            throw ProtocolCodecException("header line ended prematurely without terminator");
×
44
        }
×
45
        if (rest.front() != expected_terminator) {
1,560,734✔
46
            throw ProtocolCodecException(util::format(
×
47
                "expected to find delimeter '%1' in header line, but found '%2'", expected_terminator, rest.front()));
×
48
        }
×
49
        m_sv = rest.substr(1);
1,560,734✔
50
        return tok;
1,560,734✔
51
    }
1,560,734✔
52

53
    template <typename T>
54
    T read_sized_data(size_t size)
55
    {
105,102✔
56
        auto ret = m_sv;
105,102✔
57
        advance(size);
105,102✔
58
        return T(ret.data(), size);
105,102✔
59
    }
105,102✔
60

61
    size_t bytes_remaining() const noexcept
62
    {
80,262✔
63
        return m_sv.size();
80,262✔
64
    }
80,262✔
65

66
    std::string_view remaining() const noexcept
67
    {
94,138✔
68
        return m_sv;
94,138✔
69
    }
94,138✔
70

71
    bool at_end() const noexcept
72
    {
1,802,998✔
73
        return m_sv.empty();
1,802,998✔
74
    }
1,802,998✔
75

76
    void advance(size_t size)
77
    {
105,122✔
78
        if (size > m_sv.size()) {
105,122✔
79
            throw ProtocolCodecException(
×
80
                util::format("cannot advance header by %1 characters, only %2 characters left", size, m_sv.size()));
×
81
        }
×
82
        m_sv.remove_prefix(size);
105,122✔
83
    }
105,122✔
84

85
private:
86
    template <typename T>
87
    std::pair<T, std::string_view> peek_token_impl() const
88
    {
1,560,298✔
89
        // We currently only support numeric, string, and boolean values in header lines.
782,794✔
90
        static_assert(std::is_integral_v<T> || is_any_v<T, std::string_view, std::string>);
1,560,298✔
91
        if (at_end()) {
1,560,298✔
92
            throw ProtocolCodecException("reached end of header line prematurely");
×
93
        }
×
94
        if constexpr (is_any_v<T, std::string_view, std::string>) {
1,560,298✔
95
            // Currently all string fields in wire protocol header lines appear at the beginning of the line and
709,356✔
96
            // should be delimited by a space.
709,356✔
97
            auto delim_at = m_sv.find(' ');
1,409,746✔
98
            if (delim_at == std::string_view::npos) {
786,480✔
99
                throw ProtocolCodecException("reached end of header line prematurely");
×
100
            }
×
101

73,442✔
102
            return {m_sv.substr(0, delim_at), m_sv.substr(delim_at)};
150,566✔
103
        }
150,566✔
104
        else if constexpr (std::is_integral_v<T> && !std::is_same_v<T, bool>) {
1,409,746✔
105
            T cur_arg = {};
104,992✔
106
            auto parse_res = util::from_chars(m_sv.data(), m_sv.data() + m_sv.size(), cur_arg, 10);
104,992✔
107
            if (parse_res.ec != std::errc{}) {
1,304,110✔
108
                throw ProtocolCodecException(util::format("error parsing integer in header line: %1",
×
109
                                                          std::make_error_code(parse_res.ec).message()));
×
110
            }
×
111

655,626✔
112
            return {cur_arg, m_sv.substr(parse_res.ptr - m_sv.data())};
1,304,110✔
113
        }
1,304,110✔
114
        else if constexpr (std::is_same_v<T, bool>) {
104,992✔
115
            int cur_arg;
104,992✔
116
            auto parse_res = util::from_chars(m_sv.data(), m_sv.data() + m_sv.size(), cur_arg, 10);
104,992✔
117
            if (parse_res.ec != std::errc{}) {
104,992✔
118
                throw ProtocolCodecException(util::format("error parsing boolean in header line: %1",
×
119
                                                          std::make_error_code(parse_res.ec).message()));
×
120
            }
×
121

53,260✔
122
            return {(cur_arg != 0), m_sv.substr(parse_res.ptr - m_sv.data())};
104,992✔
123
        }
104,992✔
124
    }
1,560,298✔
125

126
    std::string_view m_sv;
127
};
128

129
class ClientProtocol {
130
public:
131
    // clang-format off
132
    using file_ident_type    = sync::file_ident_type;
133
    using version_type       = sync::version_type;
134
    using salt_type          = sync::salt_type;
135
    using timestamp_type     = sync::timestamp_type;
136
    using session_ident_type = sync::session_ident_type;
137
    using request_ident_type = sync::request_ident_type;
138
    using milliseconds_type  = sync::milliseconds_type;
139
    using SaltedFileIdent    = sync::SaltedFileIdent;
140
    using SaltedVersion      = sync::SaltedVersion;
141
    using DownloadCursor     = sync::DownloadCursor;
142
    using UploadCursor       = sync::UploadCursor;
143
    using SyncProgress       = sync::SyncProgress;
144
    // clang-format on
145

146
    using OutputBuffer = util::ResettableExpandableBufferOutputStream;
147
    using RemoteChangeset = sync::RemoteChangeset;
148
    using ReceivedChangesets = std::vector<RemoteChangeset>;
149

150
    /// Messages sent by the client.
151

152
    void make_pbs_bind_message(int protocol_version, OutputBuffer&, session_ident_type session_ident,
153
                               const std::string& server_path, const std::string& signed_user_token,
154
                               bool need_client_file_ident, bool is_subserver);
155

156
    void make_flx_bind_message(int protocol_version, OutputBuffer& out, session_ident_type session_ident,
157
                               const nlohmann::json& json_data, const std::string& signed_user_token,
158
                               bool need_client_file_ident, bool is_subserver);
159

160
    void make_pbs_ident_message(OutputBuffer&, session_ident_type session_ident, SaltedFileIdent client_file_ident,
161
                                const SyncProgress& progress);
162

163
    void make_flx_ident_message(OutputBuffer&, session_ident_type session_ident, SaltedFileIdent client_file_ident,
164
                                const SyncProgress& progress, int64_t query_version, std::string_view query_body);
165

166
    void make_query_change_message(OutputBuffer&, session_ident_type, int64_t version, std::string_view query_body);
167

168
    void make_json_error_message(OutputBuffer&, session_ident_type, int error_code, std::string_view error_body);
169

170
    void make_test_command_message(OutputBuffer&, session_ident_type session, request_ident_type request_ident,
171
                                   std::string_view body);
172

173
    class UploadMessageBuilder {
174
    public:
175
        UploadMessageBuilder(OutputBuffer& body_buffer, std::vector<char>& compression_buffer,
176
                             util::compression::CompressMemoryArena& compress_memory_arena);
177

178
        void add_changeset(version_type client_version, version_type server_version, timestamp_type origin_timestamp,
179
                           file_ident_type origin_file_ident, ChunkedBinaryData changeset);
180

181
        void make_upload_message(int protocol_version, OutputBuffer&, session_ident_type session_ident,
182
                                 version_type progress_client_version, version_type progress_server_version,
183
                                 version_type locked_server_version);
184

185
    private:
186
        std::size_t m_num_changesets = 0;
187
        OutputBuffer& m_body_buffer;
188
        std::vector<char>& m_compression_buffer;
189
        util::compression::CompressMemoryArena& m_compress_memory_arena;
190
    };
191

192
    UploadMessageBuilder make_upload_message_builder();
193

194
    void make_unbind_message(OutputBuffer&, session_ident_type session_ident);
195

196
    void make_mark_message(OutputBuffer&, session_ident_type session_ident, request_ident_type request_ident);
197

198
    void make_ping(OutputBuffer&, milliseconds_type timestamp, milliseconds_type rtt);
199

200
    std::string compressed_hex_dump(BinaryData blob);
201

202
    // Messages received by the client.
203

204
    // parse_message_received takes a (WebSocket) message and parses it.
205
    // The result of the parsing is handled by an object of type Connection.
206
    // Typically, Connection would be the Connection class from client.cpp
207
    template <class Connection>
208
    void parse_message_received(Connection& connection, std::string_view msg_data)
209
    {
76,768✔
210
        util::Logger& logger = connection.logger;
76,768✔
211
        auto report_error = [&](const auto fmt, auto&&... args) {
38,028✔
212
            auto msg = util::format(fmt, std::forward<decltype(args)>(args)...);
×
213
            connection.handle_protocol_error(Status{ErrorCodes::SyncProtocolInvariantFailed, std::move(msg)});
×
214
        };
×
215

38,028✔
216
        HeaderLineParser msg(msg_data);
76,768✔
217
        std::string_view message_type;
76,768✔
218
        try {
76,768✔
219
            message_type = msg.read_next<std::string_view>();
76,768✔
220
        }
76,768✔
221
        catch (const ProtocolCodecException& e) {
38,028✔
222
            return report_error("Could not find message type in message: %1", e.what());
×
223
        }
×
224

38,026✔
225
        try {
76,768✔
226
            if (message_type == "download") {
76,768✔
227
                parse_download_message(connection, msg);
44,002✔
228
            }
44,002✔
229
            else if (message_type == "pong") {
32,766✔
230
                auto timestamp = msg.read_next<milliseconds_type>('\n');
156✔
231
                connection.receive_pong(timestamp);
156✔
232
            }
156✔
233
            else if (message_type == "unbound") {
32,610✔
234
                auto session_ident = msg.read_next<session_ident_type>('\n');
4,932✔
235
                connection.receive_unbound_message(session_ident); // Throws
4,932✔
236
            }
4,932✔
237
            else if (message_type == "error") {
27,678✔
238
                auto error_code = msg.read_next<int>();
84✔
239
                auto message_size = msg.read_next<size_t>();
84✔
240
                auto is_fatal = sync::IsFatal{!msg.read_next<bool>()};
84✔
241
                auto session_ident = msg.read_next<session_ident_type>('\n');
84✔
242
                auto message = msg.read_sized_data<StringData>(message_size);
84✔
243

42✔
244
                connection.receive_error_message(sync::ProtocolErrorInfo{error_code, message, is_fatal},
84✔
245
                                                 session_ident); // Throws
84✔
246
            }
84✔
247
            else if (message_type == "log_message") { // introduced in protocol version 10
27,594✔
248
                parse_log_message(connection, msg);
6,720✔
249
            }
6,720✔
250
            else if (message_type == "json_error") { // introduced in protocol 4
20,874✔
251
                sync::ProtocolErrorInfo info{};
906✔
252
                info.raw_error_code = msg.read_next<int>();
906✔
253
                auto message_size = msg.read_next<size_t>();
906✔
254
                auto session_ident = msg.read_next<session_ident_type>('\n');
906✔
255
                auto json_raw = msg.read_sized_data<std::string_view>(message_size);
906✔
256
                try {
906✔
257
                    auto json = nlohmann::json::parse(json_raw);
906✔
258
                    logger.trace("Error message encoded as json: %1", json_raw);
906✔
259
                    info.client_reset_recovery_is_disabled = json["isRecoveryModeDisabled"];
906✔
260
                    info.is_fatal = sync::IsFatal{!json["tryAgain"]};
906✔
261
                    info.message = json["message"];
906✔
262
                    info.log_url = std::make_optional<std::string>(json["logURL"]);
906✔
263
                    info.should_client_reset = std::make_optional<bool>(json["shouldClientReset"]);
906✔
264
                    info.server_requests_action = string_to_action(json["action"]); // Throws
906✔
265

470✔
266
                    if (auto backoff_interval = json.find("backoffIntervalSec"); backoff_interval != json.end()) {
906✔
267
                        info.resumption_delay_interval.emplace();
770✔
268
                        info.resumption_delay_interval->resumption_delay_interval =
770✔
269
                            std::chrono::seconds{backoff_interval->get<int>()};
770✔
270
                        info.resumption_delay_interval->max_resumption_delay_interval =
770✔
271
                            std::chrono::seconds{json.at("backoffMaxDelaySec").get<int>()};
770✔
272
                        info.resumption_delay_interval->resumption_delay_backoff_multiplier =
770✔
273
                            json.at("backoffMultiplier").get<int>();
770✔
274
                    }
770✔
275

470✔
276
                    if (info.raw_error_code == static_cast<int>(sync::ProtocolError::migrate_to_flx)) {
906✔
277
                        auto query_string = json.find("partitionQuery");
36✔
278
                        if (query_string == json.end() || !query_string->is_string() ||
36✔
279
                            query_string->get<std::string_view>().empty()) {
36✔
280
                            return report_error(
×
281
                                "Missing/invalid partition query string in migrate to flexible sync error response");
×
282
                        }
×
283

18✔
284
                        info.migration_query_string.emplace(query_string->get<std::string_view>());
36✔
285
                    }
36✔
286

470✔
287
                    if (auto rejected_updates = json.find("rejectedUpdates"); rejected_updates != json.end()) {
906✔
288
                        if (!rejected_updates->is_array()) {
52✔
289
                            return report_error(
×
290
                                "Compensating writes error list is not stored in an array as expected");
×
291
                        }
×
292

26✔
293
                        for (const auto& rejected_update : *rejected_updates) {
60✔
294
                            if (!rejected_update.is_object()) {
60✔
295
                                return report_error(
×
296
                                    "Compensating write error information is not stored in an object as expected");
×
297
                            }
×
298

30✔
299
                            sync::CompensatingWriteErrorInfo cwei;
60✔
300
                            cwei.reason = rejected_update["reason"];
60✔
301
                            cwei.object_name = rejected_update["table"];
60✔
302
                            std::string_view pk = rejected_update["pk"].get<std::string_view>();
60✔
303
                            cwei.primary_key = sync::parse_base64_encoded_primary_key(pk);
60✔
304
                            info.compensating_writes.push_back(std::move(cwei));
60✔
305
                        }
60✔
306

26✔
307
                        // Not provided when 'write_not_allowed' (230) error is received from the server.
26✔
308
                        if (auto server_version = json.find("compensatingWriteServerVersion");
52✔
309
                            server_version != json.end()) {
52✔
310
                            info.compensating_write_server_version =
48✔
311
                                std::make_optional<version_type>(server_version->get<int64_t>());
48✔
312
                        }
48✔
313
                        info.compensating_write_rejected_client_version =
52✔
314
                            json.at("rejectedClientVersion").get<int64_t>();
52✔
315
                    }
52✔
316
                }
906✔
317
                catch (const nlohmann::json::exception& e) {
470✔
318
                    // If any of the above json fields are not present, this is a fatal error
319
                    // however, additional optional fields may be added in the future.
320
                    return report_error("Failed to parse 'json_error' with error_code %1: '%2'", info.raw_error_code,
×
321
                                        e.what());
×
322
                }
×
323
                connection.receive_error_message(info, session_ident); // Throws
906✔
324
            }
906✔
325
            else if (message_type == "query_error") {
19,968✔
326
                auto error_code = msg.read_next<int>();
16✔
327
                auto message_size = msg.read_next<size_t>();
16✔
328
                auto session_ident = msg.read_next<session_ident_type>();
16✔
329
                auto query_version = msg.read_next<int64_t>('\n');
16✔
330

8✔
331
                auto message = msg.read_sized_data<std::string_view>(message_size);
16✔
332

8✔
333
                connection.receive_query_error_message(error_code, message, query_version, session_ident); // throws
16✔
334
            }
16✔
335
            else if (message_type == "mark") {
19,952✔
336
                auto session_ident = msg.read_next<session_ident_type>();
16,550✔
337
                auto request_ident = msg.read_next<request_ident_type>('\n');
16,550✔
338

7,738✔
339
                connection.receive_mark_message(session_ident, request_ident); // Throws
16,550✔
340
            }
16,550✔
341
            else if (message_type == "ident") {
3,402✔
342
                session_ident_type session_ident = msg.read_next<session_ident_type>();
3,358✔
343
                SaltedFileIdent client_file_ident;
3,358✔
344
                client_file_ident.ident = msg.read_next<file_ident_type>();
3,358✔
345
                client_file_ident.salt = msg.read_next<salt_type>('\n');
3,358✔
346

1,546✔
347
                connection.receive_ident_message(session_ident, client_file_ident); // Throws
3,358✔
348
            }
3,358✔
349
            else if (message_type == "test_command") {
44✔
350
                session_ident_type session_ident = msg.read_next<session_ident_type>();
44✔
351
                request_ident_type request_ident = msg.read_next<request_ident_type>();
44✔
352
                auto body_size = msg.read_next<size_t>('\n');
44✔
353
                auto body = msg.read_sized_data<std::string_view>(body_size);
44✔
354

22✔
355
                connection.receive_test_command_response(session_ident, request_ident, body);
44✔
356
            }
44✔
UNCOV
357
            else {
×
UNCOV
358
                return report_error("Unknown input message type '%1'", msg_data);
×
UNCOV
359
            }
×
360
        }
×
361
        catch (const ProtocolCodecException& e) {
×
362
            return report_error("Bad syntax in %1 message: %2", message_type, e.what());
×
363
        }
×
364
        if (!msg.at_end()) {
76,770✔
365
            return report_error("wire protocol message had leftover data after being parsed");
×
366
        }
×
367
    }
76,770✔
368

369
private:
370
    template <typename Connection>
371
    void parse_download_message(Connection& connection, HeaderLineParser& msg)
372
    {
44,002✔
373
        util::Logger& logger = connection.logger;
44,002✔
374
        auto report_error = [&](ErrorCodes::Error code, const auto fmt, auto&&... args) {
23,194✔
375
            auto msg = util::format(fmt, std::forward<decltype(args)>(args)...);
×
376
            connection.handle_protocol_error(Status{code, std::move(msg)});
×
377
        };
×
378

23,194✔
379
        auto msg_with_header = msg.remaining();
44,002✔
380
        auto session_ident = msg.read_next<session_ident_type>();
44,002✔
381
        SyncProgress progress;
44,002✔
382
        progress.download.server_version = msg.read_next<version_type>();
44,002✔
383
        progress.download.last_integrated_client_version = msg.read_next<version_type>();
44,002✔
384
        progress.latest_server_version.version = msg.read_next<version_type>();
44,002✔
385
        progress.latest_server_version.salt = msg.read_next<salt_type>();
44,002✔
386
        progress.upload.client_version = msg.read_next<version_type>();
44,002✔
387
        progress.upload.last_integrated_server_version = msg.read_next<version_type>();
44,002✔
388
        auto query_version = connection.is_flx_sync_connection() ? msg.read_next<int64_t>() : 0;
42,724✔
389

23,194✔
390
        // If this is a PBS connection, then every download message is its own complete batch.
23,194✔
391
        auto last_in_batch = connection.is_flx_sync_connection() ? msg.read_next<bool>() : true;
42,724✔
392
        auto downloadable_bytes = msg.read_next<int64_t>();
44,002✔
393
        auto is_body_compressed = msg.read_next<bool>();
44,002✔
394
        auto uncompressed_body_size = msg.read_next<size_t>();
44,002✔
395
        auto compressed_body_size = msg.read_next<size_t>('\n');
44,002✔
396

23,194✔
397
        if (uncompressed_body_size > s_max_body_size) {
44,002✔
398
            auto header = msg_with_header.substr(0, msg_with_header.size() - msg.remaining().size());
×
399
            return report_error(ErrorCodes::LimitExceeded, "Limits exceeded in input message '%1'", header);
×
400
        }
×
401

23,194✔
402
        std::unique_ptr<char[]> uncompressed_body_buffer;
44,002✔
403
        // if is_body_compressed == true, we must decompress the received body.
23,194✔
404
        if (is_body_compressed) {
44,002✔
405
            uncompressed_body_buffer = std::make_unique<char[]>(uncompressed_body_size);
4,466✔
406
            std::error_code ec =
4,466✔
407
                util::compression::decompress({msg.remaining().data(), compressed_body_size},
4,466✔
408
                                              {uncompressed_body_buffer.get(), uncompressed_body_size});
4,466✔
409

2,230✔
410
            if (ec) {
4,466✔
411
                return report_error(ErrorCodes::RuntimeError, "compression::inflate: %1", ec.message());
×
412
            }
×
413

2,230✔
414
            msg = HeaderLineParser(std::string_view(uncompressed_body_buffer.get(), uncompressed_body_size));
4,466✔
415
        }
4,466✔
416

23,194✔
417
        logger.debug("Download message compression: session_ident=%1, is_body_compressed=%2, "
44,002✔
418
                     "compressed_body_size=%3, uncompressed_body_size=%4",
44,002✔
419
                     session_ident, is_body_compressed, compressed_body_size, uncompressed_body_size);
44,002✔
420

23,194✔
421
        ReceivedChangesets received_changesets;
44,002✔
422

23,194✔
423
        // Loop through the body and find the changesets.
23,194✔
424
        while (!msg.at_end()) {
87,088✔
425
            RemoteChangeset cur_changeset;
43,086✔
426
            cur_changeset.remote_version = msg.read_next<version_type>();
43,086✔
427
            cur_changeset.last_integrated_local_version = msg.read_next<version_type>();
43,086✔
428
            cur_changeset.origin_timestamp = msg.read_next<timestamp_type>();
43,086✔
429
            cur_changeset.origin_file_ident = msg.read_next<file_ident_type>();
43,086✔
430
            cur_changeset.original_changeset_size = msg.read_next<size_t>();
43,086✔
431
            auto changeset_size = msg.read_next<size_t>();
43,086✔
432

22,174✔
433
            if (changeset_size > msg.bytes_remaining()) {
43,086✔
434
                return report_error(ErrorCodes::SyncProtocolInvariantFailed, "Bad changeset size %1 > %2",
×
435
                                    changeset_size, msg.bytes_remaining());
×
436
            }
×
437
            if (cur_changeset.remote_version == 0) {
43,086✔
438
                return report_error(ErrorCodes::SyncProtocolInvariantFailed,
×
439
                                    "Server version in downloaded changeset cannot be zero");
×
440
            }
×
441
            auto changeset_data = msg.read_sized_data<BinaryData>(changeset_size);
43,086✔
442
            logger.debug("Received: DOWNLOAD CHANGESET(session_ident=%1, server_version=%2, "
43,086✔
443
                         "client_version=%3, origin_timestamp=%4, origin_file_ident=%5, "
43,086✔
444
                         "original_changeset_size=%6, changeset_size=%7)",
43,086✔
445
                         session_ident, cur_changeset.remote_version, cur_changeset.last_integrated_local_version,
43,086✔
446
                         cur_changeset.origin_timestamp, cur_changeset.origin_file_ident,
43,086✔
447
                         cur_changeset.original_changeset_size, changeset_size); // Throws
43,086✔
448
            if (logger.would_log(util::Logger::Level::trace)) {
43,086✔
449
                if (changeset_data.size() < 1056) {
×
450
                    logger.trace("Changeset: %1",
×
451
                                 clamped_hex_dump(changeset_data)); // Throws
×
452
                }
×
453
                else {
×
454
                    logger.trace("Changeset(comp): %1 %2", changeset_data.size(),
×
455
                                 compressed_hex_dump(changeset_data)); // Throws
×
456
                }
×
457
#if REALM_DEBUG
×
458
                ChunkedBinaryInputStream in{changeset_data};
×
459
                sync::Changeset log;
×
460
                sync::parse_changeset(in, log);
×
461
                std::stringstream ss;
×
462
                log.print(ss);
×
463
                logger.trace("Changeset (parsed):\n%1", ss.str());
×
464
#endif
×
465
            }
×
466

22,174✔
467
            cur_changeset.data = changeset_data;
43,086✔
468
            received_changesets.push_back(std::move(cur_changeset)); // Throws
43,086✔
469
        }
43,086✔
470

23,194✔
471
        auto batch_state =
44,002✔
472
            last_in_batch ? sync::DownloadBatchState::LastInBatch : sync::DownloadBatchState::MoreToCome;
43,922✔
473
        connection.receive_download_message(session_ident, progress, downloadable_bytes, query_version, batch_state,
44,002✔
474
                                            received_changesets); // Throws
44,002✔
475
    }
44,002✔
476

477
    static sync::ProtocolErrorInfo::Action string_to_action(const std::string& action_string)
478
    {
906✔
479
        using action = sync::ProtocolErrorInfo::Action;
906✔
480
        static const std::unordered_map<std::string, action> mapping{
906✔
481
            {"ProtocolViolation", action::ProtocolViolation},
906✔
482
            {"ApplicationBug", action::ApplicationBug},
906✔
483
            {"Warning", action::Warning},
906✔
484
            {"Transient", action::Transient},
906✔
485
            {"DeleteRealm", action::DeleteRealm},
906✔
486
            {"ClientReset", action::ClientReset},
906✔
487
            {"ClientResetNoRecovery", action::ClientResetNoRecovery},
906✔
488
            {"MigrateToFLX", action::MigrateToFLX},
906✔
489
            {"RevertToPBS", action::RevertToPBS},
906✔
490
            {"RefreshUser", action::RefreshUser},
906✔
491
            {"RefreshLocation", action::RefreshLocation},
906✔
492
            {"LogOutUser", action::LogOutUser},
906✔
493
        };
906✔
494

470✔
495
        if (auto action_it = mapping.find(action_string); action_it != mapping.end()) {
906✔
496
            return action_it->second;
898✔
497
        }
898✔
498
        return action::ApplicationBug;
8✔
499
    }
8✔
500

501
    template <typename Connection>
502
    void parse_log_message(Connection& connection, HeaderLineParser& msg)
503
    {
6,720✔
504
        auto report_error = [&](const auto fmt, auto&&... args) {
2,952✔
505
            auto msg = util::format(fmt, std::forward<decltype(args)>(args)...);
×
506
            connection.handle_protocol_error(Status{ErrorCodes::SyncProtocolInvariantFailed, std::move(msg)});
×
507
        };
×
508

2,952✔
509
        auto session_ident = msg.read_next<session_ident_type>();
6,720✔
510
        auto message_length = msg.read_next<size_t>('\n');
6,720✔
511
        auto message_body_str = msg.read_sized_data<std::string_view>(message_length);
6,720✔
512
        nlohmann::json message_body;
6,720✔
513
        try {
6,720✔
514
            message_body = nlohmann::json::parse(message_body_str);
6,720✔
515
        }
6,720✔
516
        catch (const nlohmann::json::exception& e) {
2,952✔
517
            return report_error("Malformed json in log_message message: \"%1\": %2", message_body_str, e.what());
×
518
        }
×
519
        static const std::unordered_map<std::string_view, util::Logger::Level> name_to_level = {
6,720✔
520
            {"fatal", util::Logger::Level::fatal},   {"error", util::Logger::Level::error},
6,720✔
521
            {"warn", util::Logger::Level::warn},     {"info", util::Logger::Level::info},
6,720✔
522
            {"detail", util::Logger::Level::detail}, {"debug", util::Logger::Level::debug},
6,720✔
523
            {"trace", util::Logger::Level::trace},
6,720✔
524
        };
6,720✔
525

2,952✔
526
        // See if the log_message contains the appservices_request_id
2,952✔
527
        if (auto it = message_body.find("co_id"); it != message_body.end() && it->is_string()) {
6,720✔
528
            connection.receive_appservices_request_id(it->get<std::string_view>());
1,918✔
529
        }
1,918✔
530

2,952✔
531
        std::string_view log_level;
6,720✔
532
        bool has_level = false;
6,720✔
533
        if (auto it = message_body.find("level"); it != message_body.end() && it->is_string()) {
6,720✔
534
            log_level = it->get<std::string_view>();
6,720✔
535
            has_level = !log_level.empty();
6,720✔
536
        }
6,720✔
537

2,952✔
538
        std::string_view msg_text;
6,720✔
539
        if (auto it = message_body.find("msg"); it != message_body.end() && it->is_string()) {
6,720✔
540
            msg_text = it->get<std::string_view>();
6,720✔
541
        }
6,720✔
542

2,952✔
543
        // If there is no message text, then we're done
2,952✔
544
        if (msg_text.empty()) {
6,720✔
545
            return;
×
546
        }
×
547

2,952✔
548
        // If a log level wasn't provided, default to debug
2,952✔
549
        util::Logger::Level parsed_level = util::Logger::Level::debug;
6,720✔
550
        if (has_level) {
6,720✔
551
            if (auto it = name_to_level.find(log_level); it != name_to_level.end()) {
6,720✔
552
                parsed_level = it->second;
6,720✔
553
            }
6,720✔
554
            else {
×
555
                return report_error("Unknown log level found in log_message: \"%1\"", log_level);
×
556
            }
×
557
        }
6,720✔
558
        connection.receive_server_log_message(session_ident, parsed_level, msg_text);
6,720✔
559
    }
6,720✔
560

561
    static constexpr std::size_t s_max_body_size = std::numeric_limits<std::size_t>::max();
562

563
    // Permanent buffer to use for building messages.
564
    OutputBuffer m_output_buffer;
565

566
    // Permanent buffers to use for internal purposes such as compression.
567
    std::vector<char> m_buffer;
568

569
    util::compression::CompressMemoryArena m_compress_memory_arena;
570
};
571

572

573
class ServerProtocol {
574
public:
575
    // clang-format off
576
    using file_ident_type    = sync::file_ident_type;
577
    using version_type       = sync::version_type;
578
    using salt_type          = sync::salt_type;
579
    using timestamp_type     = sync::timestamp_type;
580
    using session_ident_type = sync::session_ident_type;
581
    using request_ident_type = sync::request_ident_type;
582
    using SaltedFileIdent    = sync::SaltedFileIdent;
583
    using SaltedVersion      = sync::SaltedVersion;
584
    using milliseconds_type  = sync::milliseconds_type;
585
    using UploadCursor       = sync::UploadCursor;
586
    // clang-format on
587

588
    using OutputBuffer = util::ResettableExpandableBufferOutputStream;
589

590
    // Messages sent by the server to the client
591

592
    void make_ident_message(int protocol_version, OutputBuffer&, session_ident_type session_ident,
593
                            file_ident_type client_file_ident, salt_type client_file_ident_salt);
594

595
    void make_alloc_message(OutputBuffer&, session_ident_type session_ident, file_ident_type file_ident);
596

597
    void make_unbound_message(OutputBuffer&, session_ident_type session_ident);
598

599

600
    struct ChangesetInfo {
601
        version_type server_version;
602
        version_type client_version;
603
        sync::HistoryEntry entry;
604
        std::size_t original_size;
605
    };
606

607
    void make_download_message(int protocol_version, OutputBuffer&, session_ident_type session_ident,
608
                               version_type download_server_version, version_type download_client_version,
609
                               version_type latest_server_version, salt_type latest_server_version_salt,
610
                               version_type upload_client_version, version_type upload_server_version,
611
                               std::uint_fast64_t downloadable_bytes, std::size_t num_changesets, const char* body,
612
                               std::size_t uncompressed_body_size, std::size_t compressed_body_size,
613
                               bool body_is_compressed, util::Logger&);
614

615
    void make_mark_message(OutputBuffer&, session_ident_type session_ident, request_ident_type request_ident);
616

617
    void make_error_message(int protocol_version, OutputBuffer&, sync::ProtocolError error_code, const char* message,
618
                            std::size_t message_size, bool try_again, session_ident_type session_ident);
619

620
    void make_pong(OutputBuffer&, milliseconds_type timestamp);
621

622
    void make_log_message(OutputBuffer& out, util::Logger::Level level, std::string message,
623
                          session_ident_type sess_id = 0, std::optional<std::string> co_id = std::nullopt);
624

625
    // Messages received by the server.
626

627
    // parse_ping_received takes a (WebSocket) ping and parses it.
628
    // The result of the parsing is handled by an object of type Connection.
629
    // Typically, Connection would be the Connection class from server.cpp
630
    template <typename Connection>
631
    void parse_ping_received(Connection& connection, std::string_view msg_data)
632
    {
×
633
        try {
×
634
            HeaderLineParser msg(msg_data);
×
635
            auto timestamp = msg.read_next<milliseconds_type>();
×
636
            auto rtt = msg.read_next<milliseconds_type>('\n');
×
637

638
            connection.receive_ping(timestamp, rtt);
×
639
        }
×
640
        catch (const ProtocolCodecException& e) {
×
641
            connection.handle_protocol_error(Status{ErrorCodes::SyncProtocolInvariantFailed,
×
642
                                                    util::format("Bad syntax in PING message: %1", e.what())});
×
643
        }
×
644
    }
×
645

646
    // UploadChangeset is used to store received changesets in
647
    // the UPLOAD message.
648
    struct UploadChangeset {
649
        UploadCursor upload_cursor;
650
        timestamp_type origin_timestamp;
651
        file_ident_type origin_file_ident; // Zero when originating from connected client file
652
        BinaryData changeset;
653
    };
654

655
    // parse_message_received takes a (WebSocket) message and parses it.
656
    // The result of the parsing is handled by an object of type Connection.
657
    // Typically, Connection would be the Connection class from server.cpp
658
    template <class Connection>
659
    void parse_message_received(Connection& connection, std::string_view msg_data)
660
    {
73,800✔
661
        auto& logger = connection.logger;
73,800✔
662

35,414✔
663
        auto report_error = [&](ErrorCodes::Error err, const auto fmt, auto&&... args) {
35,414✔
664
            auto msg = util::format(fmt, std::forward<decltype(args)>(args)...);
×
665
            connection.handle_protocol_error(Status{err, std::move(msg)});
×
666
        };
×
667

35,414✔
668
        HeaderLineParser msg(msg_data);
73,800✔
669
        std::string_view message_type;
73,800✔
670
        try {
73,800✔
671
            message_type = msg.read_next<std::string_view>();
73,800✔
672
        }
73,800✔
673
        catch (const ProtocolCodecException& e) {
35,414✔
674
            return report_error(ErrorCodes::SyncProtocolInvariantFailed, "Could not find message type in message: %1",
×
675
                                e.what());
×
676
        }
×
677

35,420✔
678
        try {
73,808✔
679
            if (message_type == "upload") {
73,808✔
680
                auto msg_with_header = msg.remaining();
45,690✔
681
                auto session_ident = msg.read_next<session_ident_type>();
45,690✔
682
                auto is_body_compressed = msg.read_next<bool>();
45,690✔
683
                auto uncompressed_body_size = msg.read_next<size_t>();
45,690✔
684
                auto compressed_body_size = msg.read_next<size_t>();
45,690✔
685
                auto progress_client_version = msg.read_next<version_type>();
45,690✔
686
                auto progress_server_version = msg.read_next<version_type>();
45,690✔
687
                auto locked_server_version = msg.read_next<version_type>('\n');
45,690✔
688

23,172✔
689
                std::size_t body_size = (is_body_compressed ? compressed_body_size : uncompressed_body_size);
43,342✔
690
                if (body_size > s_max_body_size) {
45,690✔
691
                    auto header = msg_with_header.substr(0, msg_with_header.size() - msg.bytes_remaining());
×
692

693
                    return report_error(ErrorCodes::LimitExceeded,
×
694
                                        "Body size of upload message is too large. Raw header: %1", header);
×
695
                }
×
696

23,172✔
697

23,172✔
698
                std::unique_ptr<char[]> uncompressed_body_buffer;
45,690✔
699
                // if is_body_compressed == true, we must decompress the received body.
23,172✔
700
                if (is_body_compressed) {
45,690✔
701
                    uncompressed_body_buffer = std::make_unique<char[]>(uncompressed_body_size);
4,442✔
702
                    auto compressed_body = msg.read_sized_data<BinaryData>(compressed_body_size);
4,442✔
703

2,094✔
704
                    std::error_code ec = util::compression::decompress(
4,442✔
705
                        compressed_body, {uncompressed_body_buffer.get(), uncompressed_body_size});
4,442✔
706

2,094✔
707
                    if (ec) {
4,442✔
708
                        return report_error(ErrorCodes::RuntimeError, "compression::inflate: %1", ec.message());
×
709
                    }
×
710

2,094✔
711
                    msg = HeaderLineParser(std::string_view(uncompressed_body_buffer.get(), uncompressed_body_size));
4,442✔
712
                }
4,442✔
713

23,172✔
714
                logger.debug("Upload message compression: is_body_compressed = %1, "
45,690✔
715
                             "compressed_body_size=%2, uncompressed_body_size=%3, "
45,690✔
716
                             "progress_client_version=%4, progress_server_version=%5, "
45,690✔
717
                             "locked_server_version=%6",
45,690✔
718
                             is_body_compressed, compressed_body_size, uncompressed_body_size,
45,690✔
719
                             progress_client_version, progress_server_version, locked_server_version); // Throws
45,690✔
720

23,172✔
721

23,172✔
722
                std::vector<UploadChangeset> upload_changesets;
45,690✔
723

23,172✔
724
                // Loop through the body and find the changesets.
23,172✔
725
                while (!msg.at_end()) {
82,886✔
726
                    UploadChangeset upload_changeset;
37,194✔
727
                    size_t changeset_size;
37,194✔
728
                    try {
37,194✔
729
                        upload_changeset.upload_cursor.client_version = msg.read_next<version_type>();
37,194✔
730
                        upload_changeset.upload_cursor.last_integrated_server_version = msg.read_next<version_type>();
37,194✔
731
                        upload_changeset.origin_timestamp = msg.read_next<timestamp_type>();
37,194✔
732
                        upload_changeset.origin_file_ident = msg.read_next<file_ident_type>();
37,194✔
733
                        changeset_size = msg.read_next<size_t>();
37,194✔
734
                    }
37,194✔
735
                    catch (const ProtocolCodecException& e) {
17,962✔
736
                        return report_error(ErrorCodes::SyncProtocolInvariantFailed,
×
737
                                            "Bad changeset header syntax: %1", e.what());
×
738
                    }
×
739

17,964✔
740
                    if (changeset_size > msg.bytes_remaining()) {
37,196✔
741
                        return report_error(ErrorCodes::SyncProtocolInvariantFailed, "Bad changeset size");
×
742
                    }
×
743

17,964✔
744
                    upload_changeset.changeset = msg.read_sized_data<BinaryData>(changeset_size);
37,196✔
745

17,964✔
746
                    if (logger.would_log(util::Logger::Level::trace)) {
37,196✔
747
                        logger.trace("Received: UPLOAD CHANGESET(client_version=%1, server_version=%2, "
×
748
                                     "origin_timestamp=%3, origin_file_ident=%4, changeset_size=%5)",
×
749
                                     upload_changeset.upload_cursor.client_version,
×
750
                                     upload_changeset.upload_cursor.last_integrated_server_version,
×
751
                                     upload_changeset.origin_timestamp, upload_changeset.origin_file_ident,
×
752
                                     changeset_size); // Throws
×
753
                        logger.trace("Changeset: %1",
×
754
                                     clamped_hex_dump(upload_changeset.changeset)); // Throws
×
755
                    }
×
756
                    upload_changesets.push_back(std::move(upload_changeset)); // Throws
37,196✔
757
                }
37,196✔
758

23,172✔
759
                connection.receive_upload_message(session_ident, progress_client_version, progress_server_version,
45,692✔
760
                                                  locked_server_version,
45,692✔
761
                                                  upload_changesets); // Throws
45,692✔
762
            }
45,692✔
763
            else if (message_type == "mark") {
28,118✔
764
                auto session_ident = msg.read_next<session_ident_type>();
13,034✔
765
                auto request_ident = msg.read_next<request_ident_type>('\n');
13,034✔
766

5,996✔
767
                connection.receive_mark_message(session_ident, request_ident); // Throws
13,034✔
768
            }
13,034✔
769
            else if (message_type == "ping") {
15,084✔
770
                auto timestamp = msg.read_next<milliseconds_type>();
122✔
771
                auto rtt = msg.read_next<milliseconds_type>('\n');
122✔
772

18✔
773
                connection.receive_ping(timestamp, rtt);
122✔
774
            }
122✔
775
            else if (message_type == "bind") {
14,962✔
776
                auto session_ident = msg.read_next<session_ident_type>();
6,336✔
777
                auto path_size = msg.read_next<size_t>();
6,336✔
778
                auto signed_user_token_size = msg.read_next<size_t>();
6,336✔
779
                auto need_client_file_ident = msg.read_next<bool>();
6,336✔
780
                auto is_subserver = msg.read_next<bool>('\n');
6,336✔
781

2,792✔
782
                if (path_size == 0) {
6,336✔
783
                    return report_error(ErrorCodes::SyncProtocolInvariantFailed, "Path size in BIND message is zero");
×
784
                }
×
785
                if (path_size > s_max_path_size) {
6,336✔
786
                    return report_error(ErrorCodes::SyncProtocolInvariantFailed,
×
787
                                        "Path size in BIND message is too large");
×
788
                }
×
789
                if (signed_user_token_size > s_max_signed_user_token_size) {
6,336✔
790
                    return report_error(ErrorCodes::SyncProtocolInvariantFailed,
×
791
                                        "Signed user token size in BIND message is too large");
×
792
                }
×
793

2,792✔
794
                auto path = msg.read_sized_data<std::string>(path_size);
6,336✔
795
                auto signed_user_token = msg.read_sized_data<std::string>(signed_user_token_size);
6,336✔
796

2,792✔
797
                connection.receive_bind_message(session_ident, std::move(path), std::move(signed_user_token),
6,336✔
798
                                                need_client_file_ident, is_subserver); // Throws
6,336✔
799
            }
6,336✔
800
            else if (message_type == "ident") {
8,626✔
801
                auto session_ident = msg.read_next<session_ident_type>();
5,050✔
802
                auto client_file_ident = msg.read_next<file_ident_type>();
5,050✔
803
                auto client_file_ident_salt = msg.read_next<salt_type>();
5,050✔
804
                auto scan_server_version = msg.read_next<version_type>();
5,050✔
805
                auto scan_client_version = msg.read_next<version_type>();
5,050✔
806
                auto latest_server_version = msg.read_next<version_type>();
5,050✔
807
                auto latest_server_version_salt = msg.read_next<salt_type>('\n');
5,050✔
808

2,100✔
809
                connection.receive_ident_message(session_ident, client_file_ident, client_file_ident_salt,
5,050✔
810
                                                 scan_server_version, scan_client_version, latest_server_version,
5,050✔
811
                                                 latest_server_version_salt); // Throws
5,050✔
812
            }
5,050✔
813
            else if (message_type == "unbind") {
3,576✔
814
                auto session_ident = msg.read_next<session_ident_type>('\n');
3,570✔
815

1,336✔
816
                connection.receive_unbind_message(session_ident); // Throws
3,570✔
817
            }
3,570✔
818
            else if (message_type == "json_error") {
6!
819
                auto error_code = msg.read_next<int>();
×
820
                auto message_size = msg.read_next<size_t>();
×
821
                auto session_ident = msg.read_next<session_ident_type>('\n');
×
822
                auto json_raw = msg.read_sized_data<std::string_view>(message_size);
×
823

824
                connection.receive_error_message(session_ident, error_code, json_raw);
×
825
            }
×
826
            else {
6✔
827
                return report_error(ErrorCodes::SyncProtocolInvariantFailed, "unknown message type %1", message_type);
6✔
828
            }
6✔
829
        }
×
830
        catch (const ProtocolCodecException& e) {
×
831
            return report_error(ErrorCodes::SyncProtocolInvariantFailed, "bad syntax in %1 message: %2", message_type,
×
832
                                e.what());
×
833
        }
×
834
    }
73,808✔
835

836
    void insert_single_changeset_download_message(OutputBuffer&, const ChangesetInfo&, util::Logger&);
837

838
private:
839
    // clang-format off
840
    static constexpr std::size_t s_max_head_size              =  256;
841
    static constexpr std::size_t s_max_signed_user_token_size = 2048;
842
    static constexpr std::size_t s_max_client_info_size       = 1024;
843
    static constexpr std::size_t s_max_path_size              = 1024;
844
    static constexpr std::size_t s_max_changeset_size         = std::numeric_limits<std::size_t>::max(); // FIXME: What is a reasonable value here?
845
    static constexpr std::size_t s_max_body_size              = std::numeric_limits<std::size_t>::max();
846
    // clang-format on
847
};
848

849
// make_authorization_header() makes the value of the Authorization header used in the
850
// sync Websocket handshake.
851
std::string make_authorization_header(const std::string& signed_user_token);
852

853
// parse_authorization_header() parses the value of the Authorization header and returns
854
// the signed_user_token. None is returned in case of syntax error.
855
util::Optional<StringData> parse_authorization_header(const std::string& authorization_header);
856

857
} // namespace realm::_impl
858

859
#endif // REALM_NOINST_PROTOCOL_CODEC_HPP
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc