• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2029

12 Feb 2024 06:33PM UTC coverage: 91.838%. First build
2029

push

Evergreen

web-flow
Merge pull request #7329 from realm/tg/jwt-validation

Assorted sync metadata storage refactoring

93050 of 171486 branches covered (0.0%)

219 of 246 new or added lines in 23 files covered. (89.02%)

235273 of 256184 relevant lines covered (91.84%)

6516157.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.18
/src/realm/sync/noinst/protocol_codec.cpp
1
#include <realm/util/assert.hpp>
#include <realm/util/base64.hpp>
#include <realm/util/from_chars.hpp>
#include <realm/sync/noinst/protocol_codec.hpp>

#include <utility>
5

6
namespace realm::_impl {
7

8
using OutputBuffer = util::ResettableExpandableBufferOutputStream;
9

10
// Client protocol
11

12
void ClientProtocol::make_pbs_bind_message(int protocol_version, OutputBuffer& out, session_ident_type session_ident,
13
                                           const std::string& server_path, const std::string& signed_user_token,
14
                                           bool need_client_file_ident, bool is_subserver)
15
{
6,832✔
16
    static_cast<void>(protocol_version);
6,832✔
17
    out << "bind " << session_ident << " " << server_path.size() << " " << signed_user_token.size() << " "
6,832✔
18
        << int(need_client_file_ident) << " " << int(is_subserver) << "\n"; // Throws
6,832✔
19
    REALM_ASSERT(!out.fail());
6,832✔
20

3,800✔
21
    out.write(server_path.data(), server_path.size());             // Throws
6,832✔
22
    out.write(signed_user_token.data(), signed_user_token.size()); // Throws
6,832✔
23
}
6,832✔
24

25
void ClientProtocol::make_flx_bind_message(int protocol_version, OutputBuffer& out, session_ident_type session_ident,
26
                                           const nlohmann::json& json_data, const std::string& signed_user_token,
27
                                           bool need_client_file_ident, bool is_subserver)
28
{
1,360✔
29
    static_cast<void>(protocol_version);
1,360✔
30
    std::string json_data_stg;
1,360✔
31
    // Protocol version v8 and above accepts stringified json_data for the first data argument
682✔
32
    if (!json_data.empty()) {
1,360✔
33
        json_data_stg = json_data.dump();
1,356✔
34
    }
1,356✔
35

682✔
36
    out << "bind " << session_ident << " " << json_data_stg.size() << " " << signed_user_token.size() << " "
1,360✔
37
        << int(need_client_file_ident) << " " << int(is_subserver) << "\n"; // Throws
1,360✔
38
    REALM_ASSERT(!out.fail());
1,360✔
39

682✔
40
    out.write(json_data_stg.data(), json_data_stg.size());         // Throws
1,360✔
41
    out.write(signed_user_token.data(), signed_user_token.size()); // Throws
1,360✔
42
}
1,360✔
43

44
void ClientProtocol::make_pbs_ident_message(OutputBuffer& out, session_ident_type session_ident,
45
                                            SaltedFileIdent client_file_ident, const SyncProgress& progress)
46
{
5,630✔
47
    out << "ident " << session_ident << " " << client_file_ident.ident << " " << client_file_ident.salt << " "
5,630✔
48
        << progress.download.server_version << " " << progress.download.last_integrated_client_version << " "
5,630✔
49
        << progress.latest_server_version.version << " " << progress.latest_server_version.salt << "\n"; // Throws
5,630✔
50
    REALM_ASSERT(!out.fail());
5,630✔
51
}
5,630✔
52

53
void ClientProtocol::make_flx_ident_message(OutputBuffer& out, session_ident_type session_ident,
54
                                            SaltedFileIdent client_file_ident, const SyncProgress& progress,
55
                                            int64_t query_version, std::string_view query_body)
56
{
1,282✔
57
    out << "ident " << session_ident << " " << client_file_ident.ident << " " << client_file_ident.salt << " "
1,282✔
58
        << progress.download.server_version << " " << progress.download.last_integrated_client_version << " "
1,282✔
59
        << progress.latest_server_version.version << " " << progress.latest_server_version.salt << " "
1,282✔
60
        << query_version << " " << query_body.size() << "\n"
1,282✔
61
        << query_body; // Throws
1,282✔
62
    REALM_ASSERT(!out.fail());
1,282✔
63
}
1,282✔
64

65
void ClientProtocol::make_query_change_message(OutputBuffer& out, session_ident_type session, int64_t version,
66
                                               std::string_view query_body)
67
{
1,060✔
68
    out << "query " << session << " " << version << " " << query_body.size() << "\n" << query_body; // throws
1,060✔
69
    REALM_ASSERT(!out.fail());
1,060✔
70
}
1,060✔
71

72
void ClientProtocol::make_json_error_message(OutputBuffer& out, session_ident_type session, int error_code,
73
                                             std::string_view error_body)
74
{
44✔
75
    out << "json_error " << error_code << " " << error_body.size() << " " << session << "\n" << error_body; // throws
44✔
76
    REALM_ASSERT(!out.fail());
44✔
77
}
44✔
78

79
void ClientProtocol::make_test_command_message(OutputBuffer& out, session_ident_type session,
80
                                               request_ident_type request_ident, std::string_view body)
81
{
56✔
82
    out << "test_command " << session << " " << request_ident << " " << body.size() << "\n" << body;
56✔
83
    REALM_ASSERT(!out.fail());
56✔
84
}
56✔
85

86
/// Bind the builder to the caller-supplied body buffer, compression buffer
/// and compression arena. The body buffer is reset so that changesets added
/// afterwards start from an empty body.
ClientProtocol::UploadMessageBuilder::UploadMessageBuilder(
    OutputBuffer& body_buffer, std::vector<char>& compression_buffer,
    util::compression::CompressMemoryArena& compress_memory_arena)
    : m_body_buffer(body_buffer)
    , m_compression_buffer(compression_buffer)
    , m_compress_memory_arena(compress_memory_arena)
{
    // Discard whatever a previous user left in the shared body buffer.
    m_body_buffer.reset();
}
56,478✔
95

96
void ClientProtocol::UploadMessageBuilder::add_changeset(version_type client_version, version_type server_version,
97
                                                         timestamp_type origin_timestamp,
98
                                                         file_ident_type origin_file_ident,
99
                                                         ChunkedBinaryData changeset)
100
{
41,934✔
101
    m_body_buffer << client_version << " " << server_version << " " << origin_timestamp << " " << origin_file_ident
41,934✔
102
                  << " " << changeset.size() << " "; // Throws
41,934✔
103
    changeset.write_to(m_body_buffer);               // Throws
41,934✔
104
    REALM_ASSERT(!m_body_buffer.fail());
41,934✔
105

20,360✔
106
    ++m_num_changesets;
41,934✔
107
}
41,934✔
108

109
void ClientProtocol::UploadMessageBuilder::make_upload_message(int protocol_version, OutputBuffer& out,
110
                                                               session_ident_type session_ident,
111
                                                               version_type progress_client_version,
112
                                                               version_type progress_server_version,
113
                                                               version_type locked_server_version)
114
{
56,476✔
115
    static_cast<void>(protocol_version);
56,476✔
116
    BinaryData body = {m_body_buffer.data(), std::size_t(m_body_buffer.size())};
56,476✔
117

28,166✔
118
    constexpr std::size_t g_max_uncompressed = 1024;
56,476✔
119

28,166✔
120
    bool is_body_compressed = false;
56,476✔
121
    if (body.size() > g_max_uncompressed) {
56,476✔
122
        util::compression::allocate_and_compress(m_compress_memory_arena, body,
4,596✔
123
                                                 m_compression_buffer); // Throws
4,596✔
124
        is_body_compressed = m_compression_buffer.size() < body.size();
4,596✔
125
    }
4,596✔
126

28,166✔
127
    // The compressed body is only sent if it is smaller than the uncompressed body.
28,166✔
128
    std::size_t compressed_body_size = is_body_compressed ? m_compression_buffer.size() : 0;
54,050✔
129

28,166✔
130
    // The header of the upload message.
28,166✔
131
    out << "upload " << session_ident << " " << int(is_body_compressed) << " " << body.size() << " "
56,476✔
132
        << compressed_body_size;
56,476✔
133
    out << " " << progress_client_version << " " << progress_server_version << " " << locked_server_version; // Throws
56,476✔
134
    out << "\n";                                                                                             // Throws
56,476✔
135

28,166✔
136
    if (is_body_compressed)
56,476✔
137
        out.write(m_compression_buffer.data(), compressed_body_size); // Throws
4,596✔
138
    else
51,880✔
139
        out.write(body.data(), body.size()); // Throws
51,880✔
140

28,166✔
141
    REALM_ASSERT(!out.fail());
56,476✔
142
}
56,476✔
143

144
/// Create an upload message builder that works on this protocol object's
/// output buffer, scratch compression buffer and compression arena.
ClientProtocol::UploadMessageBuilder ClientProtocol::make_upload_message_builder()
{
    return UploadMessageBuilder(m_output_buffer, m_buffer, m_compress_memory_arena);
}
56,480✔
148

149
/// Write an `unbind` message for `session_ident` to `out`.
void ClientProtocol::make_unbind_message(OutputBuffer& out, session_ident_type session_ident)
{
    out << "unbind ";
    out << session_ident << "\n"; // Throws
    REALM_ASSERT(!out.fail());
}
5,878✔
154

155
void ClientProtocol::make_mark_message(OutputBuffer& out, session_ident_type session_ident,
156
                                       request_ident_type request_ident)
157
{
16,918✔
158
    out << "mark " << session_ident << " " << request_ident << "\n"; // Throws
16,918✔
159
    REALM_ASSERT(!out.fail());
16,918✔
160
}
16,918✔
161

162

163
/// Write a `ping` message to `out`, carrying `timestamp` followed by `rtt`.
///
/// As with the other client-side encoders in this file, the stream state is
/// asserted after writing so that a failed write is not silently dropped.
void ClientProtocol::make_ping(OutputBuffer& out, milliseconds_type timestamp, milliseconds_type rtt)
{
    out << "ping " << timestamp << " " << rtt << "\n"; // Throws
    REALM_ASSERT(!out.fail());
}
198✔
167

168

169
std::string ClientProtocol::compressed_hex_dump(BinaryData blob)
170
{
×
171
    std::vector<char> buf;
×
172
    util::compression::allocate_and_compress(m_compress_memory_arena, blob, buf); // Throws
×
173

174
    std::string encode_buffer;
×
175
    auto encoded_size = util::base64_encoded_size(buf.size());
×
176
    encode_buffer.resize(encoded_size);
×
NEW
177
    util::base64_encode(buf, encode_buffer);
×
178

179
    return encode_buffer;
×
180
}
×
181

182
// Server protocol
183

184
void ServerProtocol::make_ident_message(int protocol_version, OutputBuffer& out, session_ident_type session_ident,
185
                                        file_ident_type client_file_ident, salt_type client_file_ident_salt)
186
{
1,234✔
187
    static_cast<void>(protocol_version);
1,234✔
188
    out << "ident " << session_ident << " " << client_file_ident << " " << client_file_ident_salt << "\n"; // Throws
1,234✔
189
}
1,234✔
190

191
void ServerProtocol::make_alloc_message(OutputBuffer& out, session_ident_type session_ident,
192
                                        file_ident_type file_ident)
193
{
×
194
    out << "alloc " << session_ident << " " << file_ident << "\n"; // Throws
×
195
}
×
196

197

198
/// insert_single_changeset_download_message() inserts a single changeset and
199
/// the associated meta data into the output buffer.
200
///
201
/// It is the functions responsibility to make sure that the buffer has
202
/// capacity to hold the inserted data.
203
///
204
/// The message format for the single changeset is <server_version>
205
/// <client_version> <timestamp> <client_file_ident> <changeset size>
206
/// <changeset>
207
void ServerProtocol::insert_single_changeset_download_message(OutputBuffer& out, const ChangesetInfo& changeset_info,
208
                                                              util::Logger& logger)
209
{
41,314✔
210
    const sync::HistoryEntry& entry = changeset_info.entry;
41,314✔
211

21,400✔
212
    out << changeset_info.server_version << " " << changeset_info.client_version << " " << entry.origin_timestamp
41,314✔
213
        << " " << entry.origin_file_ident << " " << changeset_info.original_size << " " << entry.changeset.size()
41,314✔
214
        << " ";
41,314✔
215
    entry.changeset.write_to(out);
41,314✔
216

21,400✔
217
    if (logger.would_log(util::Logger::Level::trace)) {
41,314✔
218
        logger.trace("DOWNLOAD: insert single changeset (server_version=%1, "
×
219
                     "client_version=%2, timestamp=%3, client_file_ident=%4, "
×
220
                     "original_changeset_size=%5, changeset_size=%6, changeset='%7').",
×
221
                     changeset_info.server_version, changeset_info.client_version, entry.origin_timestamp,
×
222
                     entry.origin_file_ident, changeset_info.original_size, entry.changeset.size(),
×
223
                     _impl::clamped_hex_dump(entry.changeset.get_first_chunk())); // Throws
×
224
    }
×
225
}
41,314✔
226

227

228
void ServerProtocol::make_download_message(int protocol_version, OutputBuffer& out, session_ident_type session_ident,
229
                                           version_type download_server_version, version_type download_client_version,
230
                                           version_type latest_server_version, salt_type latest_server_version_salt,
231
                                           version_type upload_client_version, version_type upload_server_version,
232
                                           std::uint_fast64_t downloadable_bytes, std::size_t num_changesets,
233
                                           const char* body, std::size_t uncompressed_body_size,
234
                                           std::size_t compressed_body_size, bool body_is_compressed,
235
                                           util::Logger& logger)
236
{
39,840✔
237
    static_cast<void>(protocol_version);
39,840✔
238
    // The header of the download message.
21,600✔
239
    out << "download " << session_ident << " " << download_server_version << " " << download_client_version << " "
39,840✔
240
        << latest_server_version << " " << latest_server_version_salt << " " << upload_client_version << " "
39,840✔
241
        << upload_server_version << " " << downloadable_bytes << " " << int(body_is_compressed) << " "
39,840✔
242
        << uncompressed_body_size << " " << compressed_body_size << "\n"; // Throws
39,840✔
243

21,600✔
244
    std::size_t body_size = (body_is_compressed ? compressed_body_size : uncompressed_body_size);
37,710✔
245
    out.write(body, body_size);
39,840✔
246

21,600✔
247
    logger.detail("Sending: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
39,840✔
248
                  "latest_server_version=%3, latest_server_version_salt=%4, "
39,840✔
249
                  "upload_client_version=%5, upload_server_version=%6, "
39,840✔
250
                  "num_changesets=%7, is_body_compressed=%8, body_size=%9, "
39,840✔
251
                  "compressed_body_size=%10)",
39,840✔
252
                  download_server_version, download_client_version, latest_server_version, latest_server_version_salt,
39,840✔
253
                  upload_client_version, upload_server_version, num_changesets, body_is_compressed,
39,840✔
254
                  uncompressed_body_size, compressed_body_size); // Throws
39,840✔
255
}
39,840✔
256

257

258
/// Write an `unbound` message for `session_ident` to `out`.
void ServerProtocol::make_unbound_message(OutputBuffer& out, session_ident_type session_ident)
{
    out << "unbound ";
    out << session_ident << "\n"; // Throws
}
2,324✔
262

263

264
void ServerProtocol::make_mark_message(OutputBuffer& out, session_ident_type session_ident,
265
                                       request_ident_type request_ident)
266
{
12,074✔
267
    out << "mark " << session_ident << " " << request_ident << "\n"; // Throws
12,074✔
268
    REALM_ASSERT(!out.fail());
12,074✔
269
}
12,074✔
270

271

272
void ServerProtocol::make_error_message(int protocol_version, OutputBuffer& out, sync::ProtocolError error_code,
273
                                        const char* message, std::size_t message_size, bool try_again,
274
                                        session_ident_type session_ident)
275
{
98✔
276
    static_cast<void>(protocol_version);
98✔
277
    sync::ProtocolError error_code_2 = error_code;
98✔
278
    out << "error " << int(error_code_2) << " " << message_size << " " << int(try_again) << " " << session_ident
98✔
279
        << "\n";                      // Throws
98✔
280
    out.write(message, message_size); // Throws
98✔
281
}
98✔
282

283

284
/// Write a `pong` message carrying `timestamp` to `out`.
void ServerProtocol::make_pong(OutputBuffer& out, milliseconds_type timestamp)
{
    out << "pong ";
    out << timestamp << "\n"; // Throws
}
154✔
288

289
void ServerProtocol::make_log_message(OutputBuffer& out, util::Logger::Level level, std::string message,
290
                                      session_ident_type sess_id, std::optional<std::string> co_id)
291
{
5,814✔
292
    nlohmann::json log_msg_json;
5,814✔
293
    log_msg_json["level"] = util::Logger::level_to_string(level);
5,814✔
294
    log_msg_json["msg"] = message;
5,814✔
295
    if (co_id) {
5,814✔
296
        log_msg_json["co_id"] = *co_id;
1,856✔
297
    }
1,856✔
298
    std::string json_data_stg = log_msg_json.dump();
5,814✔
299
    out << "log_message " << sess_id << " " << json_data_stg.length() << "\n" << json_data_stg;
5,814✔
300
}
5,814✔
301

302
/// Build the value of an authorization header for `signed_user_token`: the
/// literal prefix "Bearer " followed by the token, unmodified.
std::string make_authorization_header(const std::string& signed_user_token)
{
    std::string header_value = "Bearer ";
    header_value += signed_user_token;
    return header_value;
}
×
306

307

308
/// Extract the token from an authorization header of the form
/// "Bearer <token>".
///
/// Returns util::none when the header is too short or does not start with
/// "Bearer ". On success the returned StringData points into
/// `authorization_header`'s own storage; no copy of the token is made.
util::Optional<StringData> parse_authorization_header(const std::string& authorization_header)
{
    StringData prefix = "Bearer ";

    // The header must hold at least four characters after the prefix.
    // Stricter checks are possible, but do not belong here.
    // NOTE(review): an earlier comment said "at least three characters" while
    // the check has always required four — confirm which one is intended.
    const std::size_t min_header_size = prefix.size() + 4;

    const bool too_short = authorization_header.size() < min_header_size;
    if (too_short || authorization_header.compare(0, prefix.size(), prefix) != 0)
        return util::none;

    const char* token_begin = authorization_header.data() + prefix.size();
    const std::size_t token_size = authorization_header.size() - prefix.size();
    return StringData{token_begin, token_size};
}
×
322

323
} // namespace realm::_impl
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc