• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2493

15 Jul 2024 06:38PM UTC coverage: 90.982% (+0.002%) from 90.98%
2493

push

Evergreen

web-flow
RCORE-2192 RCORE-2193 Fix FLX download progress reporting (#7870)

* Fix FLX download progress reporting

We need to store the download progress for each batch of a bootstrap and not
just at the end for it to be useful in any way.

The server will sometimes send us DOWNLOAD messages with a non-one estimate
followed by a one estimate where the byte-level information is the same (as the
final message is empty). When this happens we need to report the download
completion to the user, so add the estimate to the fields checked for changes.

A subscription change which doesn't actually change what set of objects is in
view can result in an empty DOWNLOAD message with no changes other than the
query version, and we should report that too.

* Fix a comment

* Pass the DownloadMessage to process_flx_bootstrap_message()

* Report steady-state download progress

102352 of 180586 branches covered (56.68%)

247 of 257 new or added lines in 10 files covered. (96.11%)

88 existing lines in 17 files now uncovered.

215381 of 236730 relevant lines covered (90.98%)

5726853.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.93
/src/realm/sync/noinst/server/server.cpp
1
#include <realm/sync/noinst/server/server.hpp>
2

3
#include <realm/binary_data.hpp>
4
#include <realm/impl/simulated_failure.hpp>
5
#include <realm/object_id.hpp>
6
#include <realm/string_data.hpp>
7
#include <realm/sync/changeset.hpp>
8
#include <realm/sync/trigger.hpp>
9
#include <realm/sync/impl/clamped_hex_dump.hpp>
10
#include <realm/sync/impl/clock.hpp>
11
#include <realm/sync/network/http.hpp>
12
#include <realm/sync/network/network_ssl.hpp>
13
#include <realm/sync/network/websocket.hpp>
14
#include <realm/sync/noinst/client_history_impl.hpp>
15
#include <realm/sync/noinst/protocol_codec.hpp>
16
#include <realm/sync/noinst/server/access_control.hpp>
17
#include <realm/sync/noinst/server/server_dir.hpp>
18
#include <realm/sync/noinst/server/server_file_access_cache.hpp>
19
#include <realm/sync/noinst/server/server_impl_base.hpp>
20
#include <realm/sync/transform.hpp>
21
#include <realm/util/base64.hpp>
22
#include <realm/util/bind_ptr.hpp>
23
#include <realm/util/buffer_stream.hpp>
24
#include <realm/util/circular_buffer.hpp>
25
#include <realm/util/compression.hpp>
26
#include <realm/util/file.hpp>
27
#include <realm/util/json_parser.hpp>
28
#include <realm/util/memory_stream.hpp>
29
#include <realm/util/optional.hpp>
30
#include <realm/util/platform_info.hpp>
31
#include <realm/util/random.hpp>
32
#include <realm/util/safe_int_ops.hpp>
33
#include <realm/util/scope_exit.hpp>
34
#include <realm/util/scratch_allocator.hpp>
35
#include <realm/util/thread.hpp>
36
#include <realm/util/thread_exec_guard.hpp>
37
#include <realm/util/value_reset_guard.hpp>
38
#include <realm/version.hpp>
39

40
#include <algorithm>
41
#include <atomic>
42
#include <cctype>
43
#include <chrono>
44
#include <cmath>
45
#include <condition_variable>
46
#include <cstdint>
47
#include <cstdio>
48
#include <cstring>
49
#include <functional>
50
#include <locale>
51
#include <map>
52
#include <memory>
53
#include <queue>
54
#include <sstream>
55
#include <stdexcept>
56
#include <thread>
57
#include <vector>
58

59
// NOTE: The protocol specification is in `/doc/protocol.md`
60

61

62
// FIXME: Verify that session identifier spoofing cannot be used to get access
63
// to sessions belonging to other network conections in any way.
64
// FIXME: Seems that server must close connection with zero sessions after a
65
// certain timeout.
66

67

68
using namespace realm;
69
using namespace realm::sync;
70
using namespace realm::util;
71

72
// clang-format off
73
using ServerHistory         = _impl::ServerHistory;
74
using ServerProtocol        = _impl::ServerProtocol;
75
using ServerFileAccessCache = _impl::ServerFileAccessCache;
76
using ServerImplBase = _impl::ServerImplBase;
77

78
using IntegratableChangeset = ServerHistory::IntegratableChangeset;
79
using IntegratableChangesetList = ServerHistory::IntegratableChangesetList;
80
using IntegratableChangesets = ServerHistory::IntegratableChangesets;
81
using IntegrationResult = ServerHistory::IntegrationResult;
82
using BootstrapError = ServerHistory::BootstrapError;
83
using ExtendedIntegrationError = ServerHistory::ExtendedIntegrationError;
84
using ClientType = ServerHistory::ClientType;
85
using FileIdentAllocSlot = ServerHistory::FileIdentAllocSlot;
86
using FileIdentAllocSlots = ServerHistory::FileIdentAllocSlots;
87

88
using UploadChangeset = ServerProtocol::UploadChangeset;
89
// clang-format on
90

91

92
using UploadChangesets = std::vector<UploadChangeset>;
93

94
using EventLoopMetricsHandler = network::Service::EventLoopMetricsHandler;
95

96

97
static_assert(std::numeric_limits<session_ident_type>::digits >= 63, "Bad session identifier type");
98
static_assert(std::numeric_limits<file_ident_type>::digits >= 63, "Bad file identifier type");
99
static_assert(std::numeric_limits<version_type>::digits >= 63, "Bad version type");
100
static_assert(std::numeric_limits<timestamp_type>::digits >= 63, "Bad timestamp type");
101

102

103
namespace {
104

105
enum class SchedStatus { done = 0, pending, in_progress };
106

107
// Only used by the Sync Server to support older pbs sync clients (prior to protocol v8)
108
constexpr std::string_view get_old_pbs_websocket_protocol_prefix() noexcept
109
{
×
110
    return "com.mongodb.realm-sync/";
×
111
}
×
112

113
std::string short_token_fmt(const std::string& str, size_t cutoff = 30)
114
{
394✔
115
    if (str.size() > cutoff) {
394✔
116
        return "..." + str.substr(str.size() - cutoff);
×
117
    }
×
118
    else {
394✔
119
        return str;
394✔
120
    }
394✔
121
}
394✔
122

123

124
class HttpListHeaderValueParser {
125
public:
126
    HttpListHeaderValueParser(std::string_view string) noexcept
127
        : m_string{string}
944✔
128
    {
2,084✔
129
    }
2,084✔
130
    bool next(std::string_view& elem) noexcept
131
    {
27,092✔
132
        while (m_pos < m_string.size()) {
27,092✔
133
            size_type i = m_pos;
25,008✔
134
            size_type j = m_string.find(',', i);
25,008✔
135
            if (j != std::string_view::npos) {
25,008✔
136
                m_pos = j + 1;
22,924✔
137
            }
22,924✔
138
            else {
2,084✔
139
                j = m_string.size();
2,084✔
140
                m_pos = j;
2,084✔
141
            }
2,084✔
142

143
            // Exclude leading and trailing white space
144
            while (i < j && is_http_lws(m_string[i]))
47,932✔
145
                ++i;
22,924✔
146
            while (j > i && is_http_lws(m_string[j - 1]))
25,008✔
147
                --j;
×
148

149
            if (i != j) {
25,008✔
150
                elem = m_string.substr(i, j - i);
25,008✔
151
                return true;
25,008✔
152
            }
25,008✔
153
        }
25,008✔
154
        return false;
2,084✔
155
    }
27,092✔
156

157
private:
158
    using size_type = std::string_view::size_type;
159
    const std::string_view m_string;
160
    size_type m_pos = 0;
161
    static bool is_http_lws(char ch) noexcept
162
    {
72,940✔
163
        return (ch == '\t' || ch == '\n' || ch == '\r' || ch == ' ');
72,940✔
164
    }
72,940✔
165
};
166

167

168
using SteadyClock = std::conditional<std::chrono::high_resolution_clock::is_steady,
169
                                     std::chrono::high_resolution_clock, std::chrono::steady_clock>::type;
170
using SteadyTimePoint = SteadyClock::time_point;
171

172
SteadyTimePoint steady_clock_now() noexcept
173
{
153,428✔
174
    return SteadyClock::now();
153,428✔
175
}
153,428✔
176

177
milliseconds_type steady_duration(SteadyTimePoint start_time, SteadyTimePoint end_time = steady_clock_now()) noexcept
178
{
38,082✔
179
    auto duration = end_time - start_time;
38,082✔
180
    auto millis_duration = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
38,082✔
181
    return milliseconds_type(millis_duration);
38,082✔
182
}
38,082✔
183

184

185
bool determine_try_again(ProtocolError error_code) noexcept
186
{
96✔
187
    return (error_code == ProtocolError::connection_closed);
96✔
188
}
96✔
189

190

191
class ServerFile;
192
class ServerImpl;
193
class HTTPConnection;
194
class SyncConnection;
195
class Session;
196

197

198
using Formatter = util::ResettableExpandableBufferOutputStream;
199
using OutputBuffer = util::ResettableExpandableBufferOutputStream;
200

201
using ProtocolVersionRange = std::pair<int, int>;
202

203
class MiscBuffers {
204
public:
205
    Formatter formatter;
206
    OutputBuffer download_message;
207

208
    using ProtocolVersionRanges = std::vector<ProtocolVersionRange>;
209
    ProtocolVersionRanges protocol_version_ranges;
210

211
    std::vector<char> compress;
212

213
    MiscBuffers()
214
    {
1,200✔
215
        formatter.imbue(std::locale::classic());
1,200✔
216
        download_message.imbue(std::locale::classic());
1,200✔
217
    }
1,200✔
218
};
219

220

221
struct DownloadCache {
222
    std::unique_ptr<char[]> body;
223
    std::size_t uncompressed_body_size;
224
    std::size_t compressed_body_size;
225
    bool body_is_compressed;
226
    version_type end_version;
227
    DownloadCursor download_progress;
228
    std::uint_fast64_t downloadable_bytes;
229
    std::size_t num_changesets;
230
    std::size_t accum_original_size;
231
    std::size_t accum_compacted_size;
232
};
233

234

235
// An unblocked work unit is comprised of one Work object for each of the files
236
// that contribute work to the work unit, generally one reference file and a
237
// number of partial files.
238
class Work {
239
public:
240
    // In general, primary work is all forms of modifying work, including file
241
    // deletion.
242
    bool has_primary_work = false;
243

244
    // Only for reference files
245
    bool might_produce_new_sync_version = false;
246

247
    bool produced_new_realm_version = false;
248
    bool produced_new_sync_version = false;
249
    bool expired_reference_version = false;
250

251
    // True if, and only if changesets_from_downstream contains at least one
252
    // changeset.
253
    bool have_changesets_from_downstream = false;
254

255
    FileIdentAllocSlots file_ident_alloc_slots;
256
    std::vector<std::unique_ptr<char[]>> changeset_buffers;
257
    IntegratableChangesets changesets_from_downstream;
258

259
    VersionInfo version_info;
260

261
    // Result of integration of changesets from downstream clients
262
    IntegrationResult integration_result;
263

264
    void reset() noexcept
265
    {
37,976✔
266
        has_primary_work = false;
37,976✔
267

268
        might_produce_new_sync_version = false;
37,976✔
269

270
        produced_new_realm_version = false;
37,976✔
271
        produced_new_sync_version = false;
37,976✔
272
        expired_reference_version = false;
37,976✔
273
        have_changesets_from_downstream = false;
37,976✔
274

275
        file_ident_alloc_slots.clear();
37,976✔
276
        changeset_buffers.clear();
37,976✔
277
        changesets_from_downstream.clear();
37,976✔
278

279
        version_info = {};
37,976✔
280
        integration_result = {};
37,976✔
281
    }
37,976✔
282
};
283

284

285
class WorkerState {
286
public:
287
    FileIdentAllocSlots file_ident_alloc_slots;
288
    util::ScratchMemory scratch_memory;
289
    bool use_file_cache = true;
290
    std::unique_ptr<ServerHistory> reference_hist;
291
    DBRef reference_sg;
292
};
293

294

295
// ============================ SessionQueue ============================
296

297
class SessionQueue {
298
public:
299
    void push_back(Session*) noexcept;
300
    Session* pop_front() noexcept;
301
    void clear() noexcept;
302

303
private:
304
    Session* m_back = nullptr;
305
};
306

307

308
// ============================ FileIdentReceiver ============================
309

310
class FileIdentReceiver {
311
public:
312
    virtual void receive_file_ident(SaltedFileIdent) = 0;
313

314
protected:
315
    ~FileIdentReceiver() {}
5,438✔
316
};
317

318

319
// ============================ WorkerBox =============================
320

321
class WorkerBox {
322
public:
323
    using JobType = util::UniqueFunction<void(WorkerState&)>;
324
    void add_work(WorkerState& state, JobType job)
325
    {
×
326
        std::unique_lock<std::mutex> lock(m_mutex);
×
327
        if (m_jobs.size() >= m_queue_limit) {
×
328
            // Once we have many queued jobs, it is better to use this thread to run a new job
×
329
            // than to queue it.
×
330
            run_a_job(lock, state, job);
×
331
        }
×
332
        else {
×
333
            // Create worker threads on demand (if all existing threads are active):
×
334
            if (m_threads.size() < m_max_num_threads && m_active >= m_threads.size()) {
×
335
                m_threads.emplace_back([this]() {
×
336
                    WorkerState state;
×
337
                    state.use_file_cache = false;
×
338
                    JobType the_job;
×
339
                    std::unique_lock<std::mutex> lock(m_mutex);
×
340
                    for (;;) {
×
341
                        while (m_jobs.empty() && !m_finish_up)
×
342
                            m_changes.wait(lock);
×
343
                        if (m_finish_up)
×
344
                            break; // terminate thread
×
345
                        the_job = std::move(m_jobs.back());
×
346
                        m_jobs.pop_back();
×
347
                        run_a_job(lock, state, the_job);
×
348
                        m_changes.notify_all();
×
349
                    }
×
350
                });
×
351
            }
×
352

×
353
            // Submit the job for execution:
×
354
            m_jobs.emplace_back(std::move(job));
×
355
            m_changes.notify_all();
×
356
        }
×
357
    }
×
358

359
    // You should call wait_completion() before trying to destroy a WorkerBox to get proper
360
    // propagation of exceptions.
361
    void wait_completion(WorkerState& state)
362
    {
×
363
        std::unique_lock<std::mutex> lock(m_mutex);
×
364
        while (!m_jobs.empty() || m_active > 0) {
×
365
            if (!m_jobs.empty()) { // if possible, make this thread participate in running m_jobs
×
366
                JobType the_job = std::move(m_jobs.back());
×
367
                m_jobs.pop_back();
×
368
                run_a_job(lock, state, the_job);
×
369
            }
×
370
            else {
×
371
                m_changes.wait(lock);
×
372
            }
×
373
        }
×
374
        if (m_epr) {
×
375
            std::rethrow_exception(m_epr);
×
376
        }
×
377
    }
×
378

379
    WorkerBox(unsigned int num_threads)
380
    {
×
381
        m_queue_limit = num_threads * 10; // fudge factor for job size variation
×
382
        m_max_num_threads = num_threads;
×
383
    }
×
384

385
    ~WorkerBox()
386
    {
×
387
        {
×
388
            std::unique_lock<std::mutex> lock(m_mutex);
×
389
            m_finish_up = true;
×
390
            m_changes.notify_all();
×
391
        }
×
392
        for (auto& e : m_threads)
×
393
            e.join();
×
394
    }
×
395

396
private:
397
    std::mutex m_mutex;
398
    std::condition_variable m_changes;
399
    std::vector<std::thread> m_threads;
400
    std::vector<JobType> m_jobs;
401
    unsigned int m_active = 0;
402
    bool m_finish_up = false;
403
    unsigned int m_queue_limit = 0;
404
    unsigned int m_max_num_threads = 0;
405
    std::exception_ptr m_epr;
406

407
    void run_a_job(std::unique_lock<std::mutex>& lock, WorkerState& state, JobType& job)
408
    {
×
409
        ++m_active;
×
410
        lock.unlock();
×
411
        try {
×
412
            job(state);
×
413
            lock.lock();
×
414
        }
×
415
        catch (...) {
×
416
            lock.lock();
×
417
            if (!m_epr)
×
418
                m_epr = std::current_exception();
×
419
        }
×
420
        --m_active;
×
421
    }
×
422
};
423

424

425
// ============================ ServerFile ============================
426

427
class ServerFile : public util::RefCountBase {
428
public:
429
    util::PrefixLogger logger;
430

431
    // Logger to be used by the worker thread
432
    util::PrefixLogger wlogger;
433

434
    ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path, std::string real_path,
435
               bool disable_sync_to_disk);
436
    ~ServerFile() noexcept;
437

438
    void initialize();
439
    void activate();
440

441
    ServerImpl& get_server() noexcept
442
    {
43,098✔
443
        return m_server;
43,098✔
444
    }
43,098✔
445

446
    const std::string& get_real_path() const noexcept
447
    {
×
448
        return m_file.realm_path;
×
449
    }
×
450

451
    const std::string& get_virt_path() const noexcept
452
    {
×
453
        return m_file.virt_path;
×
454
    }
×
455

456
    ServerFileAccessCache::File& access()
457
    {
50,934✔
458
        return m_file.access(); // Throws
50,934✔
459
    }
50,934✔
460

461
    ServerFileAccessCache::File& worker_access()
462
    {
37,948✔
463
        return m_worker_file.access(); // Throws
37,948✔
464
    }
37,948✔
465

466
    version_type get_realm_version() const noexcept
467
    {
×
468
        return m_version_info.realm_version;
×
469
    }
×
470

471
    version_type get_sync_version() const noexcept
472
    {
×
473
        return m_version_info.sync_version.version;
×
474
    }
×
475

476
    SaltedVersion get_salted_sync_version() const noexcept
477
    {
104,324✔
478
        return m_version_info.sync_version;
104,324✔
479
    }
104,324✔
480

481
    DownloadCache& get_download_cache() noexcept;
482

483
    void register_client_access(file_ident_type client_file_ident);
484

485
    using file_ident_request_type = std::int_fast64_t;
486

487
    // Initiate a request for a new client file identifier.
488
    //
489
    // Unless the request is cancelled, the identifier will be delivered to the
490
    // receiver by way of an invocation of
491
    // FileIdentReceiver::receive_file_ident().
492
    //
493
    // FileIdentReceiver::receive_file_ident() is guaranteed to not be called
494
    // until after request_file_ident() has returned (no callback reentrance).
495
    //
496
    // New client file identifiers will be delivered to receivers in the order
497
    // that they were requested.
498
    //
499
    // The returned value is a nonzero integer that can be used to cancel the
500
    // request before the file identifier is delivered using
501
    // cancel_file_ident_request().
502
    auto request_file_ident(FileIdentReceiver&, file_ident_type proxy_file, ClientType) -> file_ident_request_type;
503

504
    // Cancel the specified file identifier request.
505
    //
506
    // It is an error to call this function after the identifier has been
507
    // delivered.
508
    void cancel_file_ident_request(file_ident_request_type) noexcept;
509

510
    void add_unidentified_session(Session*);
511
    void identify_session(Session*, file_ident_type client_file_ident);
512

513
    void remove_unidentified_session(Session*) noexcept;
514
    void remove_identified_session(file_ident_type client_file_ident) noexcept;
515

516
    Session* get_identified_session(file_ident_type client_file_ident) noexcept;
517

518
    bool can_add_changesets_from_downstream() const noexcept;
519
    void add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
520
                                        version_type locked_server_version, const UploadChangeset*,
521
                                        std::size_t num_changesets);
522

523
    // bootstrap_client_session calls the function of same name in server_history
524
    // but corrects the upload_progress with information from pending
525
    // integratable changesets. A situation can occur where a client terminates
526
    // a session and starts a new session and re-uploads changesets that are known
527
    // by the ServerFile object but not by the ServerHistory.
528
    BootstrapError bootstrap_client_session(SaltedFileIdent client_file_ident, DownloadCursor download_progress,
529
                                            SaltedVersion server_version, ClientType client_type,
530
                                            UploadCursor& upload_progress, version_type& locked_server_version,
531
                                            Logger&);
532

533
    // NOTE: This function is executed by the worker thread
534
    void worker_process_work_unit(WorkerState&);
535

536
    void recognize_external_change();
537

538
private:
539
    ServerImpl& m_server;
540
    ServerFileAccessCache::Slot m_file;
541

542
    // In general, `m_version_info` refers to the last snapshot of the Realm
543
    // file that is supposed to be visible to remote peers engaging in regular
544
    // Realm file synchronization.
545
    VersionInfo m_version_info;
546

547
    file_ident_request_type m_last_file_ident_request = 0;
548

549
    // The set of sessions whose client file identifier is not yet known, i.e.,
550
    // those for which an IDENT message has not yet been received,
551
    std::set<Session*> m_unidentified_sessions;
552

553
    // A map of the sessions whose client file identifier is known, i.e, those
554
    // for which an IDENT message has been received.
555
    std::map<file_ident_type, Session*> m_identified_sessions;
556

557
    // Used when a file used as partial view wants to allocate a client file
558
    // identifier from the reference Realm.
559
    file_ident_request_type m_file_ident_request = 0;
560

561
    struct FileIdentRequestInfo {
562
        FileIdentReceiver* receiver;
563
        file_ident_type proxy_file;
564
        ClientType client_type;
565
    };
566

567
    // When nonempty, it counts towards outstanding blocked work (see
568
    // `m_has_blocked_work`).
569
    std::map<file_ident_request_type, FileIdentRequestInfo> m_file_ident_requests;
570

571
    // Changesets received from the downstream clients, and waiting to be
572
    // integrated, as well as information about the clients progress in terms of
573
    // integrating changesets received from the server. When nonempty, it counts
574
    // towards outstanding blocked work (see `m_has_blocked_work`).
575
    //
576
    // At any given time, the set of changesets from a particular client-side
577
    // file may be comprised of changesets received via distinct sessions.
578
    //
579
    // See also `m_num_changesets_from_downstream`.
580
    IntegratableChangesets m_changesets_from_downstream;
581

582
    // Keeps track of the number of changesets in `m_changesets_from_downstream`.
583
    //
584
    // Its purpose is also to initialize
585
    // `Work::have_changesets_from_downstream`.
586
    std::size_t m_num_changesets_from_downstream = 0;
587

588
    // The total size, in bytes, of the changesets that were received from
589
    // clients, are targeting this file, and are currently part of the blocked
590
    // work unit.
591
    //
592
    // Together with `m_unblocked_changesets_from_downstream_byte_size`, its
593
    // purpose is to allow the server to keep track of the accumulated size of
594
    // changesets being processed, or waiting to be processed (metric
595
    // `upload.pending.bytes`) (see
596
    // ServerImpl::inc_byte_size_for_pending_downstream_changesets()).
597
    //
598
    // Its purpose is also to enable the "very poor man's" backpressure solution
599
    // (see can_add_changesets_from_downstream()).
600
    std::size_t m_blocked_changesets_from_downstream_byte_size = 0;
601

602
    // Same as `m_blocked_changesets_from_downstream_byte_size` but for the
603
    // currently unblocked work unit.
604
    std::size_t m_unblocked_changesets_from_downstream_byte_size = 0;
605

606
    // When nonempty, it counts towards outstanding blocked work (see
607
    // `m_has_blocked_work`).
608
    std::vector<std::string> m_permission_changes;
609

610
    // True iff this file, or any of its associated partial files (when
611
    // applicable), has a nonzero amount of outstanding work that is currently
612
    // held back from being passed to the worker thread because a previously
613
    // accumulated chunk of work related to this file is currently in progress.
614
    bool m_has_blocked_work = false;
615

616
    // A file, that is not a partial file, is considered *exposed to the worker
617
    // thread* from the point in time where it is submitted to the worker
618
    // (Worker::enqueue()) and up until the point in time where
619
    // group_postprocess_stage_1() starts to execute. A partial file is
620
    // considered *exposed to the worker thread* precisely when the associated
621
    // reference file is exposed to the worker thread, but only if it was in
622
    // `m_reference_file->m_work.partial_files` at the point in time where the
623
    // reference file was passed to the worker.
624
    //
625
    // While this file is exposed to the worker thread, all members of `m_work`
626
    // other than `changesets_from_downstream` may be accessed and modified by
627
    // the worker thread only.
628
    //
629
    // While this file is exposed to the worker thread,
630
    // `m_work.changesets_from_downstream` may be accessed by all threads, but
631
    // must not be modified by any thread. This special status of
632
    // `m_work.changesets_from_downstream` is required to allow
633
    // ServerFile::bootstrap_client_session() to read from it at any time.
634
    Work m_work;
635

636
    // For reference files, set to true when work is unblocked, and reset back
637
    // to false when the work finalization process completes
638
    // (group_postprocess_stage_3()). Always zero for partial files.
639
    bool m_has_work_in_progress = 0;
640

641
    // This one must only be accessed by the worker thread.
642
    //
643
    // More specifically, `m_worker_file.access()` must only be called by the
644
    // worker thread, and if it was ever called, it must be closed by the worker
645
    // thread before the ServerFile object is destroyed, if destruction happens
646
    // before the destruction of the server object itself.
647
    ServerFileAccessCache::Slot m_worker_file;
648

649
    std::vector<std::int_fast64_t> m_deleting_connections;
650

651
    DownloadCache m_download_cache;
652

653
    void on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes);
654
    void on_work_added();
655
    void group_unblock_work();
656
    void unblock_work();
657

658
    /// Resume history scanning in all sessions bound to this file. To be called
659
    /// after a successfull integration of a changeset.
660
    void resume_download() noexcept;
661

662
    // NOTE: These functions are executed by the worker thread
663
    void worker_allocate_file_identifiers();
664
    bool worker_integrate_changes_from_downstream(WorkerState&);
665
    ServerHistory& get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
666
                                           DBRef& sg_ptr);
667
    ServerHistory& get_reference_file_history(WorkerState& state);
668
    void group_postprocess_stage_1();
669
    void group_postprocess_stage_2();
670
    void group_postprocess_stage_3();
671
    void group_finalize_work_stage_1();
672
    void group_finalize_work_stage_2();
673
    void finalize_work_stage_1();
674
    void finalize_work_stage_2();
675
};
676

677

678
inline DownloadCache& ServerFile::get_download_cache() noexcept
679
{
40,722✔
680
    return m_download_cache;
40,722✔
681
}
40,722✔
682

683
inline void ServerFile::group_finalize_work_stage_1()
684
{
37,568✔
685
    finalize_work_stage_1(); // Throws
37,568✔
686
}
37,568✔
687

688
inline void ServerFile::group_finalize_work_stage_2()
689
{
37,568✔
690
    finalize_work_stage_2(); // Throws
37,568✔
691
}
37,568✔
692

693

694
// ============================ Worker ============================
695

696
// All write transaction on server-side Realm files performed on behalf of the
697
// server, must be performed by the worker thread, not the network event loop
698
// thread. This is to ensure that the network event loop thread never gets
699
// blocked waiting for the worker thread to end a long running write
700
// transaction.
701
//
702
// FIXME: Currently, the event loop thread does perform a number of write
703
// transactions, but only on subtier nodes of a star topology server cluster.
704
class Worker : public ServerHistory::Context {
705
public:
706
    std::shared_ptr<util::Logger> logger_ptr;
707
    util::Logger& logger;
708

709
    explicit Worker(ServerImpl&);
710

711
    ServerFileAccessCache& get_file_access_cache() noexcept;
712

713
    void enqueue(ServerFile*);
714

715
    // Overriding members of ServerHistory::Context
716
    std::mt19937_64& server_history_get_random() noexcept override final;
717

718
private:
719
    ServerImpl& m_server;
720
    std::mt19937_64 m_random;
721
    ServerFileAccessCache m_file_access_cache;
722

723
    util::Mutex m_mutex;
724
    util::CondVar m_cond; // Protected by `m_mutex`
725

726
    bool m_stop = false; // Protected by `m_mutex`
727

728
    util::CircularBuffer<ServerFile*> m_queue; // Protected by `m_mutex`
729

730
    WorkerState m_state;
731

732
    void run();
733
    void stop() noexcept;
734

735
    friend class util::ThreadExecGuardWithParent<Worker, ServerImpl>;
736
};
737

738

739
inline ServerFileAccessCache& Worker::get_file_access_cache() noexcept
740
{
1,136✔
741
    return m_file_access_cache;
1,136✔
742
}
1,136✔
743

744

745
// ============================ ServerImpl ============================
746

747
class ServerImpl : public ServerImplBase, public ServerHistory::Context {
748
public:
749
    std::uint_fast64_t errors_seen = 0;
750

751
    std::atomic<milliseconds_type> m_par_time;
752
    std::atomic<milliseconds_type> m_seq_time;
753

754
    util::Mutex last_client_accesses_mutex;
755

756
    const std::shared_ptr<util::Logger> logger_ptr;
757
    util::Logger& logger;
758

759
    network::Service& get_service() noexcept
760
    {
51,916✔
761
        return m_service;
51,916✔
762
    }
51,916✔
763

764
    const network::Service& get_service() const noexcept
765
    {
×
766
        return m_service;
×
767
    }
×
768

769
    std::mt19937_64& get_random() noexcept
770
    {
64,128✔
771
        return m_random;
64,128✔
772
    }
64,128✔
773

774
    const Server::Config& get_config() const noexcept
775
    {
111,120✔
776
        return m_config;
111,120✔
777
    }
111,120✔
778

779
    std::size_t get_max_upload_backlog() const noexcept
780
    {
46,618✔
781
        return m_max_upload_backlog;
46,618✔
782
    }
46,618✔
783

784
    const std::string& get_root_dir() const noexcept
785
    {
5,436✔
786
        return m_root_dir;
5,436✔
787
    }
5,436✔
788

789
    network::ssl::Context& get_ssl_context() noexcept
790
    {
48✔
791
        return *m_ssl_context;
48✔
792
    }
48✔
793

794
    const AccessControl& get_access_control() const noexcept
795
    {
×
796
        return m_access_control;
×
797
    }
×
798

799
    ProtocolVersionRange get_protocol_version_range() const noexcept
800
    {
2,084✔
801
        return m_protocol_version_range;
2,084✔
802
    }
2,084✔
803

804
    ServerProtocol& get_server_protocol() noexcept
805
    {
134,108✔
806
        return m_server_protocol;
134,108✔
807
    }
134,108✔
808

809
    compression::CompressMemoryArena& get_compress_memory_arena() noexcept
810
    {
4,256✔
811
        return m_compress_memory_arena;
4,256✔
812
    }
4,256✔
813

814
    MiscBuffers& get_misc_buffers() noexcept
815
    {
47,060✔
816
        return m_misc_buffers;
47,060✔
817
    }
47,060✔
818

819
    int_fast64_t get_current_server_session_ident() const noexcept
820
    {
×
821
        return m_current_server_session_ident;
×
822
    }
×
823

824
    util::ScratchMemory& get_scratch_memory() noexcept
825
    {
×
826
        return m_scratch_memory;
×
827
    }
×
828

829
    Worker& get_worker() noexcept
830
    {
40,252✔
831
        return m_worker;
40,252✔
832
    }
40,252✔
833

834
    void get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
835
    {
×
836
        parallel_section = m_par_time;
×
837
        sequential_section = m_seq_time;
×
838
    }
×
839

840
    ServerImpl(const std::string& root_dir, util::Optional<sync::PKey>, Server::Config);
841
    ~ServerImpl() noexcept;
842

843
    void start();
844

845
    void start(std::string listen_address, std::string listen_port, bool reuse_address)
846
    {
684✔
847
        m_config.listen_address = listen_address;
684✔
848
        m_config.listen_port = listen_port;
684✔
849
        m_config.reuse_address = reuse_address;
684✔
850

851
        start(); // Throws
684✔
852
    }
684✔
853

854
    network::Endpoint listen_endpoint() const
855
    {
1,254✔
856
        return m_acceptor.local_endpoint();
1,254✔
857
    }
1,254✔
858

859
    void run();
860
    void stop() noexcept;
861

862
    void remove_http_connection(std::int_fast64_t conn_id) noexcept;
863

864
    void add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn);
865
    void remove_sync_connection(int_fast64_t connection_id);
866

867
    size_t get_number_of_http_connections()
868
    {
×
869
        return m_http_connections.size();
×
870
    }
×
871

872
    size_t get_number_of_sync_connections()
873
    {
×
874
        return m_sync_connections.size();
×
875
    }
×
876

877
    bool is_sync_stopped()
878
    {
40,066✔
879
        return m_sync_stopped;
40,066✔
880
    }
40,066✔
881

882
    const std::set<std::string>& get_realm_names() const noexcept
883
    {
×
884
        return m_realm_names;
×
885
    }
×
886

887
    // virt_path must be valid when get_or_create_file() is called.
888
    util::bind_ptr<ServerFile> get_or_create_file(const std::string& virt_path)
889
    {
5,412✔
890
        util::bind_ptr<ServerFile> file = get_file(virt_path);
5,412✔
891
        if (REALM_LIKELY(file))
5,412✔
892
            return file;
4,270✔
893

894
        _impl::VirtualPathComponents virt_path_components =
1,142✔
895
            _impl::parse_virtual_path(m_root_dir, virt_path); // Throws
1,142✔
896
        REALM_ASSERT(virt_path_components.is_valid);
1,142✔
897

898
        _impl::make_dirs(m_root_dir, virt_path); // Throws
1,142✔
899
        m_realm_names.insert(virt_path);         // Throws
1,142✔
900
        {
1,142✔
901
            bool disable_sync_to_disk = m_config.disable_sync_to_disk;
1,142✔
902
            file.reset(new ServerFile(*this, m_file_access_cache, virt_path, virt_path_components.real_realm_path,
1,142✔
903
                                      disable_sync_to_disk)); // Throws
1,142✔
904
        }
1,142✔
905

906
        file->initialize();
1,142✔
907
        m_files[virt_path] = file; // Throws
1,142✔
908
        file->activate();          // Throws
1,142✔
909
        return file;
1,142✔
910
    }
5,412✔
911

912
    std::unique_ptr<ServerHistory> make_history_for_path()
913
    {
×
914
        return std::make_unique<ServerHistory>(*this);
×
915
    }
×
916

917
    util::bind_ptr<ServerFile> get_file(const std::string& virt_path) noexcept
918
    {
5,412✔
919
        auto i = m_files.find(virt_path);
5,412✔
920
        if (REALM_LIKELY(i != m_files.end()))
5,412✔
921
            return i->second;
4,274✔
922
        return {};
1,138✔
923
    }
5,412✔
924

925
    // Returns the number of seconds since the Epoch of
926
    // std::chrono::system_clock.
927
    std::chrono::system_clock::time_point token_expiration_clock_now() const noexcept
928
    {
×
929
        if (REALM_UNLIKELY(m_config.token_expiration_clock))
×
930
            return m_config.token_expiration_clock->now();
×
931
        return std::chrono::system_clock::now();
×
932
    }
×
933

934
    void set_connection_reaper_timeout(milliseconds_type);
935

936
    void close_connections();
937
    bool map_virtual_to_real_path(const std::string& virt_path, std::string& real_path);
938

939
    void recognize_external_change(const std::string& virt_path);
940

941
    void stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
942
                                                  milliseconds_type timeout);
943

944
    // Server global outputbuffers that can be reused.
945
    // The server is single threaded, so there are no
946
    // synchronization issues.
947
    // output_buffers_count is equal to the
948
    // maximum number of buffers needed at any point.
949
    static constexpr int output_buffers_count = 1;
950
    OutputBuffer output_buffers[output_buffers_count];
951

952
    bool is_load_balancing_allowed() const
953
    {
×
954
        return m_allow_load_balancing;
×
955
    }
×
956

957
    // inc_byte_size_for_pending_downstream_changesets() must be called by
958
    // ServerFile objects when changesets from downstream clients have been
959
    // received.
960
    //
961
    // dec_byte_size_for_pending_downstream_changesets() must be called by
962
    // ServerFile objects when changesets from downstream clients have been
963
    // processed or discarded.
964
    //
965
    // ServerImpl uses this information to keep a running tally (metric
966
    // `upload.pending.bytes`) of the total byte size of pending changesets from
967
    // downstream clients.
968
    //
969
    // These functions must be called on the network thread.
970
    void inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
971
    void dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
972

973
    // Overriding member functions in _impl::ServerHistory::Context
974
    std::mt19937_64& server_history_get_random() noexcept override final;
975

976
private:
977
    Server::Config m_config;
978
    network::Service m_service;
979
    std::mt19937_64 m_random;
980
    const std::size_t m_max_upload_backlog;
981
    const std::string m_root_dir;
982
    const AccessControl m_access_control;
983
    const ProtocolVersionRange m_protocol_version_range;
984

985
    // The reserved files will be closed in situations where the server
986
    // runs out of file descriptors.
987
    std::unique_ptr<File> m_reserved_files[5];
988

989
    // The set of all Realm files known to this server, represented by their
990
    // virtual path.
991
    //
992
    // INVARIANT: If a Realm file is in the servers directory (i.e., it would be
993
    // reported by an invocation of _impl::get_realm_names()), then the
994
    // corresponding virtual path is in `m_realm_names`, assuming no external
995
    // file-system level intervention.
996
    std::set<std::string> m_realm_names;
997

998
    std::unique_ptr<network::ssl::Context> m_ssl_context;
999
    ServerFileAccessCache m_file_access_cache;
1000
    Worker m_worker;
1001
    std::map<std::string, util::bind_ptr<ServerFile>> m_files; // Key is virtual path
1002
    network::Acceptor m_acceptor;
1003
    std::int_fast64_t m_next_conn_id = 0;
1004
    std::unique_ptr<HTTPConnection> m_next_http_conn;
1005
    network::Endpoint m_next_http_conn_endpoint;
1006
    std::map<std::int_fast64_t, std::unique_ptr<HTTPConnection>> m_http_connections;
1007
    std::map<std::int_fast64_t, std::unique_ptr<SyncConnection>> m_sync_connections;
1008
    ServerProtocol m_server_protocol;
1009
    compression::CompressMemoryArena m_compress_memory_arena;
1010
    MiscBuffers m_misc_buffers;
1011
    int_fast64_t m_current_server_session_ident;
1012
    Optional<network::DeadlineTimer> m_connection_reaper_timer;
1013
    bool m_allow_load_balancing = false;
1014

1015
    util::Mutex m_mutex;
1016

1017
    bool m_stopped = false; // Protected by `m_mutex`
1018

1019
    // m_sync_stopped is used by stop_sync_and_wait_for_backup_completion().
1020
    // When m_sync_stopped is true, the server does not perform any sync.
1021
    bool m_sync_stopped = false;
1022

1023
    std::atomic<bool> m_running{false}; // Debugging facility
1024

1025
    std::size_t m_pending_changesets_from_downstream_byte_size = 0;
1026

1027
    util::CondVar m_wait_or_service_stopped_cond; // Protected by `m_mutex`
1028

1029
    util::ScratchMemory m_scratch_memory;
1030

1031
    void listen();
1032
    void initiate_accept();
1033
    void handle_accept(std::error_code);
1034

1035
    void reap_connections();
1036
    void initiate_connection_reaper_timer(milliseconds_type timeout);
1037
    void do_close_connections();
1038

1039
    static std::size_t determine_max_upload_backlog(Server::Config& config) noexcept
1040
    {
1,200✔
1041
        if (config.max_upload_backlog == 0)
1,200✔
1042
            return 4294967295; // 4GiB - 1 (largest allowable number on a 32-bit platform)
1,200✔
1043
        return config.max_upload_backlog;
×
1044
    }
1,200✔
1045

1046
    static ProtocolVersionRange determine_protocol_version_range(Server::Config& config)
1047
    {
1,198✔
1048
        const int actual_min = ServerImplBase::get_oldest_supported_protocol_version();
1,198✔
1049
        const int actual_max = get_current_protocol_version();
1,198✔
1050
        static_assert(actual_min <= actual_max, "");
1,198✔
1051
        int min = actual_min;
1,198✔
1052
        int max = actual_max;
1,198✔
1053
        if (config.max_protocol_version != 0 && config.max_protocol_version < max) {
1,198!
1054
            if (config.max_protocol_version < min)
×
1055
                throw Server::NoSupportedProtocolVersions();
×
1056
            max = config.max_protocol_version;
×
1057
        }
×
1058
        return {min, max};
1,198✔
1059
    }
1,198✔
1060

1061
    void do_recognize_external_change(const std::string& virt_path);
1062

1063
    void do_stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_complete)> completion_handler,
1064
                                                     milliseconds_type timeout);
1065
};
1066

1067
// ============================ SyncConnection ============================
1068

1069
class SyncConnection : public websocket::Config {
1070
public:
1071
    const std::shared_ptr<util::Logger> logger_ptr;
1072
    util::Logger& logger;
1073

1074
    // Clients with sync protocol version 8 or greater support pbs->flx migration
1075
    static constexpr int PBS_FLX_MIGRATION_PROTOCOL_VERSION = 8;
1076
    // Clients with sync protocol version less than 10 do not support log messages
1077
    static constexpr int SERVER_LOG_PROTOCOL_VERSION = 10;
1078

1079
    SyncConnection(ServerImpl& serv, std::int_fast64_t id, std::unique_ptr<network::Socket>&& socket,
1080
                   std::unique_ptr<network::ssl::Stream>&& ssl_stream,
1081
                   std::unique_ptr<network::ReadAheadBuffer>&& read_ahead_buffer, int client_protocol_version,
1082
                   std::string client_user_agent, std::string remote_endpoint, std::string appservices_request_id)
1083
        : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
944✔
1084
                                                          serv.logger_ptr)} // Throws
944✔
1085
        , logger{*logger_ptr}
944✔
1086
        , m_server{serv}
944✔
1087
        , m_id{id}
944✔
1088
        , m_socket{std::move(socket)}
944✔
1089
        , m_ssl_stream{std::move(ssl_stream)}
944✔
1090
        , m_read_ahead_buffer{std::move(read_ahead_buffer)}
944✔
1091
        , m_websocket{*this}
944✔
1092
        , m_client_protocol_version{client_protocol_version}
944✔
1093
        , m_client_user_agent{std::move(client_user_agent)}
944✔
1094
        , m_remote_endpoint{std::move(remote_endpoint)}
944✔
1095
        , m_appservices_request_id{std::move(appservices_request_id)}
944✔
1096
    {
2,084✔
1097
        // Make the output buffer stream throw std::bad_alloc if it fails to
1098
        // expand the buffer
1099
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
2,084✔
1100

1101
        network::Service& service = m_server.get_service();
2,084✔
1102
        auto handler = [this](Status status) {
96,000✔
1103
            if (!status.is_ok())
96,000✔
1104
                return;
×
1105
            if (!m_is_sending)
96,000✔
1106
                send_next_message(); // Throws
42,244✔
1107
        };
96,000✔
1108
        m_send_trigger = std::make_unique<Trigger<network::Service>>(&service, std::move(handler)); // Throws
2,084✔
1109
    }
2,084✔
1110

1111
    ~SyncConnection() noexcept;
1112

1113
    ServerImpl& get_server() noexcept
1114
    {
114,020✔
1115
        return m_server;
114,020✔
1116
    }
114,020✔
1117

1118
    ServerProtocol& get_server_protocol() noexcept
1119
    {
134,108✔
1120
        return m_server.get_server_protocol();
134,108✔
1121
    }
134,108✔
1122

1123
    int get_client_protocol_version()
1124
    {
104,802✔
1125
        return m_client_protocol_version;
104,802✔
1126
    }
104,802✔
1127

1128
    const std::string& get_client_user_agent() const noexcept
1129
    {
5,408✔
1130
        return m_client_user_agent;
5,408✔
1131
    }
5,408✔
1132

1133
    const std::string& get_remote_endpoint() const noexcept
1134
    {
5,410✔
1135
        return m_remote_endpoint;
5,410✔
1136
    }
5,410✔
1137

1138
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept final
1139
    {
2,084✔
1140
        return logger_ptr;
2,084✔
1141
    }
2,084✔
1142

1143
    std::mt19937_64& websocket_get_random() noexcept final override
1144
    {
62,990✔
1145
        return m_server.get_random();
62,990✔
1146
    }
62,990✔
1147

1148
    bool websocket_binary_message_received(const char* data, size_t size) final override
1149
    {
71,676✔
1150
        using sf = _impl::SimulatedFailure;
71,676✔
1151
        if (sf::check_trigger(sf::sync_server__read_head)) {
71,676✔
1152
            // Suicide
1153
            read_error(sf::sync_server__read_head);
562✔
1154
            return false;
562✔
1155
        }
562✔
1156
        // After a connection level error has occurred, all incoming messages
1157
        // will be ignored. By continuing to read until end of input, the server
1158
        // is able to know when the client closes the connection, which in
1159
        // general means that is has received the ERROR message.
1160
        if (REALM_LIKELY(!m_is_closing)) {
71,114✔
1161
            m_last_activity_at = steady_clock_now();
71,100✔
1162
            handle_message_received(data, size);
71,100✔
1163
        }
71,100✔
1164
        return true;
71,114✔
1165
    }
71,676✔
1166

1167
    bool websocket_ping_message_received(const char* data, size_t size) final override
1168
    {
×
1169
        if (REALM_LIKELY(!m_is_closing)) {
×
1170
            m_last_activity_at = steady_clock_now();
×
1171
            handle_ping_received(data, size);
×
1172
        }
×
1173
        return true;
×
1174
    }
×
1175

1176
    void async_write(const char* data, size_t size, websocket::WriteCompletionHandler handler) final override
1177
    {
62,996✔
1178
        if (m_ssl_stream) {
62,996✔
1179
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
70✔
1180
        }
70✔
1181
        else {
62,926✔
1182
            m_socket->async_write(data, size, std::move(handler)); // Throws
62,926✔
1183
        }
62,926✔
1184
    }
62,996✔
1185

1186
    void async_read(char* buffer, size_t size, websocket::ReadCompletionHandler handler) final override
1187
    {
216,556✔
1188
        if (m_ssl_stream) {
216,556✔
1189
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
176✔
1190
        }
176✔
1191
        else {
216,380✔
1192
            m_socket->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
216,380✔
1193
        }
216,380✔
1194
    }
216,556✔
1195

1196
    void async_read_until(char* buffer, size_t size, char delim,
1197
                          websocket::ReadCompletionHandler handler) final override
1198
    {
×
1199
        if (m_ssl_stream) {
×
1200
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1201
                                           std::move(handler)); // Throws
×
1202
        }
×
1203
        else {
×
1204
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1205
                                       std::move(handler)); // Throws
×
1206
        }
×
1207
    }
×
1208

1209
    void websocket_read_error_handler(std::error_code ec) final override
1210
    {
678✔
1211
        read_error(ec);
678✔
1212
    }
678✔
1213

1214
    void websocket_write_error_handler(std::error_code ec) final override
1215
    {
×
1216
        write_error(ec);
×
1217
    }
×
1218

1219
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*, std::string_view) final override
1220
    {
×
1221
        // WebSocket class has already logged a message for this error
1222
        close_due_to_error(ec); // Throws
×
1223
    }
×
1224

1225
    void websocket_protocol_error_handler(std::error_code ec) final override
1226
    {
×
1227
        logger.error("WebSocket protocol error (%1): %2", ec, ec.message()); // Throws
×
1228
        close_due_to_error(ec);                                              // Throws
×
1229
    }
×
1230

1231
    void websocket_handshake_completion_handler(const HTTPHeaders&) final override
1232
    {
×
1233
        // This is not called since we handle HTTP request in handle_request_for_sync()
1234
        REALM_TERMINATE("websocket_handshake_completion_handler should not have been called");
1235
    }
×
1236

1237
    int_fast64_t get_id() const noexcept
1238
    {
×
1239
        return m_id;
×
1240
    }
×
1241

1242
    network::Socket& get_socket() noexcept
1243
    {
×
1244
        return *m_socket;
×
1245
    }
×
1246

1247
    void initiate();
1248

1249
    // Commits suicide
1250
    template <class... Params>
1251
    void terminate(Logger::Level, const char* log_message, Params... log_params);
1252

1253
    // Commits suicide
1254
    void terminate_if_dead(SteadyTimePoint now);
1255

1256
    void enlist_to_send(Session*) noexcept;
1257

1258
    // Sessions should get the output_buffer and insert a message, after which
1259
    // they call initiate_write_output_buffer().
1260
    OutputBuffer& get_output_buffer()
1261
    {
63,004✔
1262
        m_output_buffer.reset();
63,004✔
1263
        return m_output_buffer;
63,004✔
1264
    }
63,004✔
1265

1266
    // More advanced memory strategies can be implemented if needed.
1267
    void release_output_buffer() {}
62,808✔
1268

1269
    // When this function is called, the connection will initiate a write with
1270
    // its output_buffer. Sessions use this method.
1271
    void initiate_write_output_buffer();
1272

1273
    void initiate_pong_output_buffer();
1274

1275
    void handle_protocol_error(Status status);
1276

1277
    void receive_bind_message(session_ident_type, std::string path, std::string signed_user_token,
1278
                              bool need_client_file_ident, bool is_subserver);
1279

1280
    void receive_ident_message(session_ident_type, file_ident_type client_file_ident,
1281
                               salt_type client_file_ident_salt, version_type scan_server_version,
1282
                               version_type scan_client_version, version_type latest_server_version,
1283
                               salt_type latest_server_version_salt);
1284

1285
    void receive_upload_message(session_ident_type, version_type progress_client_version,
1286
                                version_type progress_server_version, version_type locked_server_version,
1287
                                const UploadChangesets&);
1288

1289
    void receive_mark_message(session_ident_type, request_ident_type);
1290

1291
    void receive_unbind_message(session_ident_type);
1292

1293
    void receive_ping(milliseconds_type timestamp, milliseconds_type rtt);
1294

1295
    void receive_error_message(session_ident_type, int error_code, std::string_view error_body);
1296

1297
    void protocol_error(ProtocolError, Session* = nullptr);
1298

1299
    void initiate_soft_close();
1300

1301
    void discard_session(session_ident_type) noexcept;
1302

1303
    void send_log_message(util::Logger::Level level, const std::string&& message, session_ident_type sess_ident = 0,
1304
                          std::optional<std::string> co_id = std::nullopt);
1305

1306
private:
1307
    ServerImpl& m_server;
1308
    const int_fast64_t m_id;
1309
    std::unique_ptr<network::Socket> m_socket;
1310
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1311
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1312

1313
    websocket::Socket m_websocket;
1314
    std::unique_ptr<char[]> m_input_body_buffer;
1315
    OutputBuffer m_output_buffer;
1316
    std::map<session_ident_type, std::unique_ptr<Session>> m_sessions;
1317

1318
    // The protocol version in use by the connected client.
1319
    const int m_client_protocol_version;
1320

1321
    // The user agent description passed by the client.
1322
    const std::string m_client_user_agent;
1323

1324
    const std::string m_remote_endpoint;
1325

1326
    const std::string m_appservices_request_id;
1327

1328
    // A queue of sessions that have enlisted for an opportunity to send a
1329
    // message. Sessions will be served in the order that they enlist. A session
1330
    // can only occur once in this queue (linked list). If the queue is not
1331
    // empty, and no message is currently being written to the socket, the first
1332
    // session is taken out of the queue, and then granted an opportunity to
1333
    // send a message.
1334
    //
1335
    // Sessions will never be destroyed while in this queue. This is ensured
1336
    // because the connection owns the sessions that are associated with it, and
1337
    // the connection only removes a session from m_sessions at points in time
1338
    // where that session is guaranteed to not be in m_sessions_enlisted_to_send
1339
    // (Connection::send_next_message() and Connection::~Connection()).
1340
    SessionQueue m_sessions_enlisted_to_send;
1341

1342
    Session* m_receiving_session = nullptr;
1343

1344
    bool m_is_sending = false;
1345
    bool m_is_closing = false;
1346

1347
    bool m_send_pong = false;
1348
    bool m_sending_pong = false;
1349

1350
    std::unique_ptr<Trigger<network::Service>> m_send_trigger;
1351

1352
    milliseconds_type m_last_ping_timestamp = 0;
1353

1354
    // If `m_is_closing` is true, this is the time at which `m_is_closing` was
1355
    // set to true (initiation of soft close). Otherwise, if no messages have
1356
    // been received from the client, this is the time at which the connection
1357
    // object was initiated (completion of WebSocket handshake). Otherwise this
1358
    // is the time at which the last message was received from the client.
1359
    SteadyTimePoint m_last_activity_at;
1360

1361
    // These are initialized by do_initiate_soft_close().
1362
    //
1363
    // With recent versions of the protocol (when the version is greater than,
1364
    // or equal to 23), `m_error_session_ident` is always zero.
1365
    ProtocolError m_error_code = {};
1366
    session_ident_type m_error_session_ident = 0;
1367

1368
    struct LogMessage {
1369
        session_ident_type sess_ident;
1370
        util::Logger::Level level;
1371
        std::string message;
1372
        std::optional<std::string> co_id;
1373
    };
1374

1375
    std::mutex m_log_mutex;
1376
    std::queue<LogMessage> m_log_messages;
1377

1378
    static std::string make_logger_prefix(int_fast64_t id)
1379
    {
2,084✔
1380
        std::ostringstream out;
2,084✔
1381
        out.imbue(std::locale::classic());
2,084✔
1382
        out << "Sync Connection[" << id << "]: "; // Throws
2,084✔
1383
        return out.str();                         // Throws
2,084✔
1384
    }
2,084✔
1385

1386
    // The return value of handle_message_received() designates whether
1387
    // message processing should continue. If the connection object is
1388
    // destroyed during execution of handle_message_received(), the return
1389
    // value must be false.
1390
    void handle_message_received(const char* data, size_t size);
1391

1392
    void handle_ping_received(const char* data, size_t size);
1393

1394
    void send_next_message();
1395
    void send_pong(milliseconds_type timestamp);
1396
    void send_log_message(const LogMessage& log_msg);
1397

1398
    void handle_write_output_buffer();
1399
    void handle_pong_output_buffer();
1400

1401
    void initiate_write_error(ProtocolError, session_ident_type);
1402
    void handle_write_error(std::error_code ec);
1403

1404
    void do_initiate_soft_close(ProtocolError, session_ident_type);
1405
    void read_error(std::error_code);
1406
    void write_error(std::error_code);
1407

1408
    void close_due_to_close_by_client(std::error_code);
1409
    void close_due_to_error(std::error_code);
1410

1411
    void terminate_sessions();
1412

1413
    void bad_session_ident(const char* message_type, session_ident_type);
1414
    void message_after_unbind(const char* message_type, session_ident_type);
1415
    void message_before_ident(const char* message_type, session_ident_type);
1416
};
1417

1418

1419
inline void SyncConnection::read_error(std::error_code ec)
1420
{
1,240✔
1421
    REALM_ASSERT(ec != util::error::operation_aborted);
1,240✔
1422
    if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
1,240✔
1423
        // Suicide
1424
        close_due_to_close_by_client(ec); // Throws
678✔
1425
        return;
678✔
1426
    }
678✔
1427
    if (ec == util::MiscExtErrors::delim_not_found) {
562✔
1428
        logger.error("Input message head delimited not found"); // Throws
×
1429
        protocol_error(ProtocolError::limits_exceeded);         // Throws
×
1430
        return;
×
1431
    }
×
1432

1433
    logger.error("Reading failed: %1", ec.message()); // Throws
562✔
1434

1435
    // Suicide
1436
    close_due_to_error(ec); // Throws
562✔
1437
}
562✔
1438

1439
inline void SyncConnection::write_error(std::error_code ec)
1440
{
×
1441
    REALM_ASSERT(ec != util::error::operation_aborted);
×
1442
    if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1443
        // Suicide
1444
        close_due_to_close_by_client(ec); // Throws
×
1445
        return;
×
1446
    }
×
1447
    logger.error("Writing failed: %1", ec.message()); // Throws
×
1448

1449
    // Suicide
1450
    close_due_to_error(ec); // Throws
×
1451
}
×
1452

1453

1454
// ============================ HTTPConnection ============================
1455

1456
std::string g_user_agent = "User-Agent";
1457

1458
class HTTPConnection {
1459
public:
1460
    const std::shared_ptr<Logger> logger_ptr;
1461
    util::Logger& logger;
1462

1463
    HTTPConnection(ServerImpl& serv, int_fast64_t id, bool is_ssl)
1464
        : logger_ptr{std::make_shared<PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
1,516✔
1465
                                                    serv.logger_ptr)} // Throws
1,516✔
1466
        , logger{*logger_ptr}
1,516✔
1467
        , m_server{serv}
1,516✔
1468
        , m_id{id}
1,516✔
1469
        , m_socket{new network::Socket{serv.get_service()}} // Throws
1,516✔
1470
        , m_read_ahead_buffer{new network::ReadAheadBuffer} // Throws
1,516✔
1471
        , m_http_server{*this, logger_ptr}
1,516✔
1472
    {
3,328✔
1473
        // Make the output buffer stream throw std::bad_alloc if it fails to
1474
        // expand the buffer
1475
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
3,328✔
1476

1477
        if (is_ssl) {
3,328✔
1478
            using namespace network::ssl;
48✔
1479
            Context& ssl_context = serv.get_ssl_context();
48✔
1480
            m_ssl_stream = std::make_unique<Stream>(*m_socket, ssl_context,
48✔
1481
                                                    Stream::server); // Throws
48✔
1482
        }
48✔
1483
    }
3,328✔
1484

1485
    ServerImpl& get_server() noexcept
1486
    {
×
1487
        return m_server;
×
1488
    }
×
1489

1490
    int_fast64_t get_id() const noexcept
1491
    {
2,128✔
1492
        return m_id;
2,128✔
1493
    }
2,128✔
1494

1495
    network::Socket& get_socket() noexcept
1496
    {
5,108✔
1497
        return *m_socket;
5,108✔
1498
    }
5,108✔
1499

1500
    template <class H>
1501
    void async_write(const char* data, size_t size, H handler)
1502
    {
2,104✔
1503
        if (m_ssl_stream) {
2,104✔
1504
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
14✔
1505
        }
14✔
1506
        else {
2,090✔
1507
            m_socket->async_write(data, size, std::move(handler)); // Throws
2,090✔
1508
        }
2,090✔
1509
    }
2,104✔
1510

1511
    template <class H>
1512
    void async_read(char* buffer, size_t size, H handler)
1513
    {
8✔
1514
        if (m_ssl_stream) {
8✔
1515
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer,
×
1516
                                     std::move(handler)); // Throws
×
1517
        }
×
1518
        else {
8✔
1519
            m_socket->async_read(buffer, size, *m_read_ahead_buffer,
8✔
1520
                                 std::move(handler)); // Throws
8✔
1521
        }
8✔
1522
    }
8✔
1523

1524
    template <class H>
1525
    void async_read_until(char* buffer, size_t size, char delim, H handler)
1526
    {
18,830✔
1527
        if (m_ssl_stream) {
18,830✔
1528
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
126✔
1529
                                           std::move(handler)); // Throws
126✔
1530
        }
126✔
1531
        else {
18,704✔
1532
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
18,704✔
1533
                                       std::move(handler)); // Throws
18,704✔
1534
        }
18,704✔
1535
    }
18,830✔
1536

1537
    void initiate(std::string remote_endpoint)
1538
    {
2,128✔
1539
        m_last_activity_at = steady_clock_now();
2,128✔
1540
        m_remote_endpoint = std::move(remote_endpoint);
2,128✔
1541

1542
        logger.detail("Connection from %1", m_remote_endpoint); // Throws
2,128✔
1543

1544
        if (m_ssl_stream) {
2,128✔
1545
            initiate_ssl_handshake(); // Throws
24✔
1546
        }
24✔
1547
        else {
2,104✔
1548
            initiate_http(); // Throws
2,104✔
1549
        }
2,104✔
1550
    }
2,128✔
1551

1552
    void respond_200_ok()
1553
    {
×
1554
        handle_text_response(HTTPStatus::Ok, "OK"); // Throws
×
1555
    }
×
1556

1557
    void respond_404_not_found()
1558
    {
×
1559
        handle_text_response(HTTPStatus::NotFound, "Not found"); // Throws
×
1560
    }
×
1561

1562
    void respond_503_service_unavailable()
1563
    {
×
1564
        handle_text_response(HTTPStatus::ServiceUnavailable, "Service unavailable"); // Throws
×
1565
    }
×
1566

1567
    // Commits suicide
1568
    template <class... Params>
1569
    void terminate(Logger::Level log_level, const char* log_message, Params... log_params)
1570
    {
44✔
1571
        logger.log(log_level, log_message, log_params...); // Throws
44✔
1572
        m_ssl_stream.reset();
44✔
1573
        m_socket.reset();
44✔
1574
        m_server.remove_http_connection(m_id); // Suicide
44✔
1575
    }
44✔
1576

1577
    // Commits suicide
1578
    void terminate_if_dead(SteadyTimePoint now)
1579
    {
2✔
1580
        milliseconds_type time = steady_duration(m_last_activity_at, now);
2✔
1581
        const Server::Config& config = m_server.get_config();
2✔
1582
        if (m_is_sending) {
2✔
1583
            if (time >= config.http_response_timeout) {
×
1584
                // Suicide
1585
                terminate(Logger::Level::detail,
×
1586
                          "HTTP connection closed (request timeout)"); // Throws
×
1587
            }
×
1588
        }
×
1589
        else {
2✔
1590
            if (time >= config.http_request_timeout) {
2✔
1591
                // Suicide
1592
                terminate(Logger::Level::detail,
×
1593
                          "HTTP connection closed (response timeout)"); // Throws
×
1594
            }
×
1595
        }
2✔
1596
    }
2✔
1597

1598
    std::string get_appservices_request_id() const
1599
    {
2,104✔
1600
        return m_appservices_request_id.to_string();
2,104✔
1601
    }
2,104✔
1602

1603
private:
1604
    ServerImpl& m_server;
1605
    const int_fast64_t m_id;
1606
    const ObjectId m_appservices_request_id = ObjectId::gen();
1607
    std::unique_ptr<network::Socket> m_socket;
1608
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1609
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1610
    HTTPServer<HTTPConnection> m_http_server;
1611
    OutputBuffer m_output_buffer;
1612
    bool m_is_sending = false;
1613
    SteadyTimePoint m_last_activity_at;
1614
    std::string m_remote_endpoint;
1615
    int m_negotiated_protocol_version = 0;
1616

1617
    void initiate_ssl_handshake()
1618
    {
24✔
1619
        auto handler = [this](std::error_code ec) {
24✔
1620
            if (ec != util::error::operation_aborted)
24✔
1621
                handle_ssl_handshake(ec); // Throws
24✔
1622
        };
24✔
1623
        m_ssl_stream->async_handshake(std::move(handler)); // Throws
24✔
1624
    }
24✔
1625

1626
    void handle_ssl_handshake(std::error_code ec)
1627
    {
24✔
1628
        if (ec) {
24✔
1629
            logger.error("SSL handshake error (%1): %2", ec, ec.message()); // Throws
10✔
1630
            close_due_to_error(ec);                                         // Throws
10✔
1631
            return;
10✔
1632
        }
10✔
1633
        initiate_http(); // Throws
14✔
1634
    }
14✔
1635

1636
    void initiate_http()
1637
    {
2,118✔
1638
        logger.debug("Connection initiates HTTP receipt");
2,118✔
1639

1640
        auto handler = [this](HTTPRequest request, std::error_code ec) {
2,118✔
1641
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
2,118✔
1642
                return;
×
1643
            if (REALM_UNLIKELY(ec == HTTPParserError::MalformedRequest)) {
2,118✔
1644
                logger.error("Malformed HTTP request");
×
1645
                close_due_to_error(ec); // Throws
×
1646
                return;
×
1647
            }
×
1648
            if (REALM_UNLIKELY(ec == HTTPParserError::BadRequest)) {
2,118✔
1649
                logger.error("Bad HTTP request");
8✔
1650
                const char* body = "The HTTP request was corrupted";
8✔
1651
                handle_400_bad_request(body); // Throws
8✔
1652
                return;
8✔
1653
            }
8✔
1654
            if (REALM_UNLIKELY(ec)) {
2,110✔
1655
                read_error(ec); // Throws
14✔
1656
                return;
14✔
1657
            }
14✔
1658
            handle_http_request(std::move(request)); // Throws
2,096✔
1659
        };
2,096✔
1660
        m_http_server.async_receive_request(std::move(handler)); // Throws
2,118✔
1661
    }
2,118✔
1662

1663
    void handle_http_request(const HTTPRequest& request)
1664
    {
2,096✔
1665
        StringData path = request.path;
2,096✔
1666

1667
        logger.debug("HTTP request received, request = %1", request);
2,096✔
1668

1669
        m_is_sending = true;
2,096✔
1670
        m_last_activity_at = steady_clock_now();
2,096✔
1671

1672
        // FIXME: When thinking of this function as a switching device, it seem
1673
        // wrong that it requires a `%2F` after `/realm-sync/`. If `%2F` is
1674
        // supposed to be mandatory, then that check ought to be delegated to
1675
        // handle_request_for_sync(), as that will yield a sharper separation of
1676
        // concerns.
1677
        if (path == "/realm-sync" || path.begins_with("/realm-sync?") || path.begins_with("/realm-sync/%2F")) {
2,096✔
1678
            handle_request_for_sync(request); // Throws
2,084✔
1679
        }
2,084✔
1680
        else {
12✔
1681
            handle_404_not_found(request); // Throws
12✔
1682
        }
12✔
1683
    }
2,096✔
1684

1685
    void handle_request_for_sync(const HTTPRequest& request)
1686
    {
2,084✔
1687
        if (m_server.is_sync_stopped()) {
2,084✔
1688
            logger.debug("Attempt to create a sync connection to a server that has been "
×
1689
                         "stopped"); // Throws
×
1690
            handle_503_service_unavailable(request, "The server does not accept sync "
×
1691
                                                    "connections"); // Throws
×
1692
            return;
×
1693
        }
×
1694

1695
        util::Optional<std::string> sec_websocket_protocol = websocket::read_sec_websocket_protocol(request);
2,084✔
1696

1697
        // Figure out whether there are any protocol versions supported by both
1698
        // the client and the server, and if so, choose the newest one of them.
1699
        MiscBuffers& misc_buffers = m_server.get_misc_buffers();
2,084✔
1700
        using ProtocolVersionRanges = MiscBuffers::ProtocolVersionRanges;
2,084✔
1701
        ProtocolVersionRanges& protocol_version_ranges = misc_buffers.protocol_version_ranges;
2,084✔
1702
        {
2,084✔
1703
            protocol_version_ranges.clear();
2,084✔
1704
            util::MemoryInputStream in;
2,084✔
1705
            in.imbue(std::locale::classic());
2,084✔
1706
            in.unsetf(std::ios_base::skipws);
2,084✔
1707
            std::string_view value;
2,084✔
1708
            if (sec_websocket_protocol)
2,084✔
1709
                value = *sec_websocket_protocol;
2,084✔
1710
            HttpListHeaderValueParser parser{value};
2,084✔
1711
            std::string_view elem;
2,084✔
1712
            while (parser.next(elem)) {
27,092✔
1713
                // FIXME: Use std::string_view::begins_with() in C++20.
1714
                const StringData protocol{elem};
25,008✔
1715
                std::string_view prefix;
25,008✔
1716
                if (protocol.begins_with(get_pbs_websocket_protocol_prefix()))
25,008✔
1717
                    prefix = get_pbs_websocket_protocol_prefix();
25,008✔
1718
                else if (protocol.begins_with(get_old_pbs_websocket_protocol_prefix()))
×
1719
                    prefix = get_old_pbs_websocket_protocol_prefix();
×
1720
                if (!prefix.empty()) {
25,008✔
1721
                    auto parse_version = [&](std::string_view str) {
25,008✔
1722
                        in.set_buffer(str.data(), str.data() + str.size());
25,008✔
1723
                        int version = 0;
25,008✔
1724
                        in >> version;
25,008✔
1725
                        if (REALM_LIKELY(in && in.eof() && version >= 0))
25,008✔
1726
                            return version;
25,008✔
UNCOV
1727
                        return -1;
×
1728
                    };
25,008✔
1729
                    int min, max;
25,008✔
1730
                    std::string_view range = elem.substr(prefix.size());
25,008✔
1731
                    auto i = range.find('-');
25,008✔
1732
                    if (i != std::string_view::npos) {
25,008✔
1733
                        min = parse_version(range.substr(0, i));
×
1734
                        max = parse_version(range.substr(i + 1));
×
1735
                    }
×
1736
                    else {
25,008✔
1737
                        min = parse_version(range);
25,008✔
1738
                        max = min;
25,008✔
1739
                    }
25,008✔
1740
                    if (REALM_LIKELY(min >= 0 && max >= 0 && min <= max)) {
25,008✔
1741
                        protocol_version_ranges.emplace_back(min, max); // Throws
25,008✔
1742
                        continue;
25,008✔
1743
                    }
25,008✔
UNCOV
1744
                    logger.error("Protocol version negotiation failed: Client sent malformed "
×
UNCOV
1745
                                 "specification of supported protocol versions: '%1'",
×
UNCOV
1746
                                 elem); // Throws
×
UNCOV
1747
                    handle_400_bad_request("Protocol version negotiation failed: Malformed "
×
UNCOV
1748
                                           "specification of supported protocol "
×
UNCOV
1749
                                           "versions\n"); // Throws
×
UNCOV
1750
                    return;
×
1751
                }
25,008✔
1752
                logger.warn("Unrecognized protocol token in HTTP response header "
×
1753
                            "Sec-WebSocket-Protocol: '%1'",
×
1754
                            elem); // Throws
×
1755
            }
×
1756
            if (protocol_version_ranges.empty()) {
2,084✔
1757
                logger.error("Protocol version negotiation failed: Client did not send a "
×
1758
                             "specification of supported protocol versions"); // Throws
×
1759
                handle_400_bad_request("Protocol version negotiation failed: Missing specification "
×
1760
                                       "of supported protocol versions\n"); // Throws
×
1761
                return;
×
1762
            }
×
1763
        }
2,084✔
1764
        {
2,084✔
1765
            ProtocolVersionRange server_range = m_server.get_protocol_version_range();
2,084✔
1766
            int server_min = server_range.first;
2,084✔
1767
            int server_max = server_range.second;
2,084✔
1768
            int best_match = 0;
2,084✔
1769
            int overall_client_min = std::numeric_limits<int>::max();
2,084✔
1770
            int overall_client_max = std::numeric_limits<int>::min();
2,084✔
1771
            for (const auto& range : protocol_version_ranges) {
25,008✔
1772
                int client_min = range.first;
25,008✔
1773
                int client_max = range.second;
25,008✔
1774
                if (client_max >= server_min && client_min <= server_max) {
25,008✔
1775
                    // Overlap
1776
                    int version = std::min(client_max, server_max);
25,008✔
1777
                    if (version > best_match) {
25,008✔
1778
                        best_match = version;
2,084✔
1779
                    }
2,084✔
1780
                }
25,008✔
1781
                if (client_min < overall_client_min)
25,008✔
1782
                    overall_client_min = client_min;
25,008✔
1783
                if (client_max > overall_client_max)
25,008✔
1784
                    overall_client_max = client_max;
2,084✔
1785
            }
25,008✔
1786
            Formatter& formatter = misc_buffers.formatter;
2,084✔
1787
            if (REALM_UNLIKELY(best_match == 0)) {
2,084✔
1788
                const char* elaboration = "No version supported by both client and server";
×
1789
                const char* identifier_hint = nullptr;
×
1790
                if (overall_client_max < server_min) {
×
1791
                    // Client is too old
1792
                    elaboration = "Client is too old for server";
×
1793
                    identifier_hint = "CLIENT_TOO_OLD";
×
1794
                }
×
1795
                else if (overall_client_min > server_max) {
×
1796
                    // Client is too new
1797
                    elaboration = "Client is too new for server";
×
1798
                    identifier_hint = "CLIENT_TOO_NEW";
×
1799
                }
×
1800
                auto format_ranges = [&](const auto& list) {
×
1801
                    bool nonfirst = false;
×
1802
                    for (auto range : list) {
×
1803
                        if (nonfirst)
×
1804
                            formatter << ", "; // Throws
×
1805
                        int min = range.first, max = range.second;
×
1806
                        REALM_ASSERT(min <= max);
×
1807
                        formatter << min;
×
1808
                        if (max != min)
×
1809
                            formatter << "-" << max;
×
1810
                        nonfirst = true;
×
1811
                    }
×
1812
                };
×
1813
                using Range = ProtocolVersionRange;
×
1814
                formatter.reset();
×
1815
                format_ranges(protocol_version_ranges); // Throws
×
1816
                logger.error("Protocol version negotiation failed: %1 "
×
1817
                             "(client supports: %2)",
×
1818
                             elaboration, std::string_view(formatter.data(), formatter.size())); // Throws
×
1819
                formatter.reset();
×
1820
                formatter << "Protocol version negotiation failed: "
×
1821
                             ""
×
1822
                          << elaboration << ".\n\n";                                   // Throws
×
1823
                formatter << "Server supports: ";                                      // Throws
×
1824
                format_ranges(std::initializer_list<Range>{{server_min, server_max}}); // Throws
×
1825
                formatter << "\n";                                                     // Throws
×
1826
                formatter << "Client supports: ";                                      // Throws
×
1827
                format_ranges(protocol_version_ranges);                                // Throws
×
1828
                formatter << "\n\n";                                                   // Throws
×
1829
                formatter << "REALM_SYNC_PROTOCOL_MISMATCH";                           // Throws
×
1830
                if (identifier_hint)
×
1831
                    formatter << ":" << identifier_hint;                      // Throws
×
1832
                formatter << "\n";                                            // Throws
×
1833
                handle_400_bad_request({formatter.data(), formatter.size()}); // Throws
×
1834
                return;
×
1835
            }
×
1836
            m_negotiated_protocol_version = best_match;
2,084✔
1837
            logger.debug("Received: Sync HTTP request (negotiated_protocol_version=%1)",
2,084✔
1838
                         m_negotiated_protocol_version); // Throws
2,084✔
1839
            formatter.reset();
2,084✔
1840
        }
2,084✔
1841

1842
        std::string sec_websocket_protocol_2;
×
1843
        {
2,084✔
1844
            std::string_view prefix =
2,084✔
1845
                m_negotiated_protocol_version < SyncConnection::PBS_FLX_MIGRATION_PROTOCOL_VERSION
2,084✔
1846
                    ? get_old_pbs_websocket_protocol_prefix()
2,084✔
1847
                    : get_pbs_websocket_protocol_prefix();
2,084✔
1848
            std::ostringstream out;
2,084✔
1849
            out.imbue(std::locale::classic());
2,084✔
1850
            out << prefix << m_negotiated_protocol_version; // Throws
2,084✔
1851
            sec_websocket_protocol_2 = std::move(out).str();
2,084✔
1852
        }
2,084✔
1853

1854
        std::error_code ec;
2,084✔
1855
        util::Optional<HTTPResponse> response =
2,084✔
1856
            websocket::make_http_response(request, sec_websocket_protocol_2, ec); // Throws
2,084✔
1857

1858
        if (ec) {
2,084✔
1859
            if (ec == websocket::HttpError::bad_request_header_upgrade) {
×
1860
                logger.error("There must be a header of the form 'Upgrade: websocket'");
×
1861
            }
×
1862
            else if (ec == websocket::HttpError::bad_request_header_connection) {
×
1863
                logger.error("There must be a header of the form 'Connection: Upgrade'");
×
1864
            }
×
1865
            else if (ec == websocket::HttpError::bad_request_header_websocket_version) {
×
1866
                logger.error("There must be a header of the form 'Sec-WebSocket-Version: 13'");
×
1867
            }
×
1868
            else if (ec == websocket::HttpError::bad_request_header_websocket_key) {
×
1869
                logger.error("The header Sec-WebSocket-Key is missing");
×
1870
            }
×
1871

1872
            logger.error("The HTTP request with the error is:\n%1", request);
×
1873
            logger.error("Check the proxy configuration and make sure that the "
×
1874
                         "HTTP request is a valid Websocket request.");
×
1875
            close_due_to_error(ec);
×
1876
            return;
×
1877
        }
×
1878
        REALM_ASSERT(response);
2,084✔
1879
        add_common_http_response_headers(*response);
2,084✔
1880

1881
        std::string user_agent;
2,084✔
1882
        {
2,084✔
1883
            auto i = request.headers.find(g_user_agent);
2,084✔
1884
            if (i != request.headers.end())
2,084✔
1885
                user_agent = i->second; // Throws (copy)
2,084✔
1886
        }
2,084✔
1887

1888
        auto handler = [protocol_version = m_negotiated_protocol_version, user_agent = std::move(user_agent),
2,084✔
1889
                        this](std::error_code ec) {
2,084✔
1890
            // If the operation is aborted, the socket object may have been destroyed.
1891
            if (ec != util::error::operation_aborted) {
2,084✔
1892
                if (ec) {
2,084✔
1893
                    write_error(ec);
×
1894
                    return;
×
1895
                }
×
1896

1897
                std::unique_ptr<SyncConnection> sync_conn = std::make_unique<SyncConnection>(
2,084✔
1898
                    m_server, m_id, std::move(m_socket), std::move(m_ssl_stream), std::move(m_read_ahead_buffer),
2,084✔
1899
                    protocol_version, std::move(user_agent), std::move(m_remote_endpoint),
2,084✔
1900
                    get_appservices_request_id()); // Throws
2,084✔
1901
                SyncConnection& sync_conn_ref = *sync_conn;
2,084✔
1902
                m_server.add_sync_connection(m_id, std::move(sync_conn));
2,084✔
1903
                m_server.remove_http_connection(m_id);
2,084✔
1904
                sync_conn_ref.initiate();
2,084✔
1905
            }
2,084✔
1906
        };
2,084✔
1907
        m_http_server.async_send_response(*response, std::move(handler));
2,084✔
1908
    }
2,084✔
1909

1910
    void handle_text_response(HTTPStatus http_status, std::string_view body)
1911
    {
20✔
1912
        std::string body_2 = std::string(body); // Throws
20✔
1913

1914
        HTTPResponse response;
20✔
1915
        response.status = http_status;
20✔
1916
        add_common_http_response_headers(response);
20✔
1917
        response.headers["Connection"] = "close";
20✔
1918

1919
        if (!body_2.empty()) {
20✔
1920
            response.headers["Content-Length"] = util::to_string(body_2.size());
20✔
1921
            response.body = std::move(body_2);
20✔
1922
        }
20✔
1923

1924
        auto handler = [this](std::error_code ec) {
20✔
1925
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
20✔
1926
                return;
×
1927
            if (REALM_UNLIKELY(ec)) {
20✔
1928
                write_error(ec);
×
1929
                return;
×
1930
            }
×
1931
            terminate(Logger::Level::detail, "HTTP connection closed"); // Throws
20✔
1932
        };
20✔
1933
        m_http_server.async_send_response(response, std::move(handler));
20✔
1934
    }
20✔
1935

1936
    void handle_400_bad_request(std::string_view body)
1937
    {
8✔
1938
        logger.detail("400 Bad Request");
8✔
1939
        handle_text_response(HTTPStatus::BadRequest, body); // Throws
8✔
1940
    }
8✔
1941

1942
    void handle_404_not_found(const HTTPRequest&)
1943
    {
12✔
1944
        logger.detail("404 Not Found"); // Throws
12✔
1945
        handle_text_response(HTTPStatus::NotFound,
12✔
1946
                             "Realm sync server\n\nPage not found\n"); // Throws
12✔
1947
    }
12✔
1948

1949
    void handle_503_service_unavailable(const HTTPRequest&, std::string_view message)
1950
    {
×
1951
        logger.debug("503 Service Unavailable");                       // Throws
×
1952
        handle_text_response(HTTPStatus::ServiceUnavailable, message); // Throws
×
1953
    }
×
1954

1955
    void add_common_http_response_headers(HTTPResponse& response)
1956
    {
2,104✔
1957
        response.headers["Server"] = "RealmSync/" REALM_VERSION_STRING; // Throws
2,104✔
1958
        if (m_negotiated_protocol_version < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
2,104✔
1959
            // This isn't a real X-Appservices-Request-Id, but it should be enough to test with
1960
            response.headers["X-Appservices-Request-Id"] = get_appservices_request_id();
20✔
1961
        }
20✔
1962
    }
2,104✔
1963

1964
    void read_error(std::error_code ec)
1965
    {
14✔
1966
        REALM_ASSERT(ec != util::error::operation_aborted);
14✔
1967
        if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
14!
1968
            // Suicide
1969
            close_due_to_close_by_client(ec); // Throws
14✔
1970
            return;
14✔
1971
        }
14✔
1972
        if (ec == util::MiscExtErrors::delim_not_found) {
×
1973
            logger.error("Input message head delimited not found"); // Throws
×
1974
            close_due_to_error(ec);                                 // Throws
×
1975
            return;
×
1976
        }
×
1977

1978
        logger.error("Reading failed: %1", ec.message()); // Throws
×
1979

1980
        // Suicide
1981
        close_due_to_error(ec); // Throws
×
1982
    }
×
1983

1984
    void write_error(std::error_code ec)
1985
    {
×
1986
        REALM_ASSERT(ec != util::error::operation_aborted);
×
1987
        if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1988
            // Suicide
1989
            close_due_to_close_by_client(ec); // Throws
×
1990
            return;
×
1991
        }
×
1992
        logger.error("Writing failed: %1", ec.message()); // Throws
×
1993

1994
        // Suicide
1995
        close_due_to_error(ec); // Throws
×
1996
    }
×
1997

1998
    void close_due_to_close_by_client(std::error_code ec)
1999
    {
14✔
2000
        auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
14✔
2001
        // Suicide
2002
        terminate(log_level, "HTTP connection closed by client: %1", ec.message()); // Throws
14✔
2003
    }
14✔
2004

2005
    void close_due_to_error(std::error_code ec)
2006
    {
10✔
2007
        // Suicide
2008
        terminate(Logger::Level::error, "HTTP connection closed due to error: %1",
10✔
2009
                  ec.message()); // Throws
10✔
2010
    }
10✔
2011

2012
    static std::string make_logger_prefix(int_fast64_t id)
2013
    {
3,328✔
2014
        std::ostringstream out;
3,328✔
2015
        out.imbue(std::locale::classic());
3,328✔
2016
        out << "HTTP Connection[" << id << "]: "; // Throws
3,328✔
2017
        return out.str();                         // Throws
3,328✔
2018
    }
3,328✔
2019
};
2020

2021

2022
class DownloadHistoryEntryHandler : public ServerHistory::HistoryEntryHandler {
2023
public:
2024
    std::size_t num_changesets = 0;
2025
    std::size_t accum_original_size = 0;
2026
    std::size_t accum_compacted_size = 0;
2027

2028
    DownloadHistoryEntryHandler(ServerProtocol& protocol, OutputBuffer& buffer, util::Logger& logger) noexcept
2029
        : m_protocol{protocol}
18,702✔
2030
        , m_buffer{buffer}
18,702✔
2031
        , m_logger{logger}
18,702✔
2032
    {
40,722✔
2033
    }
40,722✔
2034

2035
    void handle(version_type server_version, const HistoryEntry& entry, size_t original_size) override
2036
    {
41,384✔
2037
        version_type client_version = entry.remote_version;
41,384✔
2038
        ServerProtocol::ChangesetInfo info{server_version, client_version, entry, original_size};
41,384✔
2039
        m_protocol.insert_single_changeset_download_message(m_buffer, info, m_logger); // Throws
41,384✔
2040
        ++num_changesets;
41,384✔
2041
        accum_original_size += original_size;
41,384✔
2042
        accum_compacted_size += entry.changeset.size();
41,384✔
2043
    }
41,384✔
2044

2045
private:
2046
    ServerProtocol& m_protocol;
2047
    OutputBuffer& m_buffer;
2048
    util::Logger& m_logger;
2049
};
2050

2051

2052
// ============================ Session ============================
2053

2054
//                        Need cli-   Send     IDENT     UNBIND              ERROR
2055
//   Protocol             ent file    IDENT    message   message   Error     message
2056
//   state                identifier  message  received  received  occurred  sent
2057
// ---------------------------------------------------------------------------------
2058
//   AllocatingIdent      yes         yes      no        no        no        no
2059
//   SendIdent            no          yes      no        no        no        no
2060
//   WaitForIdent         no          no       no        no        no        no
2061
//   WaitForUnbind        maybe       no       yes       no        no        no
2062
//   SendError            maybe       maybe    maybe     no        yes       no
2063
//   WaitForUnbindErr     maybe       maybe    maybe     no        yes       yes
2064
//   SendUnbound          maybe       maybe    maybe     yes       maybe     no
2065
//
2066
//
2067
//   Condition                      Expression
2068
// ----------------------------------------------------------
2069
//   Need client file identifier    need_client_file_ident()
2070
//   Send IDENT message             must_send_ident_message()
2071
//   IDENT message received         ident_message_received()
2072
//   UNBIND message received        unbind_message_received()
2073
//   Error occurred                 error_occurred()
2074
//   ERROR message sent             m_error_message_sent
2075
//
2076
//
2077
//   Protocol
2078
//   state                Will send              Can receive
2079
// -----------------------------------------------------------------------
2080
//   AllocatingIdent      none                   UNBIND
2081
//   SendIdent            IDENT                  UNBIND
2082
//   WaitForIdent         none                   IDENT, UNBIND
2083
//   WaitForUnbind        DOWNLOAD, TRANSACT,    UPLOAD, TRANSACT, MARK,
2084
//                        MARK, ALLOC            ALLOC, UNBIND
2085
//   SendError            ERROR                  any
2086
//   WaitForUnbindErr     none                   any
2087
//   SendUnbound          UNBOUND                none
2088
//
2089
class Session final : private FileIdentReceiver {
2090
public:
2091
    util::PrefixLogger logger;
2092

2093
    Session(SyncConnection& conn, session_ident_type session_ident)
2094
        : logger{util::LogCategory::server, make_logger_prefix(session_ident), conn.logger_ptr} // Throws
2,658✔
2095
        , m_connection{conn}
2,658✔
2096
        , m_session_ident{session_ident}
2,658✔
2097
    {
5,436✔
2098
    }
5,436✔
2099

2100
    ~Session() noexcept
2101
    {
5,438✔
2102
        REALM_ASSERT(!is_enlisted_to_send());
5,438✔
2103
        detach_from_server_file();
5,438✔
2104
    }
5,438✔
2105

2106
    SyncConnection& get_connection() noexcept
2107
    {
40,744✔
2108
        return m_connection;
40,744✔
2109
    }
40,744✔
2110

2111
    const Optional<std::array<char, 64>>& get_encryption_key()
2112
    {
×
2113
        return m_connection.get_server().get_config().encryption_key;
×
2114
    }
×
2115

2116
    session_ident_type get_session_ident() const noexcept
2117
    {
160✔
2118
        return m_session_ident;
160✔
2119
    }
160✔
2120

2121
    ServerProtocol& get_server_protocol() noexcept
2122
    {
56,660✔
2123
        return m_connection.get_server_protocol();
56,660✔
2124
    }
56,660✔
2125

2126
    bool need_client_file_ident() const noexcept
2127
    {
6,982✔
2128
        return (m_file_ident_request != 0);
6,982✔
2129
    }
6,982✔
2130

2131
    bool must_send_ident_message() const noexcept
2132
    {
4,298✔
2133
        return m_send_ident_message;
4,298✔
2134
    }
4,298✔
2135

2136
    bool ident_message_received() const noexcept
2137
    {
343,898✔
2138
        return m_client_file_ident != 0;
343,898✔
2139
    }
343,898✔
2140

2141
    bool unbind_message_received() const noexcept
2142
    {
346,326✔
2143
        return m_unbind_message_received;
346,326✔
2144
    }
346,326✔
2145

2146
    bool error_occurred() const noexcept
2147
    {
338,808✔
2148
        return int(m_error_code) != 0;
338,808✔
2149
    }
338,808✔
2150

2151
    bool relayed_alloc_request_in_progress() const noexcept
2152
    {
×
2153
        return (need_client_file_ident() || m_allocated_file_ident.ident != 0);
×
2154
    }
×
2155

2156
    // Returns the file identifier (always a nonzero value) of the client side
2157
    // file if ident_message_received() returns true. Otherwise it returns zero.
2158
    file_ident_type get_client_file_ident() const noexcept
2159
    {
×
2160
        return m_client_file_ident;
×
2161
    }
×
2162

2163
    void initiate()
2164
    {
5,440✔
2165
        logger.detail("Session initiated", m_session_ident); // Throws
5,440✔
2166
    }
5,440✔
2167

2168
    void terminate()
2169
    {
4,456✔
2170
        logger.detail("Session terminated", m_session_ident); // Throws
4,456✔
2171
    }
4,456✔
2172

2173
    // Initiate the deactivation process, if it has not been initiated already
2174
    // by the client.
2175
    //
2176
    // IMPORTANT: This function must not be called with protocol versions
2177
    // earlier than 23.
2178
    //
2179
    // The deactivation process will eventually lead to termination of the
2180
    // session.
2181
    //
2182
    // The session will detach itself from the server file when the deactivation
2183
    // process is initiated, regardless of whether it is initiated by the
2184
    // client, or by calling this function.
2185
    void initiate_deactivation(ProtocolError error_code)
2186
    {
80✔
2187
        REALM_ASSERT(is_session_level_error(error_code));
80✔
2188
        REALM_ASSERT(!error_occurred()); // Must only be called once
80✔
2189

2190
        // If the UNBIND message has been received, then the client has
2191
        // initiated the deactivation process already.
2192
        if (REALM_LIKELY(!unbind_message_received())) {
80✔
2193
            detach_from_server_file();
80✔
2194
            m_error_code = error_code;
80✔
2195
            // Protocol state is now SendError
2196
            ensure_enlisted_to_send();
80✔
2197
            return;
80✔
2198
        }
80✔
2199
        // Protocol state was SendUnbound, and remains unchanged
2200
    }
80✔
2201

2202
    bool is_enlisted_to_send() const noexcept
2203
    {
270,748✔
2204
        return m_next != nullptr;
270,748✔
2205
    }
270,748✔
2206

2207
    void ensure_enlisted_to_send() noexcept
2208
    {
52,536✔
2209
        if (!is_enlisted_to_send())
52,536✔
2210
            enlist_to_send();
51,480✔
2211
    }
52,536✔
2212

2213
    void enlist_to_send() noexcept
2214
    {
108,464✔
2215
        m_connection.enlist_to_send(this);
108,464✔
2216
    }
108,464✔
2217

2218
    // Overriding memeber function in FileIdentReceiver
2219
    void receive_file_ident(SaltedFileIdent file_ident) override final
2220
    {
1,342✔
2221
        // Protocol state must be AllocatingIdent or WaitForUnbind
2222
        if (!ident_message_received()) {
1,342✔
2223
            REALM_ASSERT(need_client_file_ident());
1,342✔
2224
            REALM_ASSERT(m_send_ident_message);
1,342✔
2225
        }
1,342✔
2226
        else {
×
2227
            REALM_ASSERT(!m_send_ident_message);
×
2228
        }
×
2229
        REALM_ASSERT(!unbind_message_received());
1,342✔
2230
        REALM_ASSERT(!error_occurred());
1,342✔
2231
        REALM_ASSERT(!m_error_message_sent);
1,342✔
2232

2233
        m_file_ident_request = 0;
1,342✔
2234
        m_allocated_file_ident = file_ident;
1,342✔
2235

2236
        // If the protocol state was AllocatingIdent, it is now SendIdent,
2237
        // otherwise it continues to be WaitForUnbind.
2238

2239
        logger.debug("Acquired outbound salted file identifier (%1, %2)", file_ident.ident,
1,342✔
2240
                     file_ident.salt); // Throws
1,342✔
2241

2242
        ensure_enlisted_to_send();
1,342✔
2243
    }
1,342✔
2244

2245
    // Called by the associated connection object when this session is granted
2246
    // an opportunity to initiate the sending of a message.
2247
    //
2248
    // This function may lead to the destruction of the session object
2249
    // (suicide).
2250
    void send_message()
2251
    {
108,244✔
2252
        if (REALM_LIKELY(!unbind_message_received())) {
108,244✔
2253
            if (REALM_LIKELY(!error_occurred())) {
105,736✔
2254
                if (REALM_LIKELY(ident_message_received())) {
105,656✔
2255
                    // State is WaitForUnbind.
2256
                    bool relayed_alloc = (m_allocated_file_ident.ident != 0);
104,314✔
2257
                    if (REALM_LIKELY(!relayed_alloc)) {
104,316✔
2258
                        // Send DOWNLOAD or MARK.
2259
                        continue_history_scan(); // Throws
104,316✔
2260
                        // Session object may have been
2261
                        // destroyed at this point (suicide)
2262
                        return;
104,316✔
2263
                    }
104,316✔
2264
                    send_alloc_message(); // Throws
2,147,483,647✔
2265
                    return;
2,147,483,647✔
2266
                }
104,314✔
2267
                // State is SendIdent
2268
                send_ident_message(); // Throws
1,342✔
2269
                return;
1,342✔
2270
            }
105,656✔
2271
            // State is SendError
2272
            send_error_message(); // Throws
80✔
2273
            return;
80✔
2274
        }
105,736✔
2275
        // State is SendUnbound
2276
        send_unbound_message(); // Throws
2,508✔
2277
        terminate();            // Throws
2,508✔
2278
        m_connection.discard_session(m_session_ident);
2,508✔
2279
        // This session is now destroyed!
2280
    }
2,508✔
2281

2282
    bool receive_bind_message(std::string path, std::string signed_user_token, bool need_client_file_ident,
2283
                              bool is_subserver, ProtocolError& error)
2284
    {
5,440✔
2285
        if (logger.would_log(util::Logger::Level::info)) {
5,440✔
2286
            logger.detail("Received: BIND(server_path=%1, signed_user_token='%2', "
366✔
2287
                          "need_client_file_ident=%3, is_subserver=%4)",
366✔
2288
                          path, short_token_fmt(signed_user_token), int(need_client_file_ident),
366✔
2289
                          int(is_subserver)); // Throws
366✔
2290
        }
366✔
2291

2292
        ServerImpl& server = m_connection.get_server();
5,440✔
2293
        _impl::VirtualPathComponents virt_path_components =
5,440✔
2294
            _impl::parse_virtual_path(server.get_root_dir(), path); // Throws
5,440✔
2295

2296
        if (!virt_path_components.is_valid) {
5,440✔
2297
            logger.error("Bad virtual path (message_type='bind', path='%1', "
28✔
2298
                         "signed_user_token='%2')",
28✔
2299
                         path,
28✔
2300
                         short_token_fmt(signed_user_token)); // Throws
28✔
2301
            error = ProtocolError::illegal_realm_path;
28✔
2302
            return false;
28✔
2303
        }
28✔
2304

2305
        // The user has proper permissions at this stage.
2306

2307
        m_server_file = server.get_or_create_file(path); // Throws
5,412✔
2308

2309
        m_server_file->add_unidentified_session(this); // Throws
5,412✔
2310

2311
        logger.info("Client info: (path='%1', from=%2, protocol=%3) %4", path, m_connection.get_remote_endpoint(),
5,412✔
2312
                    m_connection.get_client_protocol_version(),
5,412✔
2313
                    m_connection.get_client_user_agent()); // Throws
5,412✔
2314

2315
        m_is_subserver = is_subserver;
5,412✔
2316
        if (REALM_LIKELY(!need_client_file_ident)) {
5,412✔
2317
            // Protocol state is now WaitForUnbind
2318
            return true;
4,014✔
2319
        }
4,014✔
2320

2321
        // FIXME: We must make a choice about client file ident for read only
2322
        // sessions. They should have a special read-only client file ident.
2323
        file_ident_type proxy_file = 0; // No proxy
1,398✔
2324
        ClientType client_type = (is_subserver ? ClientType::subserver : ClientType::regular);
1,398✔
2325
        m_file_ident_request = m_server_file->request_file_ident(*this, proxy_file, client_type); // Throws
1,398✔
2326
        m_send_ident_message = true;
1,398✔
2327
        // Protocol state is now AllocatingIdent
2328

2329
        return true;
1,398✔
2330
    }
5,412✔
2331

2332
    bool receive_ident_message(file_ident_type client_file_ident, salt_type client_file_ident_salt,
2333
                               version_type scan_server_version, version_type scan_client_version,
2334
                               version_type latest_server_version, salt_type latest_server_version_salt,
2335
                               ProtocolError& error)
2336
    {
4,298✔
2337
        // Protocol state must be WaitForIdent
2338
        REALM_ASSERT(!need_client_file_ident());
4,298✔
2339
        REALM_ASSERT(!m_send_ident_message);
4,298✔
2340
        REALM_ASSERT(!ident_message_received());
4,298✔
2341
        REALM_ASSERT(!unbind_message_received());
4,298✔
2342
        REALM_ASSERT(!error_occurred());
4,298✔
2343
        REALM_ASSERT(!m_error_message_sent);
4,298✔
2344

2345
        logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
4,298✔
2346
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
4,298✔
2347
                     "latest_server_version_salt=%6)",
4,298✔
2348
                     client_file_ident, client_file_ident_salt, scan_server_version, scan_client_version,
4,298✔
2349
                     latest_server_version, latest_server_version_salt); // Throws
4,298✔
2350

2351
        SaltedFileIdent client_file_ident_2 = {client_file_ident, client_file_ident_salt};
4,298✔
2352
        DownloadCursor download_progress = {scan_server_version, scan_client_version};
4,298✔
2353
        SaltedVersion server_version_2 = {latest_server_version, latest_server_version_salt};
4,298✔
2354
        ClientType client_type = (m_is_subserver ? ClientType::subserver : ClientType::regular);
4,298✔
2355
        UploadCursor upload_threshold = {0, 0};
4,298✔
2356
        version_type locked_server_version = 0;
4,298✔
2357
        BootstrapError error_2 =
4,298✔
2358
            m_server_file->bootstrap_client_session(client_file_ident_2, download_progress, server_version_2,
4,298✔
2359
                                                    client_type, upload_threshold, locked_server_version,
4,298✔
2360
                                                    logger); // Throws
4,298✔
2361
        switch (error_2) {
4,298✔
2362
            case BootstrapError::no_error:
4,266✔
2363
                break;
4,266✔
2364
            case BootstrapError::client_file_expired:
✔
2365
                logger.warn("Client (%1) expired", client_file_ident); // Throws
×
2366
                error = ProtocolError::client_file_expired;
×
2367
                return false;
×
2368
            case BootstrapError::bad_client_file_ident:
✔
2369
                logger.error("Bad client file ident (%1) in IDENT message",
×
2370
                             client_file_ident); // Throws
×
2371
                error = ProtocolError::bad_client_file_ident;
×
2372
                return false;
×
2373
            case BootstrapError::bad_client_file_ident_salt:
4✔
2374
                logger.error("Bad client file identifier salt (%1) in IDENT message",
4✔
2375
                             client_file_ident_salt); // Throws
4✔
2376
                error = ProtocolError::diverging_histories;
4✔
2377
                return false;
4✔
2378
            case BootstrapError::bad_download_server_version:
✔
2379
                logger.error("Bad download progress server version in IDENT message"); // Throws
×
2380
                error = ProtocolError::bad_server_version;
×
2381
                return false;
×
2382
            case BootstrapError::bad_download_client_version:
4✔
2383
                logger.error("Bad download progress client version in IDENT message"); // Throws
4✔
2384
                error = ProtocolError::bad_client_version;
4✔
2385
                return false;
4✔
2386
            case BootstrapError::bad_server_version:
20✔
2387
                logger.error("Bad server version (message_type='ident')"); // Throws
20✔
2388
                error = ProtocolError::bad_server_version;
20✔
2389
                return false;
20✔
2390
            case BootstrapError::bad_server_version_salt:
4✔
2391
                logger.error("Bad server version salt in IDENT message"); // Throws
4✔
2392
                error = ProtocolError::diverging_histories;
4✔
2393
                return false;
4✔
2394
            case BootstrapError::bad_client_type:
✔
2395
                logger.error("Bad client type (%1) in IDENT message", int(client_type)); // Throws
×
2396
                error = ProtocolError::bad_client_file_ident; // FIXME: Introduce new protocol-level error
×
2397
                                                              // `bad_client_type`.
2398
                return false;
×
2399
        }
4,298✔
2400

2401
        // Make sure there is no other session currently associcated with the
2402
        // same client-side file
2403
        if (Session* other_sess = m_server_file->get_identified_session(client_file_ident)) {
4,266✔
2404
            SyncConnection& other_conn = other_sess->get_connection();
×
2405
            // It is a protocol violation if the other session is associated
2406
            // with the same connection
2407
            if (&other_conn == &m_connection) {
×
2408
                logger.error("Client file already bound in other session associated with "
×
2409
                             "the same connection"); // Throws
×
2410
                error = ProtocolError::bound_in_other_session;
×
2411
                return false;
×
2412
            }
×
2413
            // When the other session is associated with a different connection
2414
            // (`other_conn`), the clash may be due to the server not yet having
2415
            // realized that the other connection has been closed by the
2416
            // client. If so, the other connention is a "zombie". In the
2417
            // interest of getting rid of zombie connections as fast as
2418
            // possible, we shall assume that a clash with a session in another
2419
            // connection is always due to that other connection being a
2420
            // zombie. And when such a situation is detected, we want to close
2421
            // the zombie connection immediately.
2422
            auto log_level = Logger::Level::detail;
×
2423
            other_conn.terminate(log_level,
×
2424
                                 "Sync connection closed (superseded session)"); // Throws
×
2425
        }
×
2426

2427
        logger.info("Bound to client file (client_file_ident=%1)", client_file_ident); // Throws
4,266✔
2428

2429
        send_log_message(util::Logger::Level::debug, util::format("Session %1 bound to client file ident %2",
4,266✔
2430
                                                                  m_session_ident, client_file_ident));
4,266✔
2431

2432
        m_server_file->identify_session(this, client_file_ident); // Throws
4,266✔
2433

2434
        m_client_file_ident = client_file_ident;
4,266✔
2435
        m_download_progress = download_progress;
4,266✔
2436
        m_upload_threshold = upload_threshold;
4,266✔
2437
        m_locked_server_version = locked_server_version;
4,266✔
2438

2439
        ServerImpl& server = m_connection.get_server();
4,266✔
2440
        const Server::Config& config = server.get_config();
4,266✔
2441
        m_disable_download = (config.disable_download_for.count(client_file_ident) != 0);
4,266✔
2442

2443
        if (REALM_UNLIKELY(config.session_bootstrap_callback)) {
4,266✔
2444
            config.session_bootstrap_callback(m_server_file->get_virt_path(),
×
2445
                                              client_file_ident); // Throws
×
2446
        }
×
2447

2448
        // Protocol  state is now WaitForUnbind
2449
        enlist_to_send();
4,266✔
2450
        return true;
4,266✔
2451
    }
4,266✔
2452

2453
    bool receive_upload_message(version_type progress_client_version, version_type progress_server_version,
2454
                                version_type locked_server_version, const UploadChangesets& upload_changesets,
2455
                                ProtocolError& error)
2456
    {
46,616✔
2457
        // Protocol state must be WaitForUnbind
2458
        REALM_ASSERT(!m_send_ident_message);
46,616✔
2459
        REALM_ASSERT(ident_message_received());
46,616✔
2460
        REALM_ASSERT(!unbind_message_received());
46,616✔
2461
        REALM_ASSERT(!error_occurred());
46,616✔
2462
        REALM_ASSERT(!m_error_message_sent);
46,616✔
2463

2464
        logger.detail("Received: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
46,616✔
2465
                      "locked_server_version=%3, num_changesets=%4)",
46,616✔
2466
                      progress_client_version, progress_server_version, locked_server_version,
46,616✔
2467
                      upload_changesets.size()); // Throws
46,616✔
2468

2469
        // We are unable to reproduce the cursor object for the upload progress
2470
        // when the protocol version is less than 29, because the client does
2471
        // not provide the required information. When the protocol version is
2472
        // less than 25, we can always get a consistent cursor by taking it from
2473
        // the changeset that was uploaded last, but in protocol versions 25,
2474
        // 26, 27, and 28, things are more complicated. Here, we receive new
2475
        // values for `last_integrated_server_version` which we cannot afford to
2476
        // ignore, but we do not know what client versions they correspond
2477
        // to. Fortunately, we can produce a cursor that works, and is mutually
2478
        // consistent with previous cursors, by simply bumping
2479
        // `upload_progress.client_version` when
2480
        // `upload_progress.last_intgerated_server_version` grows.
2481
        //
2482
        // To see that this scheme works, consider the last changeset, A, that
2483
        // will have already been uploaded and integrated at the beginning of
2484
        // the next session, and the first changeset, B, that follows A in the
2485
        // client side history, and is not upload skippable (of local origin and
2486
        // nonempty). We then need to show that A will be skipped, if uploaded
2487
        // in the next session, but B will not.
2488
        //
2489
        // Let V be the client version produced by A, and let T be the value of
2490
        // `upload_progress.client_version` as determined in this session, which
2491
        // is used as threshold in the next session. Then we know that A is
2492
        // skipped during the next session if V is less than, or equal to T. If
2493
        // the protocol version is at least 29, the protocol requires that T is
2494
        // greater than, or equal to V. If the protocol version is less than 25,
2495
        // T will be equal to V. Finally, if the protocol version is 25, 26, 27,
2496
        // or 28, we construct T such that it is always greater than, or equal
2497
        // to V, so in all cases, A will be skipped during the next session.
2498
        //
2499
        // Let W be the client version on which B is based. We then know that B
2500
        // will be retained if, and only if W is greater than, or equalto T. If
2501
        // the protocol version is at least 29, we know that T is less than, or
2502
        // equal to W, since B is not integrated until the next session. If the
2503
        // protocol version is less tahn 25, we know that T is V. Since V must
2504
        // be less than, or equal to W, we again know that T is less than, or
2505
        // equal to W. Finally, if the protocol version is 25, 26, 27, or 28, we
2506
        // construct T such that it is equal to V + N, where N is the number of
2507
        // observed increments in `last_integrated_server_version` since the
2508
        // client version prodiced by A. For each of these observed increments,
2509
        // there must have been a distinct new client version, but all these
2510
        // client versions must be less than, or equal to W, since B is not
2511
        // integrated until the next session. Therefore, we know that T = V + N
2512
        // is less than, or qual to W. So, in all cases, B will not skipped
2513
        // during the next session.
2514
        int protocol_version = m_connection.get_client_protocol_version();
46,616✔
2515
        static_cast<void>(protocol_version); // No protocol diversion (yet)
46,616✔
2516

2517
        UploadCursor upload_progress;
46,616✔
2518
        upload_progress = {progress_client_version, progress_server_version};
46,616✔
2519

2520
        // `upload_progress.client_version` must be nondecreasing across the
2521
        // session.
2522
        bool good_1 = (upload_progress.client_version >= m_upload_progress.client_version);
46,616✔
2523
        if (REALM_UNLIKELY(!good_1)) {
46,616✔
2524
            logger.error("Decreasing client version in upload progress (%1 < %2)", upload_progress.client_version,
×
2525
                         m_upload_progress.client_version); // Throws
×
2526
            error = ProtocolError::bad_client_version;
×
2527
            return false;
×
2528
        }
×
2529
        // `upload_progress.last_integrated_server_version` must be a version
2530
        // that the client can have heard about.
2531
        bool good_2 = (upload_progress.last_integrated_server_version <= m_download_progress.server_version);
46,616✔
2532
        if (REALM_UNLIKELY(!good_2)) {
46,616✔
2533
            logger.error("Bad last integrated server version in upload progress (%1 > %2)",
×
2534
                         upload_progress.last_integrated_server_version,
×
2535
                         m_download_progress.server_version); // Throws
×
2536
            error = ProtocolError::bad_server_version;
×
2537
            return false;
×
2538
        }
×
2539

2540
        // `upload_progress` must be consistent.
2541
        if (REALM_UNLIKELY(!is_consistent(upload_progress))) {
46,616✔
2542
            logger.error("Upload progress is inconsistent (%1, %2)", upload_progress.client_version,
×
2543
                         upload_progress.last_integrated_server_version); // Throws
×
2544
            error = ProtocolError::bad_server_version;
×
2545
            return false;
×
2546
        }
×
2547
        // `upload_progress` and `m_upload_threshold` must be mutually
2548
        // consistent.
2549
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_threshold))) {
46,616✔
2550
            logger.error("Upload progress (%1, %2) is mutually inconsistent with "
×
2551
                         "threshold (%3, %4)",
×
2552
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2553
                         m_upload_threshold.client_version,
×
2554
                         m_upload_threshold.last_integrated_server_version); // Throws
×
2555
            error = ProtocolError::bad_server_version;
×
2556
            return false;
×
2557
        }
×
2558
        // `upload_progress` and `m_upload_progress` must be mutually
2559
        // consistent.
2560
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_progress))) {
46,616✔
2561
            logger.error("Upload progress (%1, %2) is mutually inconsistent with previous "
×
2562
                         "upload progress (%3, %4)",
×
2563
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2564
                         m_upload_progress.client_version,
×
2565
                         m_upload_progress.last_integrated_server_version); // Throws
×
2566
            error = ProtocolError::bad_server_version;
×
2567
            return false;
×
2568
        }
×
2569

2570
        version_type locked_server_version_2 = locked_server_version;
46,616✔
2571

2572
        // `locked_server_version_2` must be nondecreasing over the lifetime of
2573
        // the client-side file.
2574
        if (REALM_UNLIKELY(locked_server_version_2 < m_locked_server_version)) {
46,616✔
2575
            logger.error("Decreasing locked server version (%1 < %2)", locked_server_version_2,
×
2576
                         m_locked_server_version); // Throws
×
2577
            error = ProtocolError::bad_server_version;
×
2578
            return false;
×
2579
        }
×
2580
        // `locked_server_version_2` must be a version that the client can have
2581
        // heard about.
2582
        if (REALM_UNLIKELY(locked_server_version_2 > m_download_progress.server_version)) {
46,616✔
2583
            logger.error("Bad locked server version (%1 > %2)", locked_server_version_2,
×
2584
                         m_download_progress.server_version); // Throws
×
2585
            error = ProtocolError::bad_server_version;
×
2586
            return false;
×
2587
        }
×
2588

2589
        std::size_t num_previously_integrated_changesets = 0;
46,616✔
2590
        if (!upload_changesets.empty()) {
46,616✔
2591
            UploadCursor up = m_upload_progress;
25,242✔
2592
            for (const ServerProtocol::UploadChangeset& uc : upload_changesets) {
36,880✔
2593
                // `uc.upload_cursor.client_version` must be increasing across
2594
                // all the changesets in this UPLOAD message, and all must be
2595
                // greater than upload_progress.client_version of previous
2596
                // UPLOAD message.
2597
                if (REALM_UNLIKELY(uc.upload_cursor.client_version <= up.client_version)) {
36,880✔
2598
                    logger.error("Nonincreasing client version in upload cursor of uploaded "
×
2599
                                 "changeset (%1 <= %2)",
×
2600
                                 uc.upload_cursor.client_version,
×
2601
                                 up.client_version); // Throws
×
2602
                    error = ProtocolError::bad_client_version;
×
2603
                    return false;
×
2604
                }
×
2605
                // `uc.upload_progress` must be consistent.
2606
                if (REALM_UNLIKELY(!is_consistent(uc.upload_cursor))) {
36,880✔
2607
                    logger.error("Upload cursor of uploaded changeset is inconsistent (%1, %2)",
×
2608
                                 uc.upload_cursor.client_version,
×
2609
                                 uc.upload_cursor.last_integrated_server_version); // Throws
×
2610
                    error = ProtocolError::bad_server_version;
×
2611
                    return false;
×
2612
                }
×
2613
                // `uc.upload_progress` must be mutually consistent with
2614
                // previous upload cursor.
2615
                if (REALM_UNLIKELY(!are_mutually_consistent(uc.upload_cursor, up))) {
36,880✔
2616
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2617
                                 "inconsistent with previous upload cursor (%3, %4)",
×
2618
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2619
                                 up.client_version, up.last_integrated_server_version); // Throws
×
2620
                    error = ProtocolError::bad_server_version;
×
2621
                    return false;
×
2622
                }
×
2623
                // `uc.upload_progress` must be mutually consistent with
2624
                // threshold, that is, for changesets that have not previously
2625
                // been integrated, it is important that the specified value of
2626
                // `last_integrated_server_version` is greater than, or equal to
2627
                // the reciprocal history base version.
2628
                bool consistent_with_threshold = are_mutually_consistent(uc.upload_cursor, m_upload_threshold);
36,880✔
2629
                if (REALM_UNLIKELY(!consistent_with_threshold)) {
36,880✔
2630
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2631
                                 "inconsistent with threshold (%3, %4)",
×
2632
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2633
                                 m_upload_threshold.client_version,
×
2634
                                 m_upload_threshold.last_integrated_server_version); // Throws
×
2635
                    error = ProtocolError::bad_server_version;
×
2636
                    return false;
×
2637
                }
×
2638
                bool previously_integrated = (uc.upload_cursor.client_version <= m_upload_threshold.client_version);
36,880✔
2639
                if (previously_integrated)
36,880✔
2640
                    ++num_previously_integrated_changesets;
2,468✔
2641
                up = uc.upload_cursor;
36,880✔
2642
            }
36,880✔
2643
            // `upload_progress.client_version` must be greater than, or equal
2644
            // to client versions produced by each of the changesets in this
2645
            // UPLOAD message.
2646
            if (REALM_UNLIKELY(up.client_version > upload_progress.client_version)) {
25,242✔
2647
                logger.error("Upload progress less than client version produced by uploaded "
×
2648
                             "changeset (%1 > %2)",
×
2649
                             up.client_version,
×
2650
                             upload_progress.client_version); // Throws
×
2651
                error = ProtocolError::bad_client_version;
×
2652
                return false;
×
2653
            }
×
2654
            // The upload cursor of last uploaded changeset must be mutually
2655
            // consistent with the reported upload progress.
2656
            if (REALM_UNLIKELY(!are_mutually_consistent(up, upload_progress))) {
25,242✔
2657
                logger.error("Upload cursor (%1, %2) of last uploaded changeset is mutually "
×
2658
                             "inconsistent with upload progress (%3, %4)",
×
2659
                             up.client_version, up.last_integrated_server_version, upload_progress.client_version,
×
2660
                             upload_progress.last_integrated_server_version); // Throws
×
2661
                error = ProtocolError::bad_server_version;
×
2662
                return false;
×
2663
            }
×
2664
        }
25,242✔
2665

2666
        // FIXME: Part of a very poor man's substitute for a proper backpressure
2667
        // scheme.
2668
        if (REALM_UNLIKELY(!m_server_file->can_add_changesets_from_downstream())) {
46,616✔
2669
            logger.debug("Terminating uploading session because buffer is full"); // Throws
×
2670
            // Using this exact error code, because it causes `try_again` flag
2671
            // to be set to true, which causes the client to wait for about 5
2672
            // minuites before trying to connect again.
2673
            error = ProtocolError::connection_closed;
×
2674
            return false;
×
2675
        }
×
2676

2677
        m_upload_progress = upload_progress;
46,616✔
2678

2679
        bool have_real_upload_progress = (upload_progress.client_version > m_upload_threshold.client_version);
46,616✔
2680
        bool bump_locked_server_version = (locked_server_version_2 > m_locked_server_version);
46,616✔
2681

2682
        std::size_t num_changesets_to_integrate = upload_changesets.size() - num_previously_integrated_changesets;
46,616✔
2683
        REALM_ASSERT(have_real_upload_progress || num_changesets_to_integrate == 0);
46,616✔
2684

2685
        bool have_anything_to_do = (have_real_upload_progress || bump_locked_server_version);
46,616✔
2686
        if (!have_anything_to_do)
46,616✔
2687
            return true;
280✔
2688

2689
        if (!have_real_upload_progress)
46,336✔
2690
            upload_progress = m_upload_threshold;
×
2691

2692
        if (num_previously_integrated_changesets > 0) {
46,336✔
2693
            logger.detail("Ignoring %1 previously integrated changesets",
784✔
2694
                          num_previously_integrated_changesets); // Throws
784✔
2695
        }
784✔
2696
        if (num_changesets_to_integrate > 0) {
46,336✔
2697
            logger.detail("Initiate integration of %1 remote changesets",
24,854✔
2698
                          num_changesets_to_integrate); // Throws
24,854✔
2699
        }
24,854✔
2700

2701
        REALM_ASSERT(m_server_file);
46,336✔
2702
        ServerFile& file = *m_server_file;
46,336✔
2703
        std::size_t offset = num_previously_integrated_changesets;
46,336✔
2704
        file.add_changesets_from_downstream(m_client_file_ident, upload_progress, locked_server_version_2,
46,336✔
2705
                                            upload_changesets.data() + offset, num_changesets_to_integrate); // Throws
46,336✔
2706

2707
        m_locked_server_version = locked_server_version_2;
46,336✔
2708
        return true;
46,336✔
2709
    }
46,616✔
2710

2711
    bool receive_mark_message(request_ident_type request_ident, ProtocolError&)
2712
    {
12,016✔
2713
        // Protocol state must be WaitForUnbind
2714
        REALM_ASSERT(!m_send_ident_message);
12,016✔
2715
        REALM_ASSERT(ident_message_received());
12,016✔
2716
        REALM_ASSERT(!unbind_message_received());
12,016✔
2717
        REALM_ASSERT(!error_occurred());
12,016✔
2718
        REALM_ASSERT(!m_error_message_sent);
12,016✔
2719

2720
        logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws
12,016✔
2721

2722
        m_download_completion_request = request_ident;
12,016✔
2723

2724
        ensure_enlisted_to_send();
12,016✔
2725
        return true;
12,016✔
2726
    }
12,016✔
2727

2728
    // Returns true if the deactivation process has been completed, at which
2729
    // point the caller (SyncConnection::receive_unbind_message()) should
2730
    // terminate the session.
2731
    //
2732
    // CAUTION: This function may commit suicide!
2733
    void receive_unbind_message()
2734
    {
2,546✔
2735
        // Protocol state may be anything but SendUnbound
2736
        REALM_ASSERT(!m_unbind_message_received);
2,546✔
2737

2738
        logger.detail("Received: UNBIND"); // Throws
2,546✔
2739

2740
        detach_from_server_file();
2,546✔
2741
        m_unbind_message_received = true;
2,546✔
2742

2743
        // Detect completion of the deactivation process
2744
        if (m_error_message_sent) {
2,546✔
2745
            // Deactivation process completed
2746
            terminate(); // Throws
26✔
2747
            m_connection.discard_session(m_session_ident);
26✔
2748
            // This session is now destroyed!
2749
            return;
26✔
2750
        }
26✔
2751

2752
        // Protocol state is now SendUnbound
2753
        ensure_enlisted_to_send();
2,520✔
2754
    }
2,520✔
2755

2756
    void receive_error_message(session_ident_type, int, std::string_view)
2757
    {
×
2758
        REALM_ASSERT(!m_unbind_message_received);
×
2759

2760
        logger.detail("Received: ERROR"); // Throws
×
2761
    }
×
2762

2763
private:
2764
    SyncConnection& m_connection;
2765

2766
    const session_ident_type m_session_ident;
2767

2768
    // Not null if, and only if this session is in
2769
    // m_connection.m_sessions_enlisted_to_send.
2770
    Session* m_next = nullptr;
2771

2772
    // Becomes nonnull when the BIND message is received, if no error occurs. Is
2773
    // reset to null when the deactivation process is initiated, either when the
2774
    // UNBIND message is recieved, or when initiate_deactivation() is called.
2775
    util::bind_ptr<ServerFile> m_server_file;
2776

2777
    bool m_disable_download = false;
2778
    bool m_is_subserver = false;
2779

2780
    using file_ident_request_type = ServerFile::file_ident_request_type;
2781

2782
    // When nonzero, this session has an outstanding request for a client file
2783
    // identifier.
2784
    file_ident_request_type m_file_ident_request = 0;
2785

2786
    // Payload for next outgoing ALLOC message.
2787
    SaltedFileIdent m_allocated_file_ident = {0, 0};
2788

2789
    // Zero until the session receives an IDENT message from the client.
2790
    file_ident_type m_client_file_ident = 0;
2791

2792
    // Zero until initiate_deactivation() is called.
2793
    ProtocolError m_error_code = {};
2794

2795
    // The current point of progression of the download process. Set to (<server
2796
    // version>, <client version>) of the IDENT message when the IDENT message
2797
    // is received. At the time of return from continue_history_scan(), it
2798
    // points to the latest server version such that all preceding changesets in
2799
    // the server-side history have been downloaded, are currently being
2800
    // downloaded, or are *download excluded*.
2801
    DownloadCursor m_download_progress = {0, 0};
2802

2803
    request_ident_type m_download_completion_request = 0;
2804

2805
    // Records the progress of the upload process. Used to check that the client
2806
    // uploads changesets in order. Also, when m_upload_progress >
2807
    // m_upload_threshold, m_upload_progress works as a cache of the persisted
2808
    // version of the upload progress.
2809
    UploadCursor m_upload_progress = {0, 0};
2810

2811
    // Initialized on reception of the IDENT message. Specifies the actual
2812
    // upload progress (as recorded on the server-side) at the beginning of the
2813
    // session, and it remains fixed throughout the session.
2814
    //
2815
    // m_upload_threshold includes the progress resulting from the received
2816
    // changesets that have not yet been integrated (only relevant for
2817
    // synchronous backup).
2818
    UploadCursor m_upload_threshold = {0, 0};
2819

2820
    // Works partially as a cache of the persisted value, and partially as a way
2821
    // of checking that the client respects that it can never decrease.
2822
    version_type m_locked_server_version = 0;
2823

2824
    bool m_send_ident_message = false;
2825
    bool m_unbind_message_received = false;
2826
    bool m_error_message_sent = false;
2827

2828
    /// m_one_download_message_sent denotes whether at least one DOWNLOAD message
2829
    /// has been sent in the current session. The variable is used to ensure
2830
    /// that a DOWNLOAD message is always sent in a session. The received
2831
    /// DOWNLOAD message is needed by the client to ensure that its current
2832
    /// download progress is up to date.
2833
    bool m_one_download_message_sent = false;
2834

2835
    static std::string make_logger_prefix(session_ident_type session_ident)
2836
    {
5,436✔
2837
        std::ostringstream out;
5,436✔
2838
        out.imbue(std::locale::classic());
5,436✔
2839
        out << "Session[" << session_ident << "]: "; // Throws
5,436✔
2840
        return out.str();                            // Throws
5,436✔
2841
    }
5,436✔
2842

2843
    // Scan the history for changesets to be downloaded.
2844
    // If the history is longer than the end point of the previous scan,
2845
    // a DOWNLOAD message will be sent.
2846
    // A MARK message is sent if no DOWNLOAD message is sent, and the client has
2847
    // requested to be notified about download completion.
2848
    // In case neither a DOWNLOAD nor a MARK is sent, no message is sent.
2849
    //
2850
    // This function may lead to the destruction of the session object
2851
    // (suicide).
2852
    void continue_history_scan()
2853
    {
104,312✔
2854
        // Protocol state must be WaitForUnbind
2855
        REALM_ASSERT(!m_send_ident_message);
104,312✔
2856
        REALM_ASSERT(ident_message_received());
104,312✔
2857
        REALM_ASSERT(!unbind_message_received());
104,312✔
2858
        REALM_ASSERT(!error_occurred());
104,312✔
2859
        REALM_ASSERT(!m_error_message_sent);
104,312✔
2860
        REALM_ASSERT(!is_enlisted_to_send());
104,312✔
2861

2862
        SaltedVersion last_server_version = m_server_file->get_salted_sync_version();
104,312✔
2863
        REALM_ASSERT(last_server_version.version >= m_download_progress.server_version);
104,312✔
2864

2865
        ServerImpl& server = m_connection.get_server();
104,312✔
2866
        const Server::Config& config = server.get_config();
104,312✔
2867
        if (REALM_UNLIKELY(m_disable_download))
104,312✔
2868
            return;
×
2869

2870
        bool have_more_to_scan =
104,312✔
2871
            (last_server_version.version > m_download_progress.server_version || !m_one_download_message_sent);
104,312✔
2872
        if (have_more_to_scan) {
104,312✔
2873
            m_server_file->register_client_access(m_client_file_ident);     // Throws
40,722✔
2874
            const ServerHistory& history = m_server_file->access().history; // Throws
40,722✔
2875
            const char* body;
40,722✔
2876
            std::size_t uncompressed_body_size;
40,722✔
2877
            std::size_t compressed_body_size = 0;
40,722✔
2878
            bool body_is_compressed = false;
40,722✔
2879
            version_type end_version = last_server_version.version;
40,722✔
2880
            DownloadCursor download_progress;
40,722✔
2881
            UploadCursor upload_progress = {0, 0};
40,722✔
2882
            std::uint_fast64_t downloadable_bytes = 0;
40,722✔
2883
            std::size_t num_changesets;
40,722✔
2884
            std::size_t accum_original_size;
40,722✔
2885
            std::size_t accum_compacted_size;
40,722✔
2886
            ServerProtocol& protocol = get_server_protocol();
40,722✔
2887
            bool disable_download_compaction = config.disable_download_compaction;
40,722✔
2888
            bool enable_cache = (config.enable_download_bootstrap_cache && m_download_progress.server_version == 0 &&
40,722!
2889
                                 m_upload_progress.client_version == 0 && m_upload_threshold.client_version == 0);
40,722!
2890
            DownloadCache& cache = m_server_file->get_download_cache();
40,722✔
2891
            bool fetch_from_cache = (enable_cache && cache.body && end_version == cache.end_version);
40,722!
2892
            if (fetch_from_cache) {
40,722✔
2893
                body = cache.body.get();
×
2894
                uncompressed_body_size = cache.uncompressed_body_size;
×
2895
                compressed_body_size = cache.compressed_body_size;
×
2896
                body_is_compressed = cache.body_is_compressed;
×
2897
                download_progress = cache.download_progress;
×
2898
                downloadable_bytes = cache.downloadable_bytes;
×
2899
                num_changesets = cache.num_changesets;
×
2900
                accum_original_size = cache.accum_original_size;
×
2901
                accum_compacted_size = cache.accum_compacted_size;
×
2902
            }
×
2903
            else {
40,722✔
2904
                // Discard the old cached DOWNLOAD body before generating a new
2905
                // one to be cached. This can make a big difference because the
2906
                // size of that body can be very large (10GiB has been seen in a
2907
                // real-world case).
2908
                if (enable_cache)
40,722✔
2909
                    cache.body = {};
×
2910

2911
                OutputBuffer& out = server.get_misc_buffers().download_message;
40,722✔
2912
                out.reset();
40,722✔
2913
                download_progress = m_download_progress;
40,722✔
2914
                auto fetch_and_compress = [&](std::size_t max_download_size) {
40,722✔
2915
                    DownloadHistoryEntryHandler handler{protocol, out, logger};
40,718✔
2916
                    std::uint_fast64_t cumulative_byte_size_current;
40,718✔
2917
                    std::uint_fast64_t cumulative_byte_size_total;
40,718✔
2918
                    bool not_expired = history.fetch_download_info(
40,718✔
2919
                        m_client_file_ident, download_progress, end_version, upload_progress, handler,
40,718✔
2920
                        cumulative_byte_size_current, cumulative_byte_size_total, disable_download_compaction,
40,718✔
2921
                        max_download_size); // Throws
40,718✔
2922
                    REALM_ASSERT(upload_progress.client_version >= download_progress.last_integrated_client_version);
40,718✔
2923
                    SyncConnection& conn = get_connection();
40,718✔
2924
                    if (REALM_UNLIKELY(!not_expired)) {
40,718✔
2925
                        logger.debug("History scanning failed: Client file entry "
×
2926
                                     "expired during session"); // Throws
×
2927
                        conn.protocol_error(ProtocolError::client_file_expired, this);
×
2928
                        // Session object may have been destroyed at this point
2929
                        // (suicide).
2930
                        return false;
×
2931
                    }
×
2932

2933
                    downloadable_bytes = cumulative_byte_size_total - cumulative_byte_size_current;
40,718✔
2934
                    uncompressed_body_size = out.size();
40,718✔
2935
                    BinaryData uncompressed = {out.data(), uncompressed_body_size};
40,718✔
2936
                    body = uncompressed.data();
40,718✔
2937
                    std::size_t max_uncompressed = 1024;
40,718✔
2938
                    if (uncompressed.size() > max_uncompressed) {
40,718✔
2939
                        compression::CompressMemoryArena& arena = server.get_compress_memory_arena();
4,256✔
2940
                        std::vector<char>& buffer = server.get_misc_buffers().compress;
4,256✔
2941
                        compression::allocate_and_compress(arena, uncompressed, buffer); // Throws
4,256✔
2942
                        if (buffer.size() < uncompressed.size()) {
4,256✔
2943
                            body = buffer.data();
4,256✔
2944
                            compressed_body_size = buffer.size();
4,256✔
2945
                            body_is_compressed = true;
4,256✔
2946
                        }
4,256✔
2947
                    }
4,256✔
2948
                    num_changesets = handler.num_changesets;
40,718✔
2949
                    accum_original_size = handler.accum_original_size;
40,718✔
2950
                    accum_compacted_size = handler.accum_compacted_size;
40,718✔
2951
                    return true;
40,718✔
2952
                };
40,718✔
2953
                if (enable_cache) {
40,722✔
2954
                    std::size_t max_download_size = std::numeric_limits<size_t>::max();
×
2955
                    if (!fetch_and_compress(max_download_size)) { // Throws
×
2956
                        // Session object may have been destroyed at this point
2957
                        // (suicide).
2958
                        return;
×
2959
                    }
×
2960
                    REALM_ASSERT(upload_progress.client_version == 0);
×
2961
                    std::size_t body_size = (body_is_compressed ? compressed_body_size : uncompressed_body_size);
×
2962
                    cache.body = std::make_unique<char[]>(body_size); // Throws
×
2963
                    std::copy(body, body + body_size, cache.body.get());
×
2964
                    cache.uncompressed_body_size = uncompressed_body_size;
×
2965
                    cache.compressed_body_size = compressed_body_size;
×
2966
                    cache.body_is_compressed = body_is_compressed;
×
2967
                    cache.end_version = end_version;
×
2968
                    cache.download_progress = download_progress;
×
2969
                    cache.downloadable_bytes = downloadable_bytes;
×
2970
                    cache.num_changesets = num_changesets;
×
2971
                    cache.accum_original_size = accum_original_size;
×
2972
                    cache.accum_compacted_size = accum_compacted_size;
×
2973
                }
×
2974
                else {
40,722✔
2975
                    std::size_t max_download_size = config.max_download_size;
40,722✔
2976
                    if (!fetch_and_compress(max_download_size)) { // Throws
40,722✔
2977
                        // Session object may have been destroyed at this point
2978
                        // (suicide).
2979
                        return;
×
2980
                    }
×
2981
                }
40,722✔
2982
            }
40,722✔
2983

2984
            OutputBuffer& out = m_connection.get_output_buffer();
40,722✔
2985
            protocol.make_download_message(
40,722✔
2986
                m_connection.get_client_protocol_version(), out, m_session_ident, download_progress.server_version,
40,722✔
2987
                download_progress.last_integrated_client_version, last_server_version.version,
40,722✔
2988
                last_server_version.salt, upload_progress.client_version,
40,722✔
2989
                upload_progress.last_integrated_server_version, downloadable_bytes, num_changesets, body,
40,722✔
2990
                uncompressed_body_size, compressed_body_size, body_is_compressed, logger); // Throws
40,722✔
2991

2992
            if (!disable_download_compaction) {
40,724✔
2993
                std::size_t saved = accum_original_size - accum_compacted_size;
40,724✔
2994
                double saved_2 = (accum_original_size == 0 ? 0 : std::round(saved * 100.0 / accum_original_size));
40,724✔
2995
                logger.detail("Download compaction: Saved %1 bytes (%2%%)", saved, saved_2); // Throws
40,724✔
2996
            }
40,724✔
2997

2998
            m_download_progress = download_progress;
40,722✔
2999
            logger.debug("Setting of m_download_progress.server_version = %1",
40,722✔
3000
                         m_download_progress.server_version); // Throws
40,722✔
3001
            send_download_message();
40,722✔
3002
            m_one_download_message_sent = true;
40,722✔
3003

3004
            enlist_to_send();
40,722✔
3005
        }
40,722✔
3006
        else if (m_download_completion_request) {
63,590✔
3007
            // Send a MARK message
3008
            request_ident_type request_ident = m_download_completion_request;
11,994✔
3009
            send_mark_message(request_ident);  // Throws
11,994✔
3010
            m_download_completion_request = 0; // Request handled
11,994✔
3011
            enlist_to_send();
11,994✔
3012
        }
11,994✔
3013
    }
104,312✔
3014

3015
    void send_ident_message()
3016
    {
1,342✔
3017
        // Protocol state must be SendIdent
3018
        REALM_ASSERT(!need_client_file_ident());
1,342✔
3019
        REALM_ASSERT(m_send_ident_message);
1,342✔
3020
        REALM_ASSERT(!ident_message_received());
1,342✔
3021
        REALM_ASSERT(!unbind_message_received());
1,342✔
3022
        REALM_ASSERT(!error_occurred());
1,342✔
3023
        REALM_ASSERT(!m_error_message_sent);
1,342✔
3024

3025
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
1,342✔
3026

3027
        file_ident_type client_file_ident = m_allocated_file_ident.ident;
1,342✔
3028
        salt_type client_file_ident_salt = m_allocated_file_ident.salt;
1,342✔
3029

3030
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident,
1,342✔
3031
                     client_file_ident_salt); // Throws
1,342✔
3032

3033
        ServerProtocol& protocol = get_server_protocol();
1,342✔
3034
        OutputBuffer& out = m_connection.get_output_buffer();
1,342✔
3035
        int protocol_version = m_connection.get_client_protocol_version();
1,342✔
3036
        protocol.make_ident_message(protocol_version, out, m_session_ident, client_file_ident,
1,342✔
3037
                                    client_file_ident_salt); // Throws
1,342✔
3038
        m_connection.initiate_write_output_buffer();         // Throws
1,342✔
3039

3040
        m_allocated_file_ident.ident = 0; // Consumed
1,342✔
3041
        m_send_ident_message = false;
1,342✔
3042
        // Protocol state is now WaitForStateRequest or WaitForIdent
3043
    }
1,342✔
3044

3045
    void send_download_message()
3046
    {
40,724✔
3047
        m_connection.initiate_write_output_buffer(); // Throws
40,724✔
3048
    }
40,724✔
3049

3050
    void send_mark_message(request_ident_type request_ident)
3051
    {
11,994✔
3052
        logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
11,994✔
3053

3054
        ServerProtocol& protocol = get_server_protocol();
11,994✔
3055
        OutputBuffer& out = m_connection.get_output_buffer();
11,994✔
3056
        protocol.make_mark_message(out, m_session_ident, request_ident); // Throws
11,994✔
3057
        m_connection.initiate_write_output_buffer();                     // Throws
11,994✔
3058
    }
11,994✔
3059

3060
    void send_alloc_message()
3061
    {
×
3062
        // Protocol state must be WaitForUnbind
3063
        REALM_ASSERT(!m_send_ident_message);
×
3064
        REALM_ASSERT(ident_message_received());
×
3065
        REALM_ASSERT(!unbind_message_received());
×
3066
        REALM_ASSERT(!error_occurred());
×
3067
        REALM_ASSERT(!m_error_message_sent);
×
3068

3069
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
×
3070

3071
        // Relayed allocations are only allowed from protocol version 23 (old protocol).
3072
        REALM_ASSERT(false);
×
3073

3074
        file_ident_type file_ident = m_allocated_file_ident.ident;
×
3075

3076
        logger.debug("Sending: ALLOC(file_ident=%1)", file_ident); // Throws
×
3077

3078
        ServerProtocol& protocol = get_server_protocol();
×
3079
        OutputBuffer& out = m_connection.get_output_buffer();
×
3080
        protocol.make_alloc_message(out, m_session_ident, file_ident); // Throws
×
3081
        m_connection.initiate_write_output_buffer();                   // Throws
×
3082

3083
        m_allocated_file_ident.ident = 0; // Consumed
×
3084

3085
        // Other messages may be waiting to be sent.
3086
        enlist_to_send();
×
3087
    }
×
3088

3089
    void send_unbound_message()
3090
    {
2,506✔
3091
        // Protocol state must be SendUnbound
3092
        REALM_ASSERT(unbind_message_received());
2,506✔
3093
        REALM_ASSERT(!m_error_message_sent);
2,506✔
3094

3095
        logger.debug("Sending: UNBOUND"); // Throws
2,506✔
3096

3097
        ServerProtocol& protocol = get_server_protocol();
2,506✔
3098
        OutputBuffer& out = m_connection.get_output_buffer();
2,506✔
3099
        protocol.make_unbound_message(out, m_session_ident); // Throws
2,506✔
3100
        m_connection.initiate_write_output_buffer();         // Throws
2,506✔
3101
    }
2,506✔
3102

3103
    void send_error_message()
3104
    {
80✔
3105
        // Protocol state must be SendError
3106
        REALM_ASSERT(!unbind_message_received());
80✔
3107
        REALM_ASSERT(error_occurred());
80✔
3108
        REALM_ASSERT(!m_error_message_sent);
80✔
3109

3110
        REALM_ASSERT(is_session_level_error(m_error_code));
80✔
3111

3112
        ProtocolError error_code = m_error_code;
80✔
3113
        const char* message = get_protocol_error_message(int(error_code));
80✔
3114
        std::size_t message_size = std::strlen(message);
80✔
3115
        bool try_again = determine_try_again(error_code);
80✔
3116

3117
        logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3)", int(error_code), message_size,
80✔
3118
                      try_again); // Throws
80✔
3119

3120
        ServerProtocol& protocol = get_server_protocol();
80✔
3121
        OutputBuffer& out = m_connection.get_output_buffer();
80✔
3122
        int protocol_version = m_connection.get_client_protocol_version();
80✔
3123
        protocol.make_error_message(protocol_version, out, error_code, message, message_size, try_again,
80✔
3124
                                    m_session_ident); // Throws
80✔
3125
        m_connection.initiate_write_output_buffer();  // Throws
80✔
3126

3127
        m_error_message_sent = true;
80✔
3128
        // Protocol state is now WaitForUnbindErr
3129
    }
80✔
3130

3131
    void send_log_message(util::Logger::Level level, const std::string&& message)
3132
    {
4,266✔
3133
        if (m_connection.get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
4,266✔
3134
            return logger.log(level, message.c_str());
×
3135
        }
×
3136

3137
        m_connection.send_log_message(level, std::move(message), m_session_ident);
4,266✔
3138
    }
4,266✔
3139

3140
    // Idempotent
3141
    void detach_from_server_file() noexcept
3142
    {
8,066✔
3143
        if (!m_server_file)
8,066✔
3144
            return;
2,654✔
3145
        ServerFile& file = *m_server_file;
5,412✔
3146
        if (ident_message_received()) {
5,412✔
3147
            file.remove_identified_session(m_client_file_ident);
4,266✔
3148
        }
4,266✔
3149
        else {
1,146✔
3150
            file.remove_unidentified_session(this);
1,146✔
3151
        }
1,146✔
3152
        if (m_file_ident_request != 0)
5,412✔
3153
            file.cancel_file_ident_request(m_file_ident_request);
54✔
3154
        m_server_file.reset();
5,412✔
3155
    }
5,412✔
3156

3157
    friend class SessionQueue;
3158
};
3159

3160

3161
// ============================ SessionQueue implementation ============================
3162

3163
void SessionQueue::push_back(Session* sess) noexcept
3164
{
108,462✔
3165
    REALM_ASSERT(!sess->m_next);
108,462✔
3166
    if (m_back) {
108,462✔
3167
        sess->m_next = m_back->m_next;
40,682✔
3168
        m_back->m_next = sess;
40,682✔
3169
    }
40,682✔
3170
    else {
67,780✔
3171
        sess->m_next = sess;
67,780✔
3172
    }
67,780✔
3173
    m_back = sess;
108,462✔
3174
}
108,462✔
3175

3176

3177
Session* SessionQueue::pop_front() noexcept
3178
{
156,654✔
3179
    Session* sess = nullptr;
156,654✔
3180
    if (m_back) {
156,654✔
3181
        sess = m_back->m_next;
108,244✔
3182
        if (sess != m_back) {
108,244✔
3183
            m_back->m_next = sess->m_next;
40,600✔
3184
        }
40,600✔
3185
        else {
67,644✔
3186
            m_back = nullptr;
67,644✔
3187
        }
67,644✔
3188
        sess->m_next = nullptr;
108,244✔
3189
    }
108,244✔
3190
    return sess;
156,654✔
3191
}
156,654✔
3192

3193

3194
void SessionQueue::clear() noexcept
3195
{
3,360✔
3196
    if (m_back) {
3,360✔
3197
        Session* sess = m_back;
134✔
3198
        for (;;) {
210✔
3199
            Session* next = sess->m_next;
210✔
3200
            sess->m_next = nullptr;
210✔
3201
            if (next == m_back)
210✔
3202
                break;
134✔
3203
            sess = next;
76✔
3204
        }
76✔
3205
        m_back = nullptr;
134✔
3206
    }
134✔
3207
}
3,360✔
3208

3209

3210
// ============================ ServerFile implementation ============================
3211

3212
ServerFile::ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path,
3213
                       std::string real_path, bool disable_sync_to_disk)
3214
    : logger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.logger_ptr}               // Throws
446✔
3215
    , wlogger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.get_worker().logger_ptr} // Throws
446✔
3216
    , m_server{server}
446✔
3217
    , m_file{cache, real_path, virt_path, false, disable_sync_to_disk} // Throws
446✔
3218
    , m_worker_file{server.get_worker().get_file_access_cache(), real_path, virt_path, true, disable_sync_to_disk}
446✔
3219
{
1,136✔
3220
}
1,136✔
3221

3222

3223
ServerFile::~ServerFile() noexcept
3224
{
1,136✔
3225
    REALM_ASSERT(m_unidentified_sessions.empty());
1,136✔
3226
    REALM_ASSERT(m_identified_sessions.empty());
1,136✔
3227
    REALM_ASSERT(m_file_ident_request == 0);
1,136✔
3228
}
1,136✔
3229

3230

3231
void ServerFile::initialize()
3232
{
1,136✔
3233
    const ServerHistory& history = access().history; // Throws
1,136✔
3234
    file_ident_type partial_file_ident = 0;
1,136✔
3235
    version_type partial_progress_reference_version = 0;
1,136✔
3236
    bool has_upstream_sync_status;
1,136✔
3237
    history.get_status(m_version_info, has_upstream_sync_status, partial_file_ident,
1,136✔
3238
                       partial_progress_reference_version); // Throws
1,136✔
3239
    REALM_ASSERT(!has_upstream_sync_status);
1,136✔
3240
    REALM_ASSERT(partial_file_ident == 0);
1,136✔
3241
}
1,136✔
3242

3243

3244
void ServerFile::activate() {}
1,136✔
3245

3246

3247
// This function must be called only after a completed invocation of
3248
// initialize(). Both functinos must only ever be called by the network event
3249
// loop thread.
3250
void ServerFile::register_client_access(file_ident_type) {}
91,318✔
3251

3252

3253
auto ServerFile::request_file_ident(FileIdentReceiver& receiver, file_ident_type proxy_file, ClientType client_type)
3254
    -> file_ident_request_type
3255
{
1,396✔
3256
    auto request = ++m_last_file_ident_request;
1,396✔
3257
    m_file_ident_requests[request] = {&receiver, proxy_file, client_type}; // Throws
1,396✔
3258

3259
    on_work_added(); // Throws
1,396✔
3260
    return request;
1,396✔
3261
}
1,396✔
3262

3263

3264
void ServerFile::cancel_file_ident_request(file_ident_request_type request) noexcept
3265
{
54✔
3266
    auto i = m_file_ident_requests.find(request);
54✔
3267
    REALM_ASSERT(i != m_file_ident_requests.end());
54✔
3268
    FileIdentRequestInfo& info = i->second;
54✔
3269
    REALM_ASSERT(info.receiver);
54✔
3270
    info.receiver = nullptr;
54✔
3271
}
54✔
3272

3273

3274
void ServerFile::add_unidentified_session(Session* sess)
3275
{
5,412✔
3276
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 0);
5,412✔
3277
    m_unidentified_sessions.insert(sess); // Throws
5,412✔
3278
}
5,412✔
3279

3280

3281
void ServerFile::identify_session(Session* sess, file_ident_type client_file_ident)
3282
{
4,262✔
3283
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);
4,262✔
3284
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 0);
4,262✔
3285

3286
    m_identified_sessions[client_file_ident] = sess; // Throws
4,262✔
3287
    m_unidentified_sessions.erase(sess);
4,262✔
3288
}
4,262✔
3289

3290

3291
void ServerFile::remove_unidentified_session(Session* sess) noexcept
3292
{
1,144✔
3293
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);
1,144✔
3294
    m_unidentified_sessions.erase(sess);
1,144✔
3295
}
1,144✔
3296

3297

3298
void ServerFile::remove_identified_session(file_ident_type client_file_ident) noexcept
3299
{
4,266✔
3300
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 1);
4,266✔
3301
    m_identified_sessions.erase(client_file_ident);
4,266✔
3302
}
4,266✔
3303

3304

3305
Session* ServerFile::get_identified_session(file_ident_type client_file_ident) noexcept
3306
{
4,268✔
3307
    auto i = m_identified_sessions.find(client_file_ident);
4,268✔
3308
    if (i == m_identified_sessions.end())
4,268✔
3309
        return nullptr;
4,266✔
3310
    return i->second;
2✔
3311
}
4,268✔
3312

3313
bool ServerFile::can_add_changesets_from_downstream() const noexcept
3314
{
46,612✔
3315
    return (m_blocked_changesets_from_downstream_byte_size < m_server.get_max_upload_backlog());
46,612✔
3316
}
46,612✔
3317

3318

3319
void ServerFile::add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
3320
                                                version_type locked_server_version, const UploadChangeset* changesets,
3321
                                                std::size_t num_changesets)
3322
{
46,334✔
3323
    register_client_access(client_file_ident); // Throws
46,334✔
3324

3325
    bool dirty = false;
46,334✔
3326

3327
    IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident]; // Throws
46,334✔
3328
    std::size_t num_bytes = 0;
46,334✔
3329
    for (std::size_t i = 0; i < num_changesets; ++i) {
80,746✔
3330
        const UploadChangeset& uc = changesets[i];
34,412✔
3331
        auto& changesets = list.changesets;
34,412✔
3332
        changesets.emplace_back(client_file_ident, uc.origin_timestamp, uc.origin_file_ident, uc.upload_cursor,
34,412✔
3333
                                uc.changeset); // Throws
34,412✔
3334
        num_bytes += uc.changeset.size();
34,412✔
3335
        dirty = true;
34,412✔
3336
    }
34,412✔
3337

3338
    REALM_ASSERT(upload_progress.client_version >= list.upload_progress.client_version);
46,334✔
3339
    REALM_ASSERT(are_mutually_consistent(upload_progress, list.upload_progress));
46,334✔
3340
    if (upload_progress.client_version > list.upload_progress.client_version) {
46,334✔
3341
        list.upload_progress = upload_progress;
46,332✔
3342
        dirty = true;
46,332✔
3343
    }
46,332✔
3344

3345
    REALM_ASSERT(locked_server_version >= list.locked_server_version);
46,334✔
3346
    if (locked_server_version > list.locked_server_version) {
46,334✔
3347
        list.locked_server_version = locked_server_version;
39,498✔
3348
        dirty = true;
39,498✔
3349
    }
39,498✔
3350

3351
    if (REALM_LIKELY(dirty)) {
46,336✔
3352
        if (num_changesets > 0) {
46,334✔
3353
            on_changesets_from_downstream_added(num_changesets, num_bytes); // Throws
24,850✔
3354
        }
24,850✔
3355
        else {
21,484✔
3356
            on_work_added(); // Throws
21,484✔
3357
        }
21,484✔
3358
    }
46,334✔
3359
}
46,334✔
3360

3361

3362
BootstrapError ServerFile::bootstrap_client_session(SaltedFileIdent client_file_ident,
3363
                                                    DownloadCursor download_progress, SaltedVersion server_version,
3364
                                                    ClientType client_type, UploadCursor& upload_progress,
3365
                                                    version_type& locked_server_version, Logger& logger)
3366
{
4,298✔
3367
    // The Realm file may contain a later snapshot than the one reflected by
3368
    // `m_sync_version`, but if so, the client cannot "legally" know about it.
3369
    if (server_version.version > m_version_info.sync_version.version)
4,298✔
3370
        return BootstrapError::bad_server_version;
20✔
3371

3372
    const ServerHistory& hist = access().history; // Throws
4,278✔
3373
    BootstrapError error = hist.bootstrap_client_session(client_file_ident, download_progress, server_version,
4,278✔
3374
                                                         client_type, upload_progress, locked_server_version,
4,278✔
3375
                                                         logger); // Throws
4,278✔
3376

3377
    // FIXME: Rather than taking previously buffered changesets from the same
3378
    // client file into account when determining the upload progress, and then
3379
    // allowing for an error during the integration of those changesets to be
3380
    // reported to, and terminate the new session, consider to instead postpone
3381
    // the bootstrapping of the new session until all previously buffered
3382
    // changesets from same client file have been fully processed.
3383

3384
    if (error == BootstrapError::no_error) {
4,278✔
3385
        register_client_access(client_file_ident.ident); // Throws
4,266✔
3386

3387
        // If upload, or releaseing of server versions progressed further during
3388
        // previous sessions than the persisted points, take that into account
3389
        auto i = m_work.changesets_from_downstream.find(client_file_ident.ident);
4,266✔
3390
        if (i != m_work.changesets_from_downstream.end()) {
4,266✔
3391
            const IntegratableChangesetList& list = i->second;
1,340✔
3392
            REALM_ASSERT(list.upload_progress.client_version >= upload_progress.client_version);
1,340✔
3393
            upload_progress = list.upload_progress;
1,340✔
3394
            REALM_ASSERT(list.locked_server_version >= locked_server_version);
1,340✔
3395
            locked_server_version = list.locked_server_version;
1,340✔
3396
        }
1,340✔
3397
        auto j = m_changesets_from_downstream.find(client_file_ident.ident);
4,266✔
3398
        if (j != m_changesets_from_downstream.end()) {
4,266✔
3399
            const IntegratableChangesetList& list = j->second;
74✔
3400
            REALM_ASSERT(list.upload_progress.client_version >= upload_progress.client_version);
74✔
3401
            upload_progress = list.upload_progress;
74✔
3402
            REALM_ASSERT(list.locked_server_version >= locked_server_version);
74✔
3403
            locked_server_version = list.locked_server_version;
74✔
3404
        }
74✔
3405
    }
4,266✔
3406

3407
    return error;
4,278✔
3408
}
4,298✔
3409

3410
// NOTE: This function is executed by the worker thread
3411
void ServerFile::worker_process_work_unit(WorkerState& state)
3412
{
37,942✔
3413
    SteadyTimePoint start_time = steady_clock_now();
37,942✔
3414
    milliseconds_type parallel_time = 0;
37,942✔
3415

3416
    Work& work = m_work;
37,942✔
3417
    wlogger.debug("Work unit execution started"); // Throws
37,942✔
3418

3419
    if (work.has_primary_work) {
37,942✔
3420
        if (REALM_UNLIKELY(!m_work.file_ident_alloc_slots.empty()))
37,938✔
3421
            worker_allocate_file_identifiers(); // Throws
1,330✔
3422

3423
        if (!m_work.changesets_from_downstream.empty())
37,938✔
3424
            worker_integrate_changes_from_downstream(state); // Throws
36,622✔
3425
    }
37,938✔
3426

3427
    wlogger.debug("Work unit execution completed"); // Throws
37,942✔
3428

3429
    milliseconds_type time = steady_duration(start_time);
37,942✔
3430
    milliseconds_type seq_time = time - parallel_time;
37,942✔
3431
    m_server.m_seq_time.fetch_add(seq_time, std::memory_order_relaxed);
37,942✔
3432
    m_server.m_par_time.fetch_add(parallel_time, std::memory_order_relaxed);
37,942✔
3433

3434
    // Pass control back to the network event loop thread
3435
    network::Service& service = m_server.get_service();
37,942✔
3436
    service.post([this](Status) {
37,942✔
3437
        // FIXME: The safety of capturing `this` here, relies on the fact
3438
        // that ServerFile objects currently are not destroyed until the
3439
        // server object is destroyed.
3440
        group_postprocess_stage_1(); // Throws
37,568✔
3441
        // Suicide may have happened at this point
3442
    }); // Throws
37,568✔
3443
}
37,942✔
3444

3445

3446
void ServerFile::on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes)
3447
{
24,848✔
3448
    m_num_changesets_from_downstream += num_changesets;
24,848✔
3449

3450
    if (num_bytes > 0) {
24,848✔
3451
        m_blocked_changesets_from_downstream_byte_size += num_bytes;
24,848✔
3452
        get_server().inc_byte_size_for_pending_downstream_changesets(num_bytes); // Throws
24,848✔
3453
    }
24,848✔
3454

3455
    on_work_added(); // Throws
24,848✔
3456
}
24,848✔
3457

3458

3459
void ServerFile::on_work_added()
3460
{
47,734✔
3461
    if (m_has_blocked_work)
47,734✔
3462
        return;
9,680✔
3463
    m_has_blocked_work = true;
38,054✔
3464
    // Reference file
3465
    if (m_has_work_in_progress)
38,054✔
3466
        return;
14,212✔
3467
    group_unblock_work(); // Throws
23,842✔
3468
}
23,842✔
3469

3470

3471
void ServerFile::group_unblock_work()
3472
{
37,978✔
3473
    REALM_ASSERT(!m_has_work_in_progress);
37,978✔
3474
    if (REALM_LIKELY(!m_server.is_sync_stopped())) {
37,978✔
3475
        unblock_work(); // Throws
37,976✔
3476
        const Work& work = m_work;
37,976✔
3477
        if (REALM_LIKELY(work.has_primary_work)) {
37,976✔
3478
            logger.trace("Work unit unblocked"); // Throws
37,970✔
3479
            m_has_work_in_progress = true;
37,970✔
3480
            Worker& worker = m_server.get_worker();
37,970✔
3481
            worker.enqueue(this); // Throws
37,970✔
3482
        }
37,970✔
3483
    }
37,976✔
3484
}
37,978✔
3485

3486

3487
void ServerFile::unblock_work()
3488
{
37,976✔
3489
    REALM_ASSERT(m_has_blocked_work);
37,976✔
3490

3491
    m_work.reset();
37,976✔
3492

3493
    // Discard requests for file identifiers whose receiver is no longer
3494
    // waiting.
3495
    {
37,976✔
3496
        auto i = m_file_ident_requests.begin();
37,976✔
3497
        auto end = m_file_ident_requests.end();
37,976✔
3498
        while (i != end) {
39,372✔
3499
            auto j = i++;
1,396✔
3500
            const FileIdentRequestInfo& info = j->second;
1,396✔
3501
            if (!info.receiver)
1,396✔
3502
                m_file_ident_requests.erase(j);
2✔
3503
        }
1,396✔
3504
    }
37,976✔
3505
    std::size_t n = m_file_ident_requests.size();
37,976✔
3506
    if (n > 0) {
37,976✔
3507
        m_work.file_ident_alloc_slots.resize(n); // Throws
1,342✔
3508
        std::size_t i = 0;
1,342✔
3509
        for (const auto& pair : m_file_ident_requests) {
1,392✔
3510
            const FileIdentRequestInfo& info = pair.second;
1,392✔
3511
            FileIdentAllocSlot& slot = m_work.file_ident_alloc_slots[i];
1,392✔
3512
            slot.proxy_file = info.proxy_file;
1,392✔
3513
            slot.client_type = info.client_type;
1,392✔
3514
            ++i;
1,392✔
3515
        }
1,392✔
3516
        m_work.has_primary_work = true;
1,342✔
3517
    }
1,342✔
3518

3519
    // FIXME: `ServerFile::m_changesets_from_downstream` and
3520
    // `Work::changesets_from_downstream` should be renamed to something else,
3521
    // as it may contain kinds of data other than changesets.
3522

3523
    using std::swap;
37,976✔
3524
    swap(m_changesets_from_downstream, m_work.changesets_from_downstream);
37,976✔
3525
    m_work.have_changesets_from_downstream = (m_num_changesets_from_downstream > 0);
37,976✔
3526
    bool has_changesets = !m_work.changesets_from_downstream.empty();
37,976✔
3527
    if (has_changesets) {
37,976✔
3528
        m_work.has_primary_work = true;
36,640✔
3529
    }
36,640✔
3530

3531
    // Keep track of the size of pending changesets
3532
    REALM_ASSERT(m_unblocked_changesets_from_downstream_byte_size == 0);
37,976✔
3533
    m_unblocked_changesets_from_downstream_byte_size = m_blocked_changesets_from_downstream_byte_size;
37,976✔
3534
    m_blocked_changesets_from_downstream_byte_size = 0;
37,976✔
3535

3536
    m_num_changesets_from_downstream = 0;
37,976✔
3537
    m_has_blocked_work = false;
37,976✔
3538
}
37,976✔
3539

3540

3541
void ServerFile::resume_download() noexcept
3542
{
23,028✔
3543
    for (const auto& entry : m_identified_sessions) {
36,568✔
3544
        Session& sess = *entry.second;
36,568✔
3545
        sess.ensure_enlisted_to_send();
36,568✔
3546
    }
36,568✔
3547
}
23,028✔
3548

3549

3550
void ServerFile::recognize_external_change()
3551
{
4,800✔
3552
    VersionInfo prev_version_info = m_version_info;
4,800✔
3553
    const ServerHistory& history = access().history;       // Throws
4,800✔
3554
    bool has_upstream_status;                              // Dummy
4,800✔
3555
    sync::file_ident_type partial_file_ident;              // Dummy
4,800✔
3556
    sync::version_type partial_progress_reference_version; // Dummy
4,800✔
3557
    history.get_status(m_version_info, has_upstream_status, partial_file_ident,
4,800✔
3558
                       partial_progress_reference_version); // Throws
4,800✔
3559

3560
    REALM_ASSERT(m_version_info.realm_version >= prev_version_info.realm_version);
4,800✔
3561
    REALM_ASSERT(m_version_info.sync_version.version >= prev_version_info.sync_version.version);
4,800✔
3562
    if (m_version_info.sync_version.version > prev_version_info.sync_version.version) {
4,800✔
3563
        REALM_ASSERT(m_version_info.realm_version > prev_version_info.realm_version);
4,800✔
3564
        resume_download();
4,800✔
3565
    }
4,800✔
3566
}
4,800✔
3567

3568

3569
// NOTE: This function is executed by the worker thread
3570
void ServerFile::worker_allocate_file_identifiers()
3571
{
1,330✔
3572
    Work& work = m_work;
1,330✔
3573
    REALM_ASSERT(!work.file_ident_alloc_slots.empty());
1,330✔
3574
    ServerHistory& hist = worker_access().history;                                      // Throws
1,330✔
3575
    hist.allocate_file_identifiers(m_work.file_ident_alloc_slots, m_work.version_info); // Throws
1,330✔
3576
    m_work.produced_new_realm_version = true;
1,330✔
3577
}
1,330✔
3578

3579

3580
// Returns true when, and only when this function produces a new sync version
3581
// (adds a new entry to the sync history).
3582
//
3583
// NOTE: This function is executed by the worker thread
3584
bool ServerFile::worker_integrate_changes_from_downstream(WorkerState& state)
3585
{
36,622✔
3586
    REALM_ASSERT(!m_work.changesets_from_downstream.empty());
36,622✔
3587

3588
    std::unique_ptr<ServerHistory> hist_ptr;
36,622✔
3589
    DBRef sg_ptr;
36,622✔
3590
    ServerHistory& hist = get_client_file_history(state, hist_ptr, sg_ptr);
36,622✔
3591
    bool backup_whole_realm = false;
36,622✔
3592
    bool produced_new_realm_version = hist.integrate_client_changesets(
36,622✔
3593
        m_work.changesets_from_downstream, m_work.version_info, backup_whole_realm, m_work.integration_result,
36,622✔
3594
        wlogger); // Throws
36,622✔
3595
    bool produced_new_sync_version = !m_work.integration_result.integrated_changesets.empty();
36,622✔
3596
    REALM_ASSERT(!produced_new_sync_version || produced_new_realm_version);
36,622✔
3597
    if (produced_new_realm_version) {
36,622✔
3598
        m_work.produced_new_realm_version = true;
36,604✔
3599
        if (produced_new_sync_version) {
36,604✔
3600
            m_work.produced_new_sync_version = true;
18,242✔
3601
        }
18,242✔
3602
    }
36,604✔
3603
    return produced_new_sync_version;
36,622✔
3604
}
36,622✔
3605

3606
ServerHistory& ServerFile::get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
3607
                                                   DBRef& sg_ptr)
3608
{
36,620✔
3609
    if (state.use_file_cache)
36,620✔
3610
        return worker_access().history; // Throws
36,618✔
3611
    const std::string& path = m_worker_file.realm_path;
2✔
3612
    hist_ptr = m_server.make_history_for_path();                   // Throws
2✔
3613
    DBOptions options = m_worker_file.make_shared_group_options(); // Throws
2✔
3614
    sg_ptr = DB::create(*hist_ptr, path, options);                 // Throws
2✔
3615
    sg_ptr->claim_sync_agent();                                    // Throws
2✔
3616
    return *hist_ptr;                                              // Throws
2✔
3617
}
36,620✔
3618

3619

3620
// When worker thread finishes work unit.
3621
void ServerFile::group_postprocess_stage_1()
3622
{
37,568✔
3623
    REALM_ASSERT(m_has_work_in_progress);
37,568✔
3624

3625
    group_finalize_work_stage_1(); // Throws
37,568✔
3626
    group_finalize_work_stage_2(); // Throws
37,568✔
3627
    group_postprocess_stage_2();   // Throws
37,568✔
3628
}
37,568✔
3629

3630

3631
void ServerFile::group_postprocess_stage_2()
3632
{
37,570✔
3633
    REALM_ASSERT(m_has_work_in_progress);
37,570✔
3634
    group_postprocess_stage_3(); // Throws
37,570✔
3635
    // Suicide may have happened at this point
3636
}
37,570✔
3637

3638

3639
// When all files, including the reference file, have been backed up.
3640
void ServerFile::group_postprocess_stage_3()
3641
{
37,570✔
3642
    REALM_ASSERT(m_has_work_in_progress);
37,570✔
3643
    m_has_work_in_progress = false;
37,570✔
3644

3645
    logger.trace("Work unit postprocessing complete"); // Throws
37,570✔
3646
    if (m_has_blocked_work)
37,570✔
3647
        group_unblock_work(); // Throws
14,140✔
3648
}
37,570✔
3649

3650

3651
void ServerFile::finalize_work_stage_1()
3652
{
37,568✔
3653
    if (m_unblocked_changesets_from_downstream_byte_size > 0) {
37,568✔
3654
        // Report the byte size of completed downstream changesets.
3655
        std::size_t byte_size = m_unblocked_changesets_from_downstream_byte_size;
18,250✔
3656
        get_server().dec_byte_size_for_pending_downstream_changesets(byte_size); // Throws
18,250✔
3657
        m_unblocked_changesets_from_downstream_byte_size = 0;
18,250✔
3658
    }
18,250✔
3659

3660
    // Deal with errors (bad changesets) pertaining to downstream clients
3661
    std::size_t num_changesets_removed = 0;
37,568✔
3662
    std::size_t num_bytes_removed = 0;
37,568✔
3663
    for (const auto& entry : m_work.integration_result.excluded_client_files) {
37,568✔
3664
        file_ident_type client_file_ident = entry.first;
20✔
3665
        ExtendedIntegrationError error = entry.second;
20✔
3666
        ProtocolError error_2 = ProtocolError::other_session_error;
20✔
3667
        switch (error) {
20✔
3668
            case ExtendedIntegrationError::client_file_expired:
✔
3669
                logger.debug("Changeset integration failed: Client file entry "
×
3670
                             "expired during session"); // Throws
×
3671
                error_2 = ProtocolError::client_file_expired;
×
3672
                break;
×
3673
            case ExtendedIntegrationError::bad_origin_file_ident:
✔
3674
                error_2 = ProtocolError::bad_origin_file_ident;
×
3675
                break;
×
3676
            case ExtendedIntegrationError::bad_changeset:
20✔
3677
                error_2 = ProtocolError::bad_changeset;
20✔
3678
                break;
20✔
3679
        }
20✔
3680
        auto i = m_identified_sessions.find(client_file_ident);
20✔
3681
        if (i != m_identified_sessions.end()) {
20✔
3682
            Session& sess = *i->second;
20✔
3683
            SyncConnection& conn = sess.get_connection();
20✔
3684
            conn.protocol_error(error_2, &sess); // Throws
20✔
3685
        }
20✔
3686
        const IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident];
20✔
3687
        std::size_t num_changesets = list.changesets.size();
20✔
3688
        std::size_t num_bytes = 0;
20✔
3689
        for (const IntegratableChangeset& ic : list.changesets)
20✔
3690
            num_bytes += ic.changeset.size();
×
3691
        logger.info("Excluded %1 changesets of combined byte size %2 for client file %3", num_changesets, num_bytes,
20✔
3692
                    client_file_ident); // Throws
20✔
3693
        num_changesets_removed += num_changesets;
20✔
3694
        num_bytes_removed += num_bytes;
20✔
3695
        m_changesets_from_downstream.erase(client_file_ident);
20✔
3696
    }
20✔
3697

3698
    REALM_ASSERT(num_changesets_removed <= m_num_changesets_from_downstream);
37,568✔
3699
    REALM_ASSERT(num_bytes_removed <= m_blocked_changesets_from_downstream_byte_size);
37,568✔
3700

3701
    if (num_changesets_removed == 0)
37,568✔
3702
        return;
37,568✔
3703

3704
    m_num_changesets_from_downstream -= num_changesets_removed;
×
3705

3706
    // The byte size of the blocked changesets must be decremented.
3707
    if (num_bytes_removed > 0) {
×
3708
        m_blocked_changesets_from_downstream_byte_size -= num_bytes_removed;
×
3709
        get_server().dec_byte_size_for_pending_downstream_changesets(num_bytes_removed); // Throws
×
3710
    }
×
3711
}
×
3712

3713

3714
void ServerFile::finalize_work_stage_2()
3715
{
37,568✔
3716
    // Expose new snapshot to remote peers
3717
    REALM_ASSERT(m_work.produced_new_realm_version || m_work.version_info.realm_version == 0);
37,568✔
3718
    if (m_work.version_info.realm_version > m_version_info.realm_version) {
37,568✔
3719
        REALM_ASSERT(m_work.version_info.sync_version.version >= m_version_info.sync_version.version);
37,546✔
3720
        m_version_info = m_work.version_info;
37,546✔
3721
    }
37,546✔
3722

3723
    bool resume_download_and_upload = m_work.produced_new_sync_version;
37,568✔
3724

3725
    // Deliver allocated file identifiers to requesters
3726
    REALM_ASSERT(m_file_ident_requests.size() >= m_work.file_ident_alloc_slots.size());
37,568✔
3727
    auto begin = m_file_ident_requests.begin();
37,568✔
3728
    auto i = begin;
37,568✔
3729
    for (const FileIdentAllocSlot& slot : m_work.file_ident_alloc_slots) {
37,568✔
3730
        FileIdentRequestInfo& info = i->second;
1,368✔
3731
        REALM_ASSERT(info.proxy_file == slot.proxy_file);
1,368✔
3732
        REALM_ASSERT(info.client_type == slot.client_type);
1,368✔
3733
        if (FileIdentReceiver* receiver = info.receiver) {
1,368✔
3734
            info.receiver = nullptr;
1,342✔
3735
            receiver->receive_file_ident(slot.file_ident); // Throws
1,342✔
3736
        }
1,342✔
3737
        ++i;
1,368✔
3738
    }
1,368✔
3739
    m_file_ident_requests.erase(begin, i);
37,568✔
3740

3741
    // Resume download to downstream clients
3742
    if (resume_download_and_upload) {
37,568✔
3743
        resume_download();
18,230✔
3744
    }
18,230✔
3745
}
37,568✔
3746

3747
// ============================ Worker implementation ============================
3748

3749
Worker::Worker(ServerImpl& server)
3750
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, "Worker: ", server.logger_ptr)}
548✔
3751
    // Throws
3752
    , logger(*logger_ptr)
548✔
3753
    , m_server{server}
548✔
3754
    , m_file_access_cache{server.get_config().max_open_files, logger, *this, server.get_config().encryption_key}
548✔
3755
{
1,200✔
3756
    util::seed_prng_nondeterministically(m_random); // Throws
1,200✔
3757
}
1,200✔
3758

3759

3760
void Worker::enqueue(ServerFile* file)
3761
{
37,980✔
3762
    util::LockGuard lock{m_mutex};
37,980✔
3763
    m_queue.push_back(file); // Throws
37,980✔
3764
    m_cond.notify_all();
37,980✔
3765
}
37,980✔
3766

3767

3768
std::mt19937_64& Worker::server_history_get_random() noexcept
3769
{
2,418✔
3770
    return m_random;
2,418✔
3771
}
2,418✔
3772

3773

3774
void Worker::run()
3775
{
1,144✔
3776
    for (;;) {
39,084✔
3777
        ServerFile* file = nullptr;
39,084✔
3778
        {
39,084✔
3779
            util::LockGuard lock{m_mutex};
39,084✔
3780
            for (;;) {
77,548✔
3781
                if (REALM_UNLIKELY(m_stop))
77,548✔
3782
                    return;
1,144✔
3783
                if (!m_queue.empty()) {
76,404✔
3784
                    file = m_queue.front();
37,942✔
3785
                    m_queue.pop_front();
37,942✔
3786
                    break;
37,942✔
3787
                }
37,942✔
3788
                m_cond.wait(lock);
38,462✔
3789
            }
38,462✔
3790
        }
39,084✔
3791
        file->worker_process_work_unit(m_state); // Throws
37,940✔
3792
    }
37,940✔
3793
}
1,144✔
3794

3795

3796
void Worker::stop() noexcept
3797
{
1,144✔
3798
    util::LockGuard lock{m_mutex};
1,144✔
3799
    m_stop = true;
1,144✔
3800
    m_cond.notify_all();
1,144✔
3801
}
1,144✔
3802

3803

3804
// ============================ ServerImpl implementation ============================
3805

3806
ServerImpl::ServerImpl(const std::string& root_dir, util::Optional<sync::PKey> pkey, Server::Config config)
3807
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::server, std::move(config.logger))}
548✔
3808
    , logger{*logger_ptr}
548✔
3809
    , m_config{std::move(config)}
548✔
3810
    , m_max_upload_backlog{determine_max_upload_backlog(config)}
548✔
3811
    , m_root_dir{root_dir} // Throws
548✔
3812
    , m_access_control{std::move(pkey)}
548✔
3813
    , m_protocol_version_range{determine_protocol_version_range(config)}                 // Throws
548✔
3814
    , m_file_access_cache{m_config.max_open_files, logger, *this, config.encryption_key} // Throws
548✔
3815
    , m_worker{*this}                                                                    // Throws
548✔
3816
    , m_acceptor{get_service()}
548✔
3817
    , m_server_protocol{}       // Throws
548✔
3818
    , m_compress_memory_arena{} // Throws
548✔
3819
{
1,200✔
3820
    if (m_config.ssl) {
1,200✔
3821
        m_ssl_context = std::make_unique<network::ssl::Context>();                // Throws
24✔
3822
        m_ssl_context->use_certificate_chain_file(m_config.ssl_certificate_path); // Throws
24✔
3823
        m_ssl_context->use_private_key_file(m_config.ssl_certificate_key_path);   // Throws
24✔
3824
    }
24✔
3825
}
1,200✔
3826

3827

3828
ServerImpl::~ServerImpl() noexcept
3829
{
1,200✔
3830
    bool server_destroyed_while_still_running = m_running;
1,200✔
3831
    REALM_ASSERT_RELEASE(!server_destroyed_while_still_running);
1,200✔
3832
}
1,200✔
3833

3834

3835
void ServerImpl::start()
3836
{
1,200✔
3837
    logger.info("Realm sync server started (%1)", REALM_VER_CHUNK); // Throws
1,200✔
3838
    logger.info("Supported protocol versions: %1-%2 (%3-%4 configured)",
1,200✔
3839
                ServerImplBase::get_oldest_supported_protocol_version(), get_current_protocol_version(),
1,200✔
3840
                m_protocol_version_range.first,
1,200✔
3841
                m_protocol_version_range.second); // Throws
1,200✔
3842
    logger.info("Platform: %1", util::get_platform_info());
1,200✔
3843
    bool is_debug_build = false;
1,200✔
3844
#if REALM_DEBUG
1,200✔
3845
    is_debug_build = true;
1,200✔
3846
#endif
1,200✔
3847
    {
1,200✔
3848
        const char* lead_text = "Build mode";
1,200✔
3849
        if (is_debug_build) {
1,200✔
3850
            logger.info("%1: Debug", lead_text); // Throws
1,200✔
3851
        }
1,200✔
3852
        else {
×
3853
            logger.info("%1: Release", lead_text); // Throws
×
3854
        }
×
3855
    }
1,200✔
3856
    if (is_debug_build) {
1,200✔
3857
        logger.warn("Build mode is Debug! CAN SEVERELY IMPACT PERFORMANCE - "
1,200✔
3858
                    "NOT RECOMMENDED FOR PRODUCTION"); // Throws
1,200✔
3859
    }
1,200✔
3860
    logger.info("Directory holding persistent state: %1", m_root_dir);        // Throws
1,200✔
3861
    logger.info("Maximum number of open files: %1", m_config.max_open_files); // Throws
1,200✔
3862
    {
1,200✔
3863
        const char* lead_text = "Encryption";
1,200✔
3864
        if (m_config.encryption_key) {
1,200✔
3865
            logger.info("%1: Yes", lead_text); // Throws
4✔
3866
        }
4✔
3867
        else {
1,196✔
3868
            logger.info("%1: No", lead_text); // Throws
1,196✔
3869
        }
1,196✔
3870
    }
1,200✔
3871
    logger.info("Log level: %1", logger.get_level_threshold()); // Throws
1,200✔
3872
    {
1,200✔
3873
        const char* lead_text = "Disable sync to disk";
1,200✔
3874
        if (m_config.disable_sync_to_disk) {
1,200✔
3875
            logger.info("%1: All files", lead_text); // Throws
496✔
3876
        }
496✔
3877
        else {
704✔
3878
            logger.info("%1: No", lead_text); // Throws
704✔
3879
        }
704✔
3880
    }
1,200✔
3881
    if (m_config.disable_sync_to_disk) {
1,200✔
3882
        logger.warn("Testing/debugging feature 'disable sync to disk' enabled - "
496✔
3883
                    "never do this in production!"); // Throws
496✔
3884
    }
496✔
3885
    logger.info("Download compaction: %1",
1,200✔
3886
                (m_config.disable_download_compaction ? "No" : "Yes")); // Throws
1,200✔
3887
    logger.info("Download bootstrap caching: %1",
1,200✔
3888
                (m_config.enable_download_bootstrap_cache ? "Yes" : "No"));                // Throws
1,200✔
3889
    logger.info("Max download size: %1 bytes", m_config.max_download_size);                // Throws
1,200✔
3890
    logger.info("Max upload backlog: %1 bytes", m_max_upload_backlog);                     // Throws
1,200✔
3891
    logger.info("HTTP request timeout: %1 ms", m_config.http_request_timeout);             // Throws
1,200✔
3892
    logger.info("HTTP response timeout: %1 ms", m_config.http_response_timeout);           // Throws
1,200✔
3893
    logger.info("Connection reaper timeout: %1 ms", m_config.connection_reaper_timeout);   // Throws
1,200✔
3894
    logger.info("Connection reaper interval: %1 ms", m_config.connection_reaper_interval); // Throws
1,200✔
3895
    logger.info("Connection soft close timeout: %1 ms", m_config.soft_close_timeout);      // Throws
1,200✔
3896
    logger.debug("Authorization header name: %1", m_config.authorization_header_name);     // Throws
1,200✔
3897

3898
    m_realm_names = _impl::find_realm_files(m_root_dir); // Throws
1,200✔
3899

3900
    initiate_connection_reaper_timer(m_config.connection_reaper_interval); // Throws
1,200✔
3901

3902
    listen(); // Throws
1,200✔
3903
}
1,200✔
3904

3905

3906
void ServerImpl::run()
3907
{
1,144✔
3908
    auto ta = util::make_temp_assign(m_running, true);
1,144✔
3909

3910
    {
1,144✔
3911
        auto worker_thread = util::make_thread_exec_guard(m_worker, *this); // Throws
1,144✔
3912
        std::string name;
1,144✔
3913
        if (util::Thread::get_name(name)) {
1,144✔
3914
            name += "-worker";
624✔
3915
            worker_thread.start_with_signals_blocked(name); // Throws
624✔
3916
        }
624✔
3917
        else {
520✔
3918
            worker_thread.start_with_signals_blocked(); // Throws
520✔
3919
        }
520✔
3920

3921
        m_service.run(); // Throws
1,144✔
3922

3923
        worker_thread.stop_and_rethrow(); // Throws
1,144✔
3924
    }
1,144✔
3925

3926
    logger.info("Realm sync server stopped");
1,144✔
3927
}
1,144✔
3928

3929

3930
void ServerImpl::stop() noexcept
3931
{
2,052✔
3932
    util::LockGuard lock{m_mutex};
2,052✔
3933
    if (m_stopped)
2,052✔
3934
        return;
852✔
3935
    m_stopped = true;
1,200✔
3936
    m_wait_or_service_stopped_cond.notify_all();
1,200✔
3937
    m_service.stop();
1,200✔
3938
}
1,200✔
3939

3940

3941
void ServerImpl::inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3942
{
24,846✔
3943
    m_pending_changesets_from_downstream_byte_size += byte_size;
24,846✔
3944
    logger.debug("Byte size for pending downstream changesets incremented by "
24,846✔
3945
                 "%1 to reach a total of %2",
24,846✔
3946
                 byte_size,
24,846✔
3947
                 m_pending_changesets_from_downstream_byte_size); // Throws
24,846✔
3948
}
24,846✔
3949

3950

3951
void ServerImpl::dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3952
{
18,248✔
3953
    REALM_ASSERT(byte_size <= m_pending_changesets_from_downstream_byte_size);
18,248✔
3954
    m_pending_changesets_from_downstream_byte_size -= byte_size;
18,248✔
3955
    logger.debug("Byte size for pending downstream changesets decremented by "
18,248✔
3956
                 "%1 to reach a total of %2",
18,248✔
3957
                 byte_size,
18,248✔
3958
                 m_pending_changesets_from_downstream_byte_size); // Throws
18,248✔
3959
}
18,248✔
3960

3961

3962
std::mt19937_64& ServerImpl::server_history_get_random() noexcept
3963
{
1,136✔
3964
    return get_random();
1,136✔
3965
}
1,136✔
3966

3967

3968
void ServerImpl::listen()
3969
{
1,200✔
3970
    network::Resolver resolver{get_service()};
1,200✔
3971
    network::Resolver::Query query(m_config.listen_address, m_config.listen_port,
1,200✔
3972
                                   network::Resolver::Query::passive | network::Resolver::Query::address_configured);
1,200✔
3973
    network::Endpoint::List endpoints = resolver.resolve(query); // Throws
1,200✔
3974

3975
    auto i = endpoints.begin();
1,200✔
3976
    auto end = endpoints.end();
1,200✔
3977
    for (;;) {
1,200✔
3978
        std::error_code ec;
1,200✔
3979
        m_acceptor.open(i->protocol(), ec);
1,200✔
3980
        if (!ec) {
1,200✔
3981
            using SocketBase = network::SocketBase;
1,200✔
3982
            m_acceptor.set_option(SocketBase::reuse_address(m_config.reuse_address), ec);
1,200✔
3983
            if (!ec) {
1,200✔
3984
                m_acceptor.bind(*i, ec);
1,200✔
3985
                if (!ec)
1,200✔
3986
                    break;
1,200✔
3987
            }
1,200✔
3988
            m_acceptor.close();
×
3989
        }
×
3990
        if (i + 1 == end) {
×
3991
            for (auto i2 = endpoints.begin(); i2 != i; ++i2) {
×
3992
                // FIXME: We don't have the error code for previous attempts, so
3993
                // can't print a nice message.
3994
                logger.error("Failed to bind to %1:%2", i2->address(),
×
3995
                             i2->port()); // Throws
×
3996
            }
×
3997
            logger.error("Failed to bind to %1:%2: %3", i->address(), i->port(),
×
3998
                         ec.message()); // Throws
×
3999
            throw std::runtime_error("Could not create a listening socket: All endpoints failed");
×
4000
        }
×
4001
    }
×
4002

4003
    m_acceptor.listen(m_config.listen_backlog);
1,200✔
4004

4005
    network::Endpoint local_endpoint = m_acceptor.local_endpoint();
1,200✔
4006
    const char* ssl_mode = (m_ssl_context ? "TLS" : "non-TLS");
1,200✔
4007
    logger.info("Listening on %1:%2 (max backlog is %3, %4)", local_endpoint.address(), local_endpoint.port(),
1,200✔
4008
                m_config.listen_backlog, ssl_mode); // Throws
1,200✔
4009

4010
    initiate_accept();
1,200✔
4011
}
1,200✔
4012

4013

4014
void ServerImpl::initiate_accept()
4015
{
3,328✔
4016
    auto handler = [this](std::error_code ec) {
3,328✔
4017
        if (ec != util::error::operation_aborted)
2,128✔
4018
            handle_accept(ec);
2,128✔
4019
    };
2,128✔
4020
    bool is_ssl = bool(m_ssl_context);
3,328✔
4021
    m_next_http_conn.reset(new HTTPConnection(*this, ++m_next_conn_id, is_ssl));                            // Throws
3,328✔
4022
    m_acceptor.async_accept(m_next_http_conn->get_socket(), m_next_http_conn_endpoint, std::move(handler)); // Throws
3,328✔
4023
}
3,328✔
4024

4025

4026
void ServerImpl::handle_accept(std::error_code ec)
4027
{
2,128✔
4028
    if (ec) {
2,128✔
4029
        if (ec != util::error::connection_aborted) {
×
4030
            REALM_ASSERT(ec != util::error::operation_aborted);
×
4031

4032
            // We close the reserved files to get a few extra file descriptors.
4033
            for (size_t i = 0; i < sizeof(m_reserved_files) / sizeof(m_reserved_files[0]); ++i) {
×
4034
                m_reserved_files[i].reset();
×
4035
            }
×
4036

4037
            // FIXME: There are probably errors that need to be treated
4038
            // specially, and not cause the server to "crash".
4039

4040
            if (ec == make_basic_system_error_code(EMFILE)) {
×
4041
                logger.error("Failed to accept a connection due to the file descriptor limit, "
×
4042
                             "consider increasing the limit in your system config"); // Throws
×
4043
                throw OutOfFilesError(ec);
×
4044
            }
×
4045
            else {
×
4046
                throw std::system_error(ec);
×
4047
            }
×
4048
        }
×
4049
        logger.debug("Skipping aborted connection"); // Throws
×
4050
    }
×
4051
    else {
2,128✔
4052
        HTTPConnection& conn = *m_next_http_conn;
2,128✔
4053
        if (m_config.tcp_no_delay)
2,128✔
4054
            conn.get_socket().set_option(network::SocketBase::no_delay(true));  // Throws
1,780✔
4055
        m_http_connections.emplace(conn.get_id(), std::move(m_next_http_conn)); // Throws
2,128✔
4056
        Formatter& formatter = m_misc_buffers.formatter;
2,128✔
4057
        formatter.reset();
2,128✔
4058
        formatter << "[" << m_next_http_conn_endpoint.address() << "]:" << m_next_http_conn_endpoint.port(); // Throws
2,128✔
4059
        std::string remote_endpoint = {formatter.data(), formatter.size()};                                  // Throws
2,128✔
4060
        conn.initiate(std::move(remote_endpoint));                                                           // Throws
2,128✔
4061
    }
2,128✔
4062
    initiate_accept(); // Throws
2,128✔
4063
}
2,128✔
4064

4065

4066
void ServerImpl::remove_http_connection(std::int_fast64_t conn_id) noexcept
4067
{
2,128✔
4068
    m_http_connections.erase(conn_id);
2,128✔
4069
}
2,128✔
4070

4071

4072
void ServerImpl::add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn)
4073
{
2,084✔
4074
    m_sync_connections.emplace(connection_id, std::move(sync_conn));
2,084✔
4075
}
2,084✔
4076

4077

4078
void ServerImpl::remove_sync_connection(int_fast64_t connection_id)
4079
{
1,244✔
4080
    m_sync_connections.erase(connection_id);
1,244✔
4081
}
1,244✔
4082

4083

4084
void ServerImpl::set_connection_reaper_timeout(milliseconds_type timeout)
4085
{
4✔
4086
    get_service().post([this, timeout](Status) {
4✔
4087
        m_config.connection_reaper_timeout = timeout;
4✔
4088
    });
4✔
4089
}
4✔
4090

4091

4092
void ServerImpl::close_connections()
4093
{
16✔
4094
    get_service().post([this](Status) {
16✔
4095
        do_close_connections(); // Throws
16✔
4096
    });
16✔
4097
}
16✔
4098

4099

4100
bool ServerImpl::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4101
{
72✔
4102
    return _impl::map_virt_to_real_realm_path(m_root_dir, virt_path, real_path); // Throws
72✔
4103
}
72✔
4104

4105

4106
void ServerImpl::recognize_external_change(const std::string& virt_path)
4107
{
4,800✔
4108
    std::string virt_path_2 = virt_path; // Throws (copy)
4,800✔
4109
    get_service().post([this, virt_path = std::move(virt_path_2)](Status) {
4,800✔
4110
        do_recognize_external_change(virt_path); // Throws
4,800✔
4111
    });                                          // Throws
4,800✔
4112
}
4,800✔
4113

4114

4115
void ServerImpl::stop_sync_and_wait_for_backup_completion(
4116
    util::UniqueFunction<void(bool did_backup)> completion_handler, milliseconds_type timeout)
4117
{
×
4118
    logger.info("stop_sync_and_wait_for_backup_completion() called with "
×
4119
                "timeout = %1",
×
4120
                timeout); // Throws
×
4121

4122
    get_service().post([this, completion_handler = std::move(completion_handler), timeout](Status) mutable {
×
4123
        do_stop_sync_and_wait_for_backup_completion(std::move(completion_handler),
×
4124
                                                    timeout); // Throws
×
4125
    });
×
4126
}
×
4127

4128

4129
void ServerImpl::initiate_connection_reaper_timer(milliseconds_type timeout)
4130
{
1,342✔
4131
    m_connection_reaper_timer.emplace(get_service());
1,342✔
4132
    m_connection_reaper_timer->async_wait(std::chrono::milliseconds(timeout), [this, timeout](Status status) {
1,342✔
4133
        if (status != ErrorCodes::OperationAborted) {
144✔
4134
            reap_connections();                        // Throws
144✔
4135
            initiate_connection_reaper_timer(timeout); // Throws
144✔
4136
        }
144✔
4137
    }); // Throws
144✔
4138
}
1,342✔
4139

4140

4141
void ServerImpl::reap_connections()
4142
{
144✔
4143
    logger.debug("Discarding dead connections"); // Throws
144✔
4144
    SteadyTimePoint now = steady_clock_now();
144✔
4145
    {
144✔
4146
        auto end = m_http_connections.end();
144✔
4147
        auto i = m_http_connections.begin();
144✔
4148
        while (i != end) {
146✔
4149
            HTTPConnection& conn = *i->second;
2✔
4150
            ++i;
2✔
4151
            // Suicide
4152
            conn.terminate_if_dead(now); // Throws
2✔
4153
        }
2✔
4154
    }
144✔
4155
    {
144✔
4156
        auto end = m_sync_connections.end();
144✔
4157
        auto i = m_sync_connections.begin();
144✔
4158
        while (i != end) {
284✔
4159
            SyncConnection& conn = *i->second;
140✔
4160
            ++i;
140✔
4161
            // Suicide
4162
            conn.terminate_if_dead(now); // Throws
140✔
4163
        }
140✔
4164
    }
144✔
4165
}
144✔
4166

4167

4168
void ServerImpl::do_close_connections()
4169
{
16✔
4170
    for (auto& entry : m_sync_connections) {
16✔
4171
        SyncConnection& conn = *entry.second;
16✔
4172
        conn.initiate_soft_close(); // Throws
16✔
4173
    }
16✔
4174
}
16✔
4175

4176

4177
void ServerImpl::do_recognize_external_change(const std::string& virt_path)
4178
{
4,800✔
4179
    auto i = m_files.find(virt_path);
4,800✔
4180
    if (i == m_files.end())
4,800✔
4181
        return;
×
4182
    ServerFile& file = *i->second;
4,800✔
4183
    file.recognize_external_change();
4,800✔
4184
}
4,800✔
4185

4186

4187
void ServerImpl::do_stop_sync_and_wait_for_backup_completion(
4188
    util::UniqueFunction<void(bool did_complete)> completion_handler, milliseconds_type timeout)
4189
{
×
4190
    static_cast<void>(timeout);
×
4191
    if (m_sync_stopped)
×
4192
        return;
×
4193
    do_close_connections(); // Throws
×
4194
    m_sync_stopped = true;
×
4195
    bool completion_reached = false;
×
4196
    completion_handler(completion_reached); // Throws
×
4197
}
×
4198

4199

4200
// ============================ SyncConnection implementation ============================
4201

4202
SyncConnection::~SyncConnection() noexcept
4203
{
2,084✔
4204
    m_sessions_enlisted_to_send.clear();
2,084✔
4205
    m_sessions.clear();
2,084✔
4206
}
2,084✔
4207

4208

4209
void SyncConnection::initiate()
4210
{
2,084✔
4211
    m_last_activity_at = steady_clock_now();
2,084✔
4212
    logger.debug("Sync Connection initiated");
2,084✔
4213
    m_websocket.initiate_server_websocket_after_handshake();
2,084✔
4214
    send_log_message(util::Logger::Level::info, "Client connection established with server", 0,
2,084✔
4215
                     m_appservices_request_id);
2,084✔
4216
}
2,084✔
4217

4218

4219
template <class... Params>
4220
void SyncConnection::terminate(Logger::Level log_level, const char* log_message, Params... log_params)
4221
{
1,244✔
4222
    terminate_sessions();                              // Throws
1,244✔
4223
    logger.log(log_level, log_message, log_params...); // Throws
1,244✔
4224
    m_websocket.stop();
1,244✔
4225
    m_ssl_stream.reset();
1,244✔
4226
    m_socket.reset();
1,244✔
4227
    // Suicide
4228
    m_server.remove_sync_connection(m_id);
1,244✔
4229
}
1,244✔
4230

4231

4232
void SyncConnection::terminate_if_dead(SteadyTimePoint now)
4233
{
140✔
4234
    milliseconds_type time = steady_duration(m_last_activity_at, now);
140✔
4235
    const Server::Config& config = m_server.get_config();
140✔
4236
    if (m_is_closing) {
140✔
4237
        if (time >= config.soft_close_timeout) {
×
4238
            // Suicide
4239
            terminate(Logger::Level::detail,
×
4240
                      "Sync connection closed (timeout during soft close)"); // Throws
×
4241
        }
×
4242
    }
×
4243
    else {
140✔
4244
        if (time >= config.connection_reaper_timeout) {
140✔
4245
            // Suicide
4246
            terminate(Logger::Level::detail,
4✔
4247
                      "Sync connection closed (no heartbeat)"); // Throws
4✔
4248
        }
4✔
4249
    }
140✔
4250
}
140✔
4251

4252

4253
void SyncConnection::enlist_to_send(Session* sess) noexcept
4254
{
108,462✔
4255
    REALM_ASSERT(m_send_trigger);
108,462✔
4256
    REALM_ASSERT(!m_is_closing);
108,462✔
4257
    REALM_ASSERT(!sess->is_enlisted_to_send());
108,462✔
4258
    m_sessions_enlisted_to_send.push_back(sess);
108,462✔
4259
    m_send_trigger->trigger();
108,462✔
4260
}
108,462✔
4261

4262

4263
void SyncConnection::handle_protocol_error(Status status)
4264
{
×
4265
    logger.error("%1", status);
×
4266
    switch (status.code()) {
×
4267
        case ErrorCodes::SyncProtocolInvariantFailed:
×
4268
            protocol_error(ProtocolError::bad_syntax); // Throws
×
4269
            break;
×
4270
        case ErrorCodes::LimitExceeded:
×
4271
            protocol_error(ProtocolError::limits_exceeded); // Throws
×
4272
            break;
×
4273
        default:
×
4274
            protocol_error(ProtocolError::other_error);
×
4275
            break;
×
4276
    }
×
4277
}
×
4278

4279
void SyncConnection::receive_bind_message(session_ident_type session_ident, std::string path,
4280
                                          std::string signed_user_token, bool need_client_file_ident,
4281
                                          bool is_subserver)
4282
{
5,436✔
4283
    auto p = m_sessions.emplace(session_ident, nullptr); // Throws
5,436✔
4284
    bool was_inserted = p.second;
5,436✔
4285
    if (REALM_UNLIKELY(!was_inserted)) {
5,436✔
4286
        logger.error("Overlapping reuse of session identifier %1 in BIND message",
×
4287
                     session_ident);                           // Throws
×
4288
        protocol_error(ProtocolError::reuse_of_session_ident); // Throws
×
4289
        return;
×
4290
    }
×
4291
    try {
5,436✔
4292
        p.first->second.reset(new Session(*this, session_ident)); // Throws
5,436✔
4293
    }
5,436✔
4294
    catch (...) {
5,436✔
4295
        m_sessions.erase(p.first);
×
4296
        throw;
×
4297
    }
×
4298

4299
    Session& sess = *p.first->second;
5,440✔
4300
    sess.initiate(); // Throws
5,440✔
4301
    ProtocolError error;
5,440✔
4302
    bool success =
5,440✔
4303
        sess.receive_bind_message(std::move(path), std::move(signed_user_token), need_client_file_ident, is_subserver,
5,440✔
4304
                                  error); // Throws
5,440✔
4305
    if (REALM_UNLIKELY(!success))         // Throws
5,440✔
4306
        protocol_error(error, &sess);     // Throws
28✔
4307
}
5,440✔
4308

4309

4310
void SyncConnection::receive_ident_message(session_ident_type session_ident, file_ident_type client_file_ident,
4311
                                           salt_type client_file_ident_salt, version_type scan_server_version,
4312
                                           version_type scan_client_version, version_type latest_server_version,
4313
                                           salt_type latest_server_version_salt)
4314
{
4,316✔
4315
    auto i = m_sessions.find(session_ident);
4,316✔
4316
    if (REALM_UNLIKELY(i == m_sessions.end())) {
4,316✔
4317
        bad_session_ident("IDENT", session_ident); // Throws
×
4318
        return;
×
4319
    }
×
4320
    Session& sess = *i->second;
4,316✔
4321
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
4,316✔
4322
        message_after_unbind("IDENT", session_ident); // Throws
×
4323
        return;
×
4324
    }
×
4325
    if (REALM_UNLIKELY(sess.error_occurred())) {
4,316✔
4326
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4327
        // messages, other than UNBIND, must be ignored.
4328
        return;
16✔
4329
    }
16✔
4330
    if (REALM_UNLIKELY(sess.must_send_ident_message())) {
4,300✔
4331
        logger.error("Received IDENT message before IDENT message was sent"); // Throws
×
4332
        protocol_error(ProtocolError::bad_message_order);                     // Throws
×
4333
        return;
×
4334
    }
×
4335
    if (REALM_UNLIKELY(sess.ident_message_received())) {
4,300✔
4336
        logger.error("Received second IDENT message for session"); // Throws
×
4337
        protocol_error(ProtocolError::bad_message_order);          // Throws
×
4338
        return;
×
4339
    }
×
4340

4341
    ProtocolError error = {};
4,300✔
4342
    bool success = sess.receive_ident_message(client_file_ident, client_file_ident_salt, scan_server_version,
4,300✔
4343
                                              scan_client_version, latest_server_version, latest_server_version_salt,
4,300✔
4344
                                              error); // Throws
4,300✔
4345
    if (REALM_UNLIKELY(!success))                     // Throws
4,300✔
4346
        protocol_error(error, &sess);                 // Throws
32✔
4347
}
4,300✔
4348

4349
void SyncConnection::receive_upload_message(session_ident_type session_ident, version_type progress_client_version,
4350
                                            version_type progress_server_version, version_type locked_server_version,
4351
                                            const UploadChangesets& upload_changesets)
4352
{
46,618✔
4353
    auto i = m_sessions.find(session_ident);
46,618✔
4354
    if (REALM_UNLIKELY(i == m_sessions.end())) {
46,618✔
4355
        bad_session_ident("UPLOAD", session_ident); // Throws
×
4356
        return;
×
4357
    }
×
4358
    Session& sess = *i->second;
46,618✔
4359
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
46,618✔
4360
        message_after_unbind("UPLOAD", session_ident); // Throws
×
4361
        return;
×
4362
    }
×
4363
    if (REALM_UNLIKELY(sess.error_occurred())) {
46,618✔
4364
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4365
        // messages, other than UNBIND, must be ignored.
4366
        return;
×
4367
    }
×
4368
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
46,618✔
4369
        message_before_ident("UPLOAD", session_ident); // Throws
×
4370
        return;
×
4371
    }
×
4372

4373
    ProtocolError error = {};
46,618✔
4374
    bool success = sess.receive_upload_message(progress_client_version, progress_server_version,
46,618✔
4375
                                               locked_server_version, upload_changesets, error); // Throws
46,618✔
4376
    if (REALM_UNLIKELY(!success))                                                                // Throws
46,618✔
4377
        protocol_error(error, &sess);                                                            // Throws
×
4378
}
46,618✔
4379

4380

4381
void SyncConnection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
4382
{
12,064✔
4383
    auto i = m_sessions.find(session_ident);
12,064✔
4384
    if (REALM_UNLIKELY(i == m_sessions.end())) {
12,064✔
4385
        bad_session_ident("MARK", session_ident);
×
4386
        return;
×
4387
    }
×
4388
    Session& sess = *i->second;
12,064✔
4389
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
12,064✔
4390
        message_after_unbind("MARK", session_ident); // Throws
×
4391
        return;
×
4392
    }
×
4393
    if (REALM_UNLIKELY(sess.error_occurred())) {
12,064✔
4394
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4395
        // messages, other than UNBIND, must be ignored.
4396
        return;
48✔
4397
    }
48✔
4398
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
12,016✔
4399
        message_before_ident("MARK", session_ident); // Throws
×
4400
        return;
×
4401
    }
×
4402

4403
    ProtocolError error;
12,016✔
4404
    bool success = sess.receive_mark_message(request_ident, error); // Throws
12,016✔
4405
    if (REALM_UNLIKELY(!success))                                   // Throws
12,016✔
4406
        protocol_error(error, &sess);                               // Throws
×
4407
}
12,016✔
4408

4409

4410
void SyncConnection::receive_unbind_message(session_ident_type session_ident)
4411
{
2,548✔
4412
    auto i = m_sessions.find(session_ident); // Throws
2,548✔
4413
    if (REALM_UNLIKELY(i == m_sessions.end())) {
2,548✔
4414
        bad_session_ident("UNBIND", session_ident); // Throws
×
4415
        return;
×
4416
    }
×
4417
    Session& sess = *i->second;
2,548✔
4418
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
2,548✔
4419
        message_after_unbind("UNBIND", session_ident); // Throws
×
4420
        return;
×
4421
    }
×
4422

4423
    sess.receive_unbind_message(); // Throws
2,548✔
4424
    // NOTE: The session might have gotten destroyed at this time!
4425
}
2,548✔
4426

4427

4428
void SyncConnection::receive_ping(milliseconds_type timestamp, milliseconds_type rtt)
4429
{
118✔
4430
    logger.debug("Received: PING(timestamp=%1, rtt=%2)", timestamp, rtt); // Throws
118✔
4431
    m_send_pong = true;
118✔
4432
    m_last_ping_timestamp = timestamp;
118✔
4433
    if (!m_is_sending)
118✔
4434
        send_next_message();
116✔
4435
}
118✔
4436

4437

4438
void SyncConnection::receive_error_message(session_ident_type session_ident, int error_code,
4439
                                           std::string_view error_body)
4440
{
×
4441
    logger.debug("Received: ERROR(error_code=%1, message_size=%2, session_ident=%3)", error_code, error_body.size(),
×
4442
                 session_ident); // Throws
×
4443
    auto i = m_sessions.find(session_ident);
×
4444
    if (REALM_UNLIKELY(i == m_sessions.end())) {
×
4445
        bad_session_ident("ERROR", session_ident);
×
4446
        return;
×
4447
    }
×
4448
    Session& sess = *i->second;
×
4449
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
×
4450
        message_after_unbind("ERROR", session_ident); // Throws
×
4451
        return;
×
4452
    }
×
4453

4454
    sess.receive_error_message(session_ident, error_code, error_body); // Throws
×
4455
}
×
4456

4457
void SyncConnection::send_log_message(util::Logger::Level level, const std::string&& message,
4458
                                      session_ident_type sess_ident, std::optional<std::string> co_id)
4459
{
6,350✔
4460
    if (get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
6,350✔
4461
        return logger.log(level, message.c_str());
×
4462
    }
×
4463

4464
    LogMessage log_msg{sess_ident, level, std::move(message), std::move(co_id)};
6,350✔
4465
    {
6,350✔
4466
        std::lock_guard lock(m_log_mutex);
6,350✔
4467
        m_log_messages.push(std::move(log_msg));
6,350✔
4468
    }
6,350✔
4469
    m_send_trigger->trigger();
6,350✔
4470
}
6,350✔
4471

4472

4473
void SyncConnection::bad_session_ident(const char* message_type, session_ident_type session_ident)
4474
{
×
4475
    logger.error("Bad session identifier in %1 message, session_ident = %2", message_type,
×
4476
                 session_ident);                      // Throws
×
4477
    protocol_error(ProtocolError::bad_session_ident); // Throws
×
4478
}
×
4479

4480

4481
void SyncConnection::message_after_unbind(const char* message_type, session_ident_type session_ident)
4482
{
×
4483
    logger.error("Received %1 message after UNBIND message, session_ident = %2", message_type,
×
4484
                 session_ident);                      // Throws
×
4485
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4486
}
×
4487

4488

4489
void SyncConnection::message_before_ident(const char* message_type, session_ident_type session_ident)
4490
{
×
4491
    logger.error("Received %1 message before IDENT message, session_ident = %2", message_type,
×
4492
                 session_ident);                      // Throws
×
4493
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4494
}
×
4495

4496

4497
void SyncConnection::handle_message_received(const char* data, size_t size)
4498
{
71,104✔
4499
    // parse_message_received() parses the message and calls the
4500
    // proper handler on the SyncConnection object (this).
4501
    get_server_protocol().parse_message_received<SyncConnection>(*this, std::string_view(data, size));
71,104✔
4502
    return;
71,104✔
4503
}
71,104✔
4504

4505

4506
void SyncConnection::handle_ping_received(const char* data, size_t size)
4507
{
×
4508
    // parse_message_received() parses the message and calls the
4509
    // proper handler on the SyncConnection object (this).
4510
    get_server_protocol().parse_ping_received<SyncConnection>(*this, std::string_view(data, size));
×
4511
    return;
×
4512
}
×
4513

4514

4515
void SyncConnection::send_next_message()
4516
{
105,168✔
4517
    REALM_ASSERT(!m_is_sending);
105,168✔
4518
    REALM_ASSERT(!m_sending_pong);
105,168✔
4519
    if (m_send_pong) {
105,168✔
4520
        send_pong(m_last_ping_timestamp);
118✔
4521
        if (m_sending_pong)
118✔
4522
            return;
118✔
4523
    }
118✔
4524
    for (;;) {
156,652✔
4525
        Session* sess = m_sessions_enlisted_to_send.pop_front();
156,652✔
4526
        if (!sess) {
156,652✔
4527
            // No sessions were enlisted to send
4528
            if (REALM_LIKELY(!m_is_closing))
48,414✔
4529
                break; // Check to see if there are any log messages to go out
48,398✔
4530
            // Send a connection level ERROR
4531
            REALM_ASSERT(!is_session_level_error(m_error_code));
16✔
4532
            initiate_write_error(m_error_code, m_error_session_ident); // Throws
16✔
4533
            return;
16✔
4534
        }
48,414✔
4535
        sess->send_message(); // Throws
108,238✔
4536
        // NOTE: The session might have gotten destroyed at this time!
4537

4538
        // At this point, `m_is_sending` is true if, and only if the session
4539
        // chose to send a message. If it chose to not send a message, we must
4540
        // loop back and give the next session in `m_sessions_enlisted_to_send`
4541
        // a chance.
4542
        if (m_is_sending)
108,238✔
4543
            return;
56,658✔
4544
    }
108,238✔
4545
    {
48,376✔
4546
        std::lock_guard lock(m_log_mutex);
48,376✔
4547
        if (!m_log_messages.empty()) {
48,376✔
4548
            send_log_message(m_log_messages.front());
6,210✔
4549
            m_log_messages.pop();
6,210✔
4550
        }
6,210✔
4551
    }
48,376✔
4552
    // Otherwise, nothing to do
4553
}
48,376✔
4554

4555

4556
void SyncConnection::initiate_write_output_buffer()
4557
{
62,870✔
4558
    auto handler = [this](std::error_code ec, size_t) {
62,870✔
4559
        if (!ec) {
62,844✔
4560
            handle_write_output_buffer();
62,688✔
4561
        }
62,688✔
4562
    };
62,844✔
4563

4564
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
62,870✔
4565
                                   std::move(handler)); // Throws
62,870✔
4566
    m_is_sending = true;
62,870✔
4567
}
62,870✔
4568

4569

4570
void SyncConnection::initiate_pong_output_buffer()
4571
{
118✔
4572
    auto handler = [this](std::error_code ec, size_t) {
118✔
4573
        if (!ec) {
118✔
4574
            handle_pong_output_buffer();
118✔
4575
        }
118✔
4576
    };
118✔
4577

4578
    REALM_ASSERT(!m_is_sending);
118✔
4579
    REALM_ASSERT(!m_sending_pong);
118✔
4580
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
118✔
4581
                                   std::move(handler)); // Throws
118✔
4582

4583
    m_is_sending = true;
118✔
4584
    m_sending_pong = true;
118✔
4585
}
118✔
4586

4587

4588
void SyncConnection::send_pong(milliseconds_type timestamp)
4589
{
118✔
4590
    REALM_ASSERT(m_send_pong);
118✔
4591
    REALM_ASSERT(!m_sending_pong);
118✔
4592
    m_send_pong = false;
118✔
4593
    logger.debug("Sending: PONG(timestamp=%1)", timestamp); // Throws
118✔
4594

4595
    OutputBuffer& out = get_output_buffer();
118✔
4596
    get_server_protocol().make_pong(out, timestamp); // Throws
118✔
4597

4598
    initiate_pong_output_buffer(); // Throws
118✔
4599
}
118✔
4600

4601
void SyncConnection::send_log_message(const LogMessage& log_msg)
4602
{
6,210✔
4603
    OutputBuffer& out = get_output_buffer();
6,210✔
4604
    get_server_protocol().make_log_message(out, log_msg.level, log_msg.message, log_msg.sess_ident,
6,210✔
4605
                                           log_msg.co_id); // Throws
6,210✔
4606

4607
    initiate_write_output_buffer(); // Throws
6,210✔
4608
}
6,210✔
4609

4610

4611
void SyncConnection::handle_write_output_buffer()
4612
{
62,690✔
4613
    release_output_buffer();
62,690✔
4614
    m_is_sending = false;
62,690✔
4615
    send_next_message(); // Throws
62,690✔
4616
}
62,690✔
4617

4618

4619
void SyncConnection::handle_pong_output_buffer()
4620
{
118✔
4621
    release_output_buffer();
118✔
4622
    REALM_ASSERT(m_is_sending);
118✔
4623
    REALM_ASSERT(m_sending_pong);
118✔
4624
    m_is_sending = false;
118✔
4625
    m_sending_pong = false;
118✔
4626
    send_next_message(); // Throws
118✔
4627
}
118✔
4628

4629

4630
void SyncConnection::initiate_write_error(ProtocolError error_code, session_ident_type session_ident)
4631
{
16✔
4632
    const char* message = get_protocol_error_message(int(error_code));
16✔
4633
    std::size_t message_size = std::strlen(message);
16✔
4634
    bool try_again = determine_try_again(error_code);
16✔
4635

4636
    logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3, session_ident=%4)", int(error_code),
16✔
4637
                  message_size, try_again, session_ident); // Throws
16✔
4638

4639
    OutputBuffer& out = get_output_buffer();
16✔
4640
    int protocol_version = get_client_protocol_version();
16✔
4641
    get_server_protocol().make_error_message(protocol_version, out, error_code, message, message_size, try_again,
16✔
4642
                                             session_ident); // Throws
16✔
4643

4644
    auto handler = [this](std::error_code ec, size_t) {
16✔
4645
        handle_write_error(ec); // Throws
16✔
4646
    };
16✔
4647
    m_websocket.async_write_binary(out.data(), out.size(), std::move(handler));
16✔
4648
    m_is_sending = true;
16✔
4649
}
16✔
4650

4651

4652
void SyncConnection::handle_write_error(std::error_code ec)
4653
{
16✔
4654
    m_is_sending = false;
16✔
4655
    REALM_ASSERT(m_is_closing);
16✔
4656
    if (!m_ssl_stream) {
16✔
4657
        m_socket->shutdown(network::Socket::shutdown_send, ec);
16✔
4658
        if (ec && ec != make_basic_system_error_code(ENOTCONN))
16!
4659
            throw std::system_error(ec);
×
4660
    }
16✔
4661
}
16✔
4662

4663

4664
// For connection level errors, `sess` is ignored. For session level errors, a
4665
// session must be specified.
4666
//
4667
// If a session is specified, that session object will have been detached from
4668
// the ServerFile object (and possibly destroyed) upon return from
4669
// protocol_error().
4670
//
4671
// If a session is specified for a protocol level error, that session object
4672
// will have been destroyed upon return from protocol_error(). For session level
4673
// errors, the specified session will have been destroyed upon return from
4674
// protocol_error() if, and only if the negotiated protocol version is less than
4675
// 23.
4676
void SyncConnection::protocol_error(ProtocolError error_code, Session* sess)
4677
{
80✔
4678
    REALM_ASSERT(!m_is_closing);
80✔
4679
    bool session_level = is_session_level_error(error_code);
80✔
4680
    REALM_ASSERT(!session_level || sess);
80✔
4681
    REALM_ASSERT(!sess || m_sessions.count(sess->get_session_ident()) == 1);
80✔
4682
    if (logger.would_log(util::Logger::Level::debug)) {
80✔
4683
        const char* message = get_protocol_error_message(int(error_code));
×
4684
        Logger& logger_2 = (session_level ? sess->logger : logger);
×
4685
        logger_2.debug("Protocol error: %1 (error_code=%2)", message, int(error_code)); // Throws
×
4686
    }
×
4687
    session_ident_type session_ident = (session_level ? sess->get_session_ident() : 0);
80✔
4688
    if (session_level) {
80✔
4689
        sess->initiate_deactivation(error_code); // Throws
80✔
4690
        return;
80✔
4691
    }
80✔
4692
    do_initiate_soft_close(error_code, session_ident); // Throws
×
4693
}
×
4694

4695

4696
void SyncConnection::do_initiate_soft_close(ProtocolError error_code, session_ident_type session_ident)
4697
{
16✔
4698
    REALM_ASSERT(get_protocol_error_message(int(error_code)));
16✔
4699

4700
    // With recent versions of the protocol (when the version is greater than,
4701
    // or equal to 23), this function will only be called for connection level
4702
    // errors, never for session specific errors. However, for the purpose of
4703
    // emulating earlier protocol versions, this function might be called for
4704
    // session specific errors too.
4705
    REALM_ASSERT(is_session_level_error(error_code) == (session_ident != 0));
16✔
4706
    REALM_ASSERT(!is_session_level_error(error_code));
16✔
4707

4708
    REALM_ASSERT(m_send_trigger);
16✔
4709
    REALM_ASSERT(!m_is_closing);
16✔
4710
    m_is_closing = true;
16✔
4711

4712
    m_error_code = error_code;
16✔
4713
    m_error_session_ident = session_ident;
16✔
4714

4715
    // Don't waste time and effort sending any other messages
4716
    m_send_pong = false;
16✔
4717
    m_sessions_enlisted_to_send.clear();
16✔
4718

4719
    m_receiving_session = nullptr;
16✔
4720

4721
    terminate_sessions(); // Throws
16✔
4722

4723
    m_send_trigger->trigger();
16✔
4724
}
16✔
4725

4726

4727
void SyncConnection::close_due_to_close_by_client(std::error_code ec)
4728
{
678✔
4729
    auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
678✔
4730
    // Suicide
4731
    terminate(log_level, "Sync connection closed by client: %1", ec.message()); // Throws
678✔
4732
}
678✔
4733

4734

4735
void SyncConnection::close_due_to_error(std::error_code ec)
4736
{
562✔
4737
    // Suicide
4738
    terminate(Logger::Level::error, "Sync connection closed due to error: %1",
562✔
4739
              ec.message()); // Throws
562✔
4740
}
562✔
4741

4742

4743
void SyncConnection::terminate_sessions()
4744
{
1,260✔
4745
    for (auto& entry : m_sessions) {
1,920✔
4746
        Session& sess = *entry.second;
1,920✔
4747
        sess.terminate(); // Throws
1,920✔
4748
    }
1,920✔
4749
    m_sessions_enlisted_to_send.clear();
1,260✔
4750
    m_sessions.clear();
1,260✔
4751
}
1,260✔
4752

4753

4754
void SyncConnection::initiate_soft_close()
4755
{
16✔
4756
    if (!m_is_closing) {
16✔
4757
        session_ident_type session_ident = 0;                                    // Not session specific
16✔
4758
        do_initiate_soft_close(ProtocolError::connection_closed, session_ident); // Throws
16✔
4759
    }
16✔
4760
}
16✔
4761

4762

4763
void SyncConnection::discard_session(session_ident_type session_ident) noexcept
4764
{
2,538✔
4765
    m_sessions.erase(session_ident);
2,538✔
4766
}
2,538✔
4767

4768
} // anonymous namespace
4769

4770

4771
// ============================ sync::Server implementation ============================
4772

4773
class Server::Implementation : public ServerImpl {
4774
public:
4775
    Implementation(const std::string& root_dir, util::Optional<PKey> pkey, Server::Config config)
4776
        : ServerImpl{root_dir, std::move(pkey), std::move(config)} // Throws
548✔
4777
    {
1,200✔
4778
    }
1,200✔
4779
    virtual ~Implementation() {}
1,200✔
4780
};
4781

4782

4783
Server::Server(const std::string& root_dir, util::Optional<sync::PKey> pkey, Config config)
4784
    : m_impl{new Implementation{root_dir, std::move(pkey), std::move(config)}} // Throws
548✔
4785
{
1,200✔
4786
}
1,200✔
4787

4788

4789
Server::Server(Server&& serv) noexcept
4790
    : m_impl{std::move(serv.m_impl)}
4791
{
×
4792
}
×
4793

4794

4795
Server::~Server() noexcept {}
1,200✔
4796

4797

4798
void Server::start()
4799
{
516✔
4800
    m_impl->start(); // Throws
516✔
4801
}
516✔
4802

4803

4804
void Server::start(const std::string& listen_address, const std::string& listen_port, bool reuse_address)
4805
{
684✔
4806
    m_impl->start(listen_address, listen_port, reuse_address); // Throws
684✔
4807
}
684✔
4808

4809

4810
network::Endpoint Server::listen_endpoint() const
4811
{
1,254✔
4812
    return m_impl->listen_endpoint(); // Throws
1,254✔
4813
}
1,254✔
4814

4815

4816
void Server::run()
4817
{
1,144✔
4818
    m_impl->run(); // Throws
1,144✔
4819
}
1,144✔
4820

4821

4822
void Server::stop() noexcept
4823
{
2,052✔
4824
    m_impl->stop();
2,052✔
4825
}
2,052✔
4826

4827

4828
uint_fast64_t Server::errors_seen() const noexcept
4829
{
684✔
4830
    return m_impl->errors_seen;
684✔
4831
}
684✔
4832

4833

4834
void Server::stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
4835
                                                      milliseconds_type timeout)
4836
{
×
4837
    m_impl->stop_sync_and_wait_for_backup_completion(std::move(completion_handler), timeout); // Throws
×
4838
}
×
4839

4840

4841
void Server::set_connection_reaper_timeout(milliseconds_type timeout)
4842
{
4✔
4843
    m_impl->set_connection_reaper_timeout(timeout);
4✔
4844
}
4✔
4845

4846

4847
void Server::close_connections()
4848
{
16✔
4849
    m_impl->close_connections();
16✔
4850
}
16✔
4851

4852

4853
bool Server::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4854
{
72✔
4855
    return m_impl->map_virtual_to_real_path(virt_path, real_path); // Throws
72✔
4856
}
72✔
4857

4858

4859
void Server::recognize_external_change(const std::string& virt_path)
4860
{
4,800✔
4861
    m_impl->recognize_external_change(virt_path); // Throws
4,800✔
4862
}
4,800✔
4863

4864

4865
void Server::get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
4866
{
×
4867
    m_impl->get_workunit_timers(parallel_section, sequential_section);
×
4868
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc