• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2211

09 Apr 2024 03:41PM UTC coverage: 92.633% (+0.5%) from 92.106%
2211

push

Evergreen

web-flow
Merge pull request #7300 from realm/tg/rework-metadata-storage

Rework sync user handling and metadata storage

102820 of 195548 branches covered (52.58%)

3165 of 3247 new or added lines in 46 files covered. (97.47%)

31 existing lines in 8 files now uncovered.

249584 of 269432 relevant lines covered (92.63%)

49986309.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.29
/src/realm/sync/noinst/server/server.cpp
1
#include <realm/sync/noinst/server/server.hpp>
2

3
#include <realm/binary_data.hpp>
4
#include <realm/impl/simulated_failure.hpp>
5
#include <realm/object_id.hpp>
6
#include <realm/string_data.hpp>
7
#include <realm/sync/changeset.hpp>
8
#include <realm/sync/trigger.hpp>
9
#include <realm/sync/impl/clamped_hex_dump.hpp>
10
#include <realm/sync/impl/clock.hpp>
11
#include <realm/sync/network/http.hpp>
12
#include <realm/sync/network/network_ssl.hpp>
13
#include <realm/sync/network/websocket.hpp>
14
#include <realm/sync/noinst/client_history_impl.hpp>
15
#include <realm/sync/noinst/protocol_codec.hpp>
16
#include <realm/sync/noinst/server/access_control.hpp>
17
#include <realm/sync/noinst/server/server_dir.hpp>
18
#include <realm/sync/noinst/server/server_file_access_cache.hpp>
19
#include <realm/sync/noinst/server/server_impl_base.hpp>
20
#include <realm/sync/transform.hpp>
21
#include <realm/util/base64.hpp>
22
#include <realm/util/bind_ptr.hpp>
23
#include <realm/util/buffer_stream.hpp>
24
#include <realm/util/circular_buffer.hpp>
25
#include <realm/util/compression.hpp>
26
#include <realm/util/file.hpp>
27
#include <realm/util/json_parser.hpp>
28
#include <realm/util/load_file.hpp>
29
#include <realm/util/memory_stream.hpp>
30
#include <realm/util/optional.hpp>
31
#include <realm/util/platform_info.hpp>
32
#include <realm/util/random.hpp>
33
#include <realm/util/safe_int_ops.hpp>
34
#include <realm/util/scope_exit.hpp>
35
#include <realm/util/scratch_allocator.hpp>
36
#include <realm/util/thread.hpp>
37
#include <realm/util/thread_exec_guard.hpp>
38
#include <realm/util/value_reset_guard.hpp>
39
#include <realm/version.hpp>
40

41
#include <algorithm>
42
#include <atomic>
43
#include <cctype>
44
#include <chrono>
45
#include <cmath>
46
#include <condition_variable>
47
#include <cstdint>
48
#include <cstdio>
49
#include <cstring>
50
#include <functional>
51
#include <locale>
52
#include <map>
53
#include <memory>
54
#include <queue>
55
#include <sstream>
56
#include <stdexcept>
57
#include <thread>
58
#include <vector>
59

60
// NOTE: The protocol specification is in `/doc/protocol.md`
61

62

63
// FIXME: Verify that session identifier spoofing cannot be used to get access
64
// to sessions belonging to other network connections in any way.
65
// FIXME: Seems that server must close connection with zero sessions after a
66
// certain timeout.
67

68

69
using namespace realm;
70
using namespace realm::sync;
71
using namespace realm::util;
72

73
// clang-format off
74
using ServerHistory         = _impl::ServerHistory;
75
using ServerProtocol        = _impl::ServerProtocol;
76
using ServerFileAccessCache = _impl::ServerFileAccessCache;
77
using ServerImplBase = _impl::ServerImplBase;
78

79
using IntegratableChangeset = ServerHistory::IntegratableChangeset;
80
using IntegratableChangesetList = ServerHistory::IntegratableChangesetList;
81
using IntegratableChangesets = ServerHistory::IntegratableChangesets;
82
using IntegrationResult = ServerHistory::IntegrationResult;
83
using BootstrapError = ServerHistory::BootstrapError;
84
using ExtendedIntegrationError = ServerHistory::ExtendedIntegrationError;
85
using ClientType = ServerHistory::ClientType;
86
using FileIdentAllocSlot = ServerHistory::FileIdentAllocSlot;
87
using FileIdentAllocSlots = ServerHistory::FileIdentAllocSlots;
88

89
using UploadChangeset = ServerProtocol::UploadChangeset;
90
// clang-format on
91

92

93
using UploadChangesets = std::vector<UploadChangeset>;
94

95
using EventLoopMetricsHandler = network::Service::EventLoopMetricsHandler;
96

97

98
static_assert(std::numeric_limits<session_ident_type>::digits >= 63, "Bad session identifier type");
99
static_assert(std::numeric_limits<file_ident_type>::digits >= 63, "Bad file identifier type");
100
static_assert(std::numeric_limits<version_type>::digits >= 63, "Bad version type");
101
static_assert(std::numeric_limits<timestamp_type>::digits >= 63, "Bad timestamp type");
102

103

104
namespace {
105

106
enum class SchedStatus { done = 0, pending, in_progress };
107

108
/// Returns the websocket protocol prefix `"com.mongodb.realm-sync/"`.
///
/// Only used by the Sync Server to support older pbs sync clients (prior to
/// protocol v8).
constexpr std::string_view get_old_pbs_websocket_protocol_prefix() noexcept
{
    return "com.mongodb.realm-sync/";
}
113

114
/// Shortens `str` so that at most its last `cutoff` characters are kept.
///
/// If `str` is longer than `cutoff`, the result is `"..."` followed by the
/// final `cutoff` characters of `str`. Otherwise `str` is returned unchanged.
std::string short_token_fmt(const std::string& str, size_t cutoff = 30)
{
    if (str.size() <= cutoff)
        return str;

    return "..." + str.substr(str.size() - cutoff);
}
123

124

125
/// Splits a comma-separated HTTP header value into its elements.
///
/// Each successful call to next() yields the next nonempty element with
/// leading and trailing HTTP linear white space (tab, LF, CR, space) removed.
/// Elements that are empty after trimming are skipped.
///
/// The parser stores only a view of the string it is given, and the yielded
/// elements are views into that same string, so the underlying characters must
/// outlive both the parser and any element obtained from it.
class HttpListHeaderValueParser {
public:
    HttpListHeaderValueParser(std::string_view string) noexcept
        : m_string{string}
    {
    }

    /// Fetches the next nonempty element.
    ///
    /// \param elem Set to the trimmed element on success; left untouched when
    /// there are no more elements.
    /// \return True if an element was produced, false when the input is
    /// exhausted.
    bool next(std::string_view& elem) noexcept
    {
        while (m_pos < m_string.size()) {
            size_type i = m_pos;
            size_type j = m_string.find(',', i);
            if (j != std::string_view::npos) {
                // Resume after the comma on the next iteration / call
                m_pos = j + 1;
            }
            else {
                // Last element: it extends to the end of the string
                j = m_string.size();
                m_pos = j;
            }

            // Exclude leading and trailing white space
            while (i < j && is_http_lws(m_string[i]))
                ++i;
            while (j > i && is_http_lws(m_string[j - 1]))
                --j;

            if (i != j) {
                elem = m_string.substr(i, j - i);
                return true;
            }
        }
        return false;
    }

private:
    using size_type = std::string_view::size_type;
    const std::string_view m_string;
    // Offset of the first character that has not yet been consumed
    size_type m_pos = 0;

    static bool is_http_lws(char ch) noexcept
    {
        return (ch == '\t' || ch == '\n' || ch == '\r' || ch == ' ');
    }
};
167

168

169
// Use the high resolution clock when it is steady, otherwise fall back to
// std::chrono::steady_clock.
using SteadyClock = std::conditional<std::chrono::high_resolution_clock::is_steady,
                                     std::chrono::high_resolution_clock, std::chrono::steady_clock>::type;
using SteadyTimePoint = SteadyClock::time_point;

/// Returns the current time according to `SteadyClock`.
SteadyTimePoint steady_clock_now() noexcept
{
    return SteadyClock::now();
}
177

178
/// Returns the time elapsed from `start_time` to `end_time` in whole
/// milliseconds (the duration is converted with `std::chrono::duration_cast`,
/// i.e. truncated towards zero).
///
/// `end_time` defaults to the current time as reported by steady_clock_now().
milliseconds_type steady_duration(SteadyTimePoint start_time, SteadyTimePoint end_time = steady_clock_now()) noexcept
{
    const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
    return milliseconds_type(elapsed.count());
}
184

185

186
/// Decides the "try again" flag for `error_code`: true only for
/// `ProtocolError::connection_closed`, false for every other error.
bool determine_try_again(ProtocolError error_code) noexcept
{
    return error_code == ProtocolError::connection_closed;
}
190

191

192
class ServerFile;
193
class ServerImpl;
194
class HTTPConnection;
195
class SyncConnection;
196
class Session;
197

198

199
using Formatter = util::ResettableExpandableBufferOutputStream;
200
using OutputBuffer = util::ResettableExpandableBufferOutputStream;
201

202
using ProtocolVersionRange = std::pair<int, int>;
203

204
class MiscBuffers {
205
public:
206
    Formatter formatter;
207
    OutputBuffer download_message;
208

209
    using ProtocolVersionRanges = std::vector<ProtocolVersionRange>;
210
    ProtocolVersionRanges protocol_version_ranges;
211

212
    std::vector<char> compress;
213

214
    MiscBuffers()
215
    {
12,256✔
216
        formatter.imbue(std::locale::classic());
12,256✔
217
        download_message.imbue(std::locale::classic());
12,256✔
218
    }
12,256✔
219
};
220

221

222
struct DownloadCache {
223
    std::unique_ptr<char[]> body;
224
    std::size_t uncompressed_body_size;
225
    std::size_t compressed_body_size;
226
    bool body_is_compressed;
227
    version_type end_version;
228
    DownloadCursor download_progress;
229
    std::uint_fast64_t downloadable_bytes;
230
    std::size_t num_changesets;
231
    std::size_t accum_original_size;
232
    std::size_t accum_compacted_size;
233
};
234

235

236
// An unblocked work unit is comprised of one Work object for each of the files
237
// that contribute work to the work unit, generally one reference file and a
238
// number of partial files.
239
class Work {
240
public:
241
    // In general, primary work is all forms of modifying work, including file
242
    // deletion.
243
    bool has_primary_work = false;
244

245
    // Only for reference files
246
    bool might_produce_new_sync_version = false;
247

248
    bool produced_new_realm_version = false;
249
    bool produced_new_sync_version = false;
250
    bool expired_reference_version = false;
251

252
    // True if, and only if changesets_from_downstream contains at least one
253
    // changeset.
254
    bool have_changesets_from_downstream = false;
255

256
    FileIdentAllocSlots file_ident_alloc_slots;
257
    std::vector<std::unique_ptr<char[]>> changeset_buffers;
258
    IntegratableChangesets changesets_from_downstream;
259

260
    VersionInfo version_info;
261

262
    // Result of integration of changesets from downstream clients
263
    IntegrationResult integration_result;
264

265
    void reset() noexcept
266
    {
346,528✔
267
        has_primary_work = false;
346,528✔
268

172,402✔
269
        might_produce_new_sync_version = false;
346,528✔
270

172,402✔
271
        produced_new_realm_version = false;
346,528✔
272
        produced_new_sync_version = false;
346,528✔
273
        expired_reference_version = false;
346,528✔
274
        have_changesets_from_downstream = false;
346,528✔
275

172,402✔
276
        file_ident_alloc_slots.clear();
346,528✔
277
        changeset_buffers.clear();
346,528✔
278
        changesets_from_downstream.clear();
346,528✔
279

172,402✔
280
        version_info = {};
346,528✔
281
        integration_result = {};
346,528✔
282
    }
346,528✔
283
};
284

285

286
class WorkerState {
287
public:
288
    FileIdentAllocSlots file_ident_alloc_slots;
289
    util::ScratchMemory scratch_memory;
290
    bool use_file_cache = true;
291
    std::unique_ptr<ServerHistory> reference_hist;
292
    DBRef reference_sg;
293
};
294

295

296
// ============================ SessionQueue ============================
297

298
class SessionQueue {
299
public:
300
    void push_back(Session*) noexcept;
301
    Session* pop_front() noexcept;
302
    void clear() noexcept;
303

304
private:
305
    Session* m_back = nullptr;
306
};
307

308

309
// ============================ FileIdentReceiver ============================
310

311
class FileIdentReceiver {
312
public:
313
    virtual void receive_file_ident(SaltedFileIdent) = 0;
314

315
protected:
316
    ~FileIdentReceiver() {}
58,102✔
317
};
318

319

320
// ============================ WorkerBox =============================
321

322
class WorkerBox {
323
public:
324
    using JobType = util::UniqueFunction<void(WorkerState&)>;
325
    void add_work(WorkerState& state, JobType job)
326
    {
×
327
        std::unique_lock<std::mutex> lock(m_mutex);
×
328
        if (m_jobs.size() >= m_queue_limit) {
×
329
            // Once we have many queued jobs, it is better to use this thread to run a new job
×
330
            // than to queue it.
×
331
            run_a_job(lock, state, job);
×
332
        }
×
333
        else {
×
334
            // Create worker threads on demand (if all existing threads are active):
×
335
            if (m_threads.size() < m_max_num_threads && m_active >= m_threads.size()) {
×
336
                m_threads.emplace_back([this]() {
×
337
                    WorkerState state;
×
338
                    state.use_file_cache = false;
×
339
                    JobType the_job;
×
340
                    std::unique_lock<std::mutex> lock(m_mutex);
×
341
                    for (;;) {
×
342
                        while (m_jobs.empty() && !m_finish_up)
×
343
                            m_changes.wait(lock);
×
344
                        if (m_finish_up)
×
345
                            break; // terminate thread
×
346
                        the_job = std::move(m_jobs.back());
×
347
                        m_jobs.pop_back();
×
348
                        run_a_job(lock, state, the_job);
×
349
                        m_changes.notify_all();
×
350
                    }
×
351
                });
×
352
            }
×
353

×
354
            // Submit the job for execution:
×
355
            m_jobs.emplace_back(std::move(job));
×
356
            m_changes.notify_all();
×
357
        }
×
358
    }
×
359

360
    // You should call wait_completion() before trying to destroy a WorkerBox to get proper
361
    // propagation of exceptions.
362
    void wait_completion(WorkerState& state)
363
    {
×
364
        std::unique_lock<std::mutex> lock(m_mutex);
×
365
        while (!m_jobs.empty() || m_active > 0) {
×
366
            if (!m_jobs.empty()) { // if possible, make this thread participate in running m_jobs
×
367
                JobType the_job = std::move(m_jobs.back());
×
368
                m_jobs.pop_back();
×
369
                run_a_job(lock, state, the_job);
×
370
            }
×
371
            else {
×
372
                m_changes.wait(lock);
×
373
            }
×
374
        }
×
375
        if (m_epr) {
×
376
            std::rethrow_exception(m_epr);
×
377
        }
×
378
    }
×
379

380
    WorkerBox(unsigned int num_threads)
381
    {
×
382
        m_queue_limit = num_threads * 10; // fudge factor for job size variation
×
383
        m_max_num_threads = num_threads;
×
384
    }
×
385

386
    ~WorkerBox()
387
    {
×
388
        {
×
389
            std::unique_lock<std::mutex> lock(m_mutex);
×
390
            m_finish_up = true;
×
391
            m_changes.notify_all();
×
392
        }
×
393
        for (auto& e : m_threads)
×
394
            e.join();
×
395
    }
×
396

397
private:
398
    std::mutex m_mutex;
399
    std::condition_variable m_changes;
400
    std::vector<std::thread> m_threads;
401
    std::vector<JobType> m_jobs;
402
    unsigned int m_active = 0;
403
    bool m_finish_up = false;
404
    unsigned int m_queue_limit = 0;
405
    unsigned int m_max_num_threads = 0;
406
    std::exception_ptr m_epr;
407

408
    void run_a_job(std::unique_lock<std::mutex>& lock, WorkerState& state, JobType& job)
409
    {
×
410
        ++m_active;
×
411
        lock.unlock();
×
412
        try {
×
413
            job(state);
×
414
            lock.lock();
×
415
        }
×
416
        catch (...) {
×
417
            lock.lock();
×
418
            if (!m_epr)
×
419
                m_epr = std::current_exception();
×
420
        }
×
421
        --m_active;
×
422
    }
×
423
};
424

425

426
// ============================ ServerFile ============================
427

428
class ServerFile : public util::RefCountBase {
429
public:
430
    util::PrefixLogger logger;
431

432
    // Logger to be used by the worker thread
433
    util::PrefixLogger wlogger;
434

435
    ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path, std::string real_path,
436
               bool disable_sync_to_disk);
437
    ~ServerFile() noexcept;
438

439
    void initialize();
440
    void activate();
441

442
    ServerImpl& get_server() noexcept
443
    {
362,080✔
444
        return m_server;
362,080✔
445
    }
362,080✔
446

447
    const std::string& get_real_path() const noexcept
448
    {
×
449
        return m_file.realm_path;
×
450
    }
×
451

452
    const std::string& get_virt_path() const noexcept
453
    {
×
454
        return m_file.virt_path;
×
455
    }
×
456

457
    ServerFileAccessCache::File& access()
458
    {
464,978✔
459
        return m_file.access(); // Throws
464,978✔
460
    }
464,978✔
461

462
    ServerFileAccessCache::File& worker_access()
463
    {
345,980✔
464
        return m_worker_file.access(); // Throws
345,980✔
465
    }
345,980✔
466

467
    version_type get_realm_version() const noexcept
468
    {
×
469
        return m_version_info.realm_version;
×
470
    }
×
471

472
    version_type get_sync_version() const noexcept
473
    {
×
474
        return m_version_info.sync_version.version;
×
475
    }
×
476

477
    SaltedVersion get_salted_sync_version() const noexcept
478
    {
948,356✔
479
        return m_version_info.sync_version;
948,356✔
480
    }
948,356✔
481

482
    DownloadCache& get_download_cache() noexcept;
483

484
    void register_client_access(file_ident_type client_file_ident);
485

486
    using file_ident_request_type = std::int_fast64_t;
487

488
    // Initiate a request for a new client file identifier.
489
    //
490
    // Unless the request is cancelled, the identifier will be delivered to the
491
    // receiver by way of an invocation of
492
    // FileIdentReceiver::receive_file_ident().
493
    //
494
    // FileIdentReceiver::receive_file_ident() is guaranteed to not be called
495
    // until after request_file_ident() has returned (no callback reentrance).
496
    //
497
    // New client file identifiers will be delivered to receivers in the order
498
    // that they were requested.
499
    //
500
    // The returned value is a nonzero integer that can be used to cancel the
501
    // request before the file identifier is delivered using
502
    // cancel_file_ident_request().
503
    auto request_file_ident(FileIdentReceiver&, file_ident_type proxy_file, ClientType) -> file_ident_request_type;
504

505
    // Cancel the specified file identifier request.
506
    //
507
    // It is an error to call this function after the identifier has been
508
    // delivered.
509
    void cancel_file_ident_request(file_ident_request_type) noexcept;
510

511
    void add_unidentified_session(Session*);
512
    void identify_session(Session*, file_ident_type client_file_ident);
513

514
    void remove_unidentified_session(Session*) noexcept;
515
    void remove_identified_session(file_ident_type client_file_ident) noexcept;
516

517
    Session* get_identified_session(file_ident_type client_file_ident) noexcept;
518

519
    bool can_add_changesets_from_downstream() const noexcept;
520
    void add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
521
                                        version_type locked_server_version, const UploadChangeset*,
522
                                        std::size_t num_changesets);
523

524
    // bootstrap_client_session calls the function of same name in server_history
525
    // but corrects the upload_progress with information from pending
526
    // integratable changesets. A situation can occur where a client terminates
527
    // a session and starts a new session and re-uploads changesets that are known
528
    // by the ServerFile object but not by the ServerHistory.
529
    BootstrapError bootstrap_client_session(SaltedFileIdent client_file_ident, DownloadCursor download_progress,
530
                                            SaltedVersion server_version, ClientType client_type,
531
                                            UploadCursor& upload_progress, version_type& locked_server_version,
532
                                            Logger&);
533

534
    // NOTE: This function is executed by the worker thread
535
    void worker_process_work_unit(WorkerState&);
536

537
    void recognize_external_change();
538

539
private:
540
    ServerImpl& m_server;
541
    ServerFileAccessCache::Slot m_file;
542

543
    // In general, `m_version_info` refers to the last snapshot of the Realm
544
    // file that is supposed to be visible to remote peers engaging in regular
545
    // Realm file synchronization.
546
    VersionInfo m_version_info;
547

548
    file_ident_request_type m_last_file_ident_request = 0;
549

550
    // The set of sessions whose client file identifier is not yet known, i.e.,
551
    // those for which an IDENT message has not yet been received,
552
    std::set<Session*> m_unidentified_sessions;
553

554
    // A map of the sessions whose client file identifier is known, i.e, those
555
    // for which an IDENT message has been received.
556
    std::map<file_ident_type, Session*> m_identified_sessions;
557

558
    // Used when a file used as partial view wants to allocate a client file
559
    // identifier from the reference Realm.
560
    file_ident_request_type m_file_ident_request = 0;
561

562
    struct FileIdentRequestInfo {
563
        FileIdentReceiver* receiver;
564
        file_ident_type proxy_file;
565
        ClientType client_type;
566
    };
567

568
    // When nonempty, it counts towards outstanding blocked work (see
569
    // `m_has_blocked_work`).
570
    std::map<file_ident_request_type, FileIdentRequestInfo> m_file_ident_requests;
571

572
    // Changesets received from the downstream clients, and waiting to be
573
    // integrated, as well as information about the clients progress in terms of
574
    // integrating changesets received from the server. When nonempty, it counts
575
    // towards outstanding blocked work (see `m_has_blocked_work`).
576
    //
577
    // At any given time, the set of changesets from a particular client-side
578
    // file may be comprised of changesets received via distinct sessions.
579
    //
580
    // See also `m_num_changesets_from_downstream`.
581
    IntegratableChangesets m_changesets_from_downstream;
582

583
    // Keeps track of the number of changesets in `m_changesets_from_downstream`.
584
    //
585
    // Its purpose is also to initialize
586
    // `Work::have_changesets_from_downstream`.
587
    std::size_t m_num_changesets_from_downstream = 0;
588

589
    // The total size, in bytes, of the changesets that were received from
590
    // clients, are targeting this file, and are currently part of the blocked
591
    // work unit.
592
    //
593
    // Together with `m_unblocked_changesets_from_downstream_byte_size`, its
594
    // purpose is to allow the server to keep track of the accumulated size of
595
    // changesets being processed, or waiting to be processed (metric
596
    // `upload.pending.bytes`) (see
597
    // ServerImpl::inc_byte_size_for_pending_downstream_changesets()).
598
    //
599
    // Its purpose is also to enable the "very poor man's" backpressure solution
600
    // (see can_add_changesets_from_downstream()).
601
    std::size_t m_blocked_changesets_from_downstream_byte_size = 0;
602

603
    // Same as `m_blocked_changesets_from_downstream_byte_size` but for the
604
    // currently unblocked work unit.
605
    std::size_t m_unblocked_changesets_from_downstream_byte_size = 0;
606

607
    // When nonempty, it counts towards outstanding blocked work (see
608
    // `m_has_blocked_work`).
609
    std::vector<std::string> m_permission_changes;
610

611
    // True iff this file, or any of its associated partial files (when
612
    // applicable), has a nonzero amount of outstanding work that is currently
613
    // held back from being passed to the worker thread because a previously
614
    // accumulated chunk of work related to this file is currently in progress.
615
    bool m_has_blocked_work = false;
616

617
    // A file, that is not a partial file, is considered *exposed to the worker
618
    // thread* from the point in time where it is submitted to the worker
619
    // (Worker::enqueue()) and up until the point in time where
620
    // group_postprocess_stage_1() starts to execute. A partial file is
621
    // considered *exposed to the worker thread* precisely when the associated
622
    // reference file is exposed to the worker thread, but only if it was in
623
    // `m_reference_file->m_work.partial_files` at the point in time where the
624
    // reference file was passed to the worker.
625
    //
626
    // While this file is exposed to the worker thread, all members of `m_work`
627
    // other than `changesets_from_downstream` may be accessed and modified by
628
    // the worker thread only.
629
    //
630
    // While this file is exposed to the worker thread,
631
    // `m_work.changesets_from_downstream` may be accessed by all threads, but
632
    // must not be modified by any thread. This special status of
633
    // `m_work.changesets_from_downstream` is required to allow
634
    // ServerFile::bootstrap_client_session() to read from it at any time.
635
    Work m_work;
636

637
    // For reference files, set to true when work is unblocked, and reset back
638
    // to false when the work finalization process completes
639
    // (group_postprocess_stage_3()). Always zero for partial files.
640
    bool m_has_work_in_progress = 0;
641

642
    // This one must only be accessed by the worker thread.
643
    //
644
    // More specifically, `m_worker_file.access()` must only be called by the
645
    // worker thread, and if it was ever called, it must be closed by the worker
646
    // thread before the ServerFile object is destroyed, if destruction happens
647
    // before the destruction of the server object itself.
648
    ServerFileAccessCache::Slot m_worker_file;
649

650
    std::vector<std::int_fast64_t> m_deleting_connections;
651

652
    DownloadCache m_download_cache;
653

654
    void on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes);
655
    void on_work_added();
656
    void group_unblock_work();
657
    void unblock_work();
658

659
    /// Resume history scanning in all sessions bound to this file. To be called
660
    /// after a successfull integration of a changeset.
661
    void resume_download() noexcept;
662

663
    // NOTE: These functions are executed by the worker thread
664
    void worker_allocate_file_identifiers();
665
    bool worker_integrate_changes_from_downstream(WorkerState&);
666
    ServerHistory& get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
667
                                           DBRef& sg_ptr);
668
    ServerHistory& get_reference_file_history(WorkerState& state);
669
    void group_postprocess_stage_1();
670
    void group_postprocess_stage_2();
671
    void group_postprocess_stage_3();
672
    void group_finalize_work_stage_1();
673
    void group_finalize_work_stage_2();
674
    void finalize_work_stage_1();
675
    void finalize_work_stage_2();
676
};
677

678

679
// Gives mutable access to the download cache stored in this file object.
inline DownloadCache& ServerFile::get_download_cache() noexcept
{
    return m_download_cache;
}
683

684
inline void ServerFile::group_finalize_work_stage_1()
685
{
342,542✔
686
    finalize_work_stage_1(); // Throws
342,542✔
687
}
342,542✔
688

689
inline void ServerFile::group_finalize_work_stage_2()
690
{
342,534✔
691
    finalize_work_stage_2(); // Throws
342,534✔
692
}
342,534✔
693

694

695
// ============================ Worker ============================
696

697
// All write transaction on server-side Realm files performed on behalf of the
698
// server, must be performed by the worker thread, not the network event loop
699
// thread. This is to ensure that the network event loop thread never gets
700
// blocked waiting for the worker thread to end a long running write
701
// transaction.
702
//
703
// FIXME: Currently, the event loop thread does perform a number of write
704
// transactions, but only on subtier nodes of a star topology server cluster.
705
class Worker : public ServerHistory::Context {
706
public:
707
    std::shared_ptr<util::Logger> logger_ptr;
708
    util::Logger& logger;
709

710
    explicit Worker(ServerImpl&);
711

712
    ServerFileAccessCache& get_file_access_cache() noexcept;
713

714
    void enqueue(ServerFile*);
715

716
    // Overriding members of ServerHistory::Context
717
    std::mt19937_64& server_history_get_random() noexcept override final;
718

719
private:
720
    ServerImpl& m_server;
721
    std::mt19937_64 m_random;
722
    ServerFileAccessCache m_file_access_cache;
723

724
    util::Mutex m_mutex;
725
    util::CondVar m_cond; // Protected by `m_mutex`
726

727
    bool m_stop = false; // Protected by `m_mutex`
728

729
    util::CircularBuffer<ServerFile*> m_queue; // Protected by `m_mutex`
730

731
    WorkerState m_state;
732

733
    void run();
734
    void stop() noexcept;
735

736
    friend class util::ThreadExecGuardWithParent<Worker, ServerImpl>;
737
};
738

739

740
// Gives access to the file access cache owned by this worker.
inline ServerFileAccessCache& Worker::get_file_access_cache() noexcept
{
    return m_file_access_cache;
}
744

745

746
// ============================ ServerImpl ============================
747

748
class ServerImpl : public ServerImplBase, public ServerHistory::Context {
749
public:
750
    std::uint_fast64_t errors_seen = 0;
751

752
    std::atomic<milliseconds_type> m_par_time;
753
    std::atomic<milliseconds_type> m_seq_time;
754

755
    util::Mutex last_client_accesses_mutex;
756

757
    const std::shared_ptr<util::Logger> logger_ptr;
758
    util::Logger& logger;
759

760
    network::Service& get_service() noexcept
761
    {
474,188✔
762
        return m_service;
474,188✔
763
    }
474,188✔
764

765
    const network::Service& get_service() const noexcept
766
    {
×
767
        return m_service;
×
768
    }
×
769

770
    std::mt19937_64& get_random() noexcept
771
    {
606,676✔
772
        return m_random;
606,676✔
773
    }
606,676✔
774

775
    const Server::Config& get_config() const noexcept
776
    {
1,021,304✔
777
        return m_config;
1,021,304✔
778
    }
1,021,304✔
779

780
    std::size_t get_max_upload_backlog() const noexcept
781
    {
407,422✔
782
        return m_max_upload_backlog;
407,422✔
783
    }
407,422✔
784

785
    const std::string& get_root_dir() const noexcept
786
    {
58,098✔
787
        return m_root_dir;
58,098✔
788
    }
58,098✔
789

790
    network::ssl::Context& get_ssl_context() noexcept
791
    {
432✔
792
        return *m_ssl_context;
432✔
793
    }
432✔
794

795
    const AccessControl& get_access_control() const noexcept
796
    {
×
797
        return m_access_control;
×
798
    }
×
799

800
    ProtocolVersionRange get_protocol_version_range() const noexcept
801
    {
17,296✔
802
        return m_protocol_version_range;
17,296✔
803
    }
17,296✔
804

805
    ServerProtocol& get_server_protocol() noexcept
806
    {
1,266,902✔
807
        return m_server_protocol;
1,266,902✔
808
    }
1,266,902✔
809

810
    compression::CompressMemoryArena& get_compress_memory_arena() noexcept
811
    {
38,008✔
812
        return m_compress_memory_arena;
38,008✔
813
    }
38,008✔
814

815
    MiscBuffers& get_misc_buffers() noexcept
816
    {
419,646✔
817
        return m_misc_buffers;
419,646✔
818
    }
419,646✔
819

820
    int_fast64_t get_current_server_session_ident() const noexcept
821
    {
×
822
        return m_current_server_session_ident;
×
823
    }
×
824

825
    util::ScratchMemory& get_scratch_memory() noexcept
826
    {
×
827
        return m_scratch_memory;
×
828
    }
×
829

830
    Worker& get_worker() noexcept
831
    {
365,602✔
832
        return m_worker;
365,602✔
833
    }
365,602✔
834

835
    void get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
836
    {
×
837
        parallel_section = m_par_time;
×
838
        sequential_section = m_seq_time;
×
839
    }
×
840

841
    ServerImpl(const std::string& root_dir, util::Optional<sync::PKey>, Server::Config);
842
    ~ServerImpl() noexcept;
843

844
    void start();
845

846
    void start(std::string listen_address, std::string listen_port, bool reuse_address)
847
    {
6,120✔
848
        m_config.listen_address = listen_address;
6,120✔
849
        m_config.listen_port = listen_port;
6,120✔
850
        m_config.reuse_address = reuse_address;
6,120✔
851

3,078✔
852
        start(); // Throws
6,120✔
853
    }
6,120✔
854

855
    network::Endpoint listen_endpoint() const
856
    {
12,338✔
857
        return m_acceptor.local_endpoint();
12,338✔
858
    }
12,338✔
859

860
    void run();
861
    void stop() noexcept;
862

863
    void remove_http_connection(std::int_fast64_t conn_id) noexcept;
864

865
    void add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn);
866
    void remove_sync_connection(int_fast64_t connection_id);
867

868
    size_t get_number_of_http_connections()
869
    {
×
870
        return m_http_connections.size();
×
871
    }
×
872

873
    size_t get_number_of_sync_connections()
874
    {
×
875
        return m_sync_connections.size();
×
876
    }
×
877

878
    bool is_sync_stopped()
879
    {
363,828✔
880
        return m_sync_stopped;
363,828✔
881
    }
363,828✔
882

883
    const std::set<std::string>& get_realm_names() const noexcept
884
    {
×
885
        return m_realm_names;
×
886
    }
×
887

888
    // virt_path must be valid when get_or_create_file() is called.
889
    util::bind_ptr<ServerFile> get_or_create_file(const std::string& virt_path)
890
    {
57,848✔
891
        util::bind_ptr<ServerFile> file = get_file(virt_path);
57,848✔
892
        if (REALM_LIKELY(file))
57,848✔
893
            return file;
51,948✔
894

3,864✔
895
        _impl::VirtualPathComponents virt_path_components =
9,764✔
896
            _impl::parse_virtual_path(m_root_dir, virt_path); // Throws
9,764✔
897
        REALM_ASSERT(virt_path_components.is_valid);
9,764✔
898

3,864✔
899
        _impl::make_dirs(m_root_dir, virt_path); // Throws
9,764✔
900
        m_realm_names.insert(virt_path);         // Throws
9,764✔
901
        {
9,764✔
902
            bool disable_sync_to_disk = m_config.disable_sync_to_disk;
9,764✔
903
            file.reset(new ServerFile(*this, m_file_access_cache, virt_path, virt_path_components.real_realm_path,
9,764✔
904
                                      disable_sync_to_disk)); // Throws
9,764✔
905
        }
9,764✔
906

3,864✔
907
        file->initialize();
9,764✔
908
        m_files[virt_path] = file; // Throws
9,764✔
909
        file->activate();          // Throws
9,764✔
910
        return file;
9,764✔
911
    }
9,764✔
912

913
    std::unique_ptr<ServerHistory> make_history_for_path()
914
    {
×
915
        return std::make_unique<ServerHistory>(*this);
×
916
    }
×
917

918
    util::bind_ptr<ServerFile> get_file(const std::string& virt_path) noexcept
919
    {
57,842✔
920
        auto i = m_files.find(virt_path);
57,842✔
921
        if (REALM_LIKELY(i != m_files.end()))
57,842✔
922
            return i->second;
51,946✔
923
        return {};
9,750✔
924
    }
9,750✔
925

926
    // Returns the number of seconds since the Epoch of
927
    // std::chrono::system_clock.
928
    std::chrono::system_clock::time_point token_expiration_clock_now() const noexcept
929
    {
×
930
        if (REALM_UNLIKELY(m_config.token_expiration_clock))
×
931
            return m_config.token_expiration_clock->now();
×
932
        return std::chrono::system_clock::now();
×
933
    }
×
934

935
    void set_connection_reaper_timeout(milliseconds_type);
936

937
    void close_connections();
938
    bool map_virtual_to_real_path(const std::string& virt_path, std::string& real_path);
939

940
    void recognize_external_change(const std::string& virt_path);
941

942
    void stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
943
                                                  milliseconds_type timeout);
944

945
    // Server global outputbuffers that can be reused.
946
    // The server is single threaded, so there are no
947
    // synchronization issues.
948
    // output_buffers_count is equal to the
949
    // maximum number of buffers needed at any point.
950
    static constexpr int output_buffers_count = 1;
951
    OutputBuffer output_buffers[output_buffers_count];
952

953
    bool is_load_balancing_allowed() const
954
    {
×
955
        return m_allow_load_balancing;
×
956
    }
×
957

958
    // inc_byte_size_for_pending_downstream_changesets() must be called by
959
    // ServerFile objects when changesets from downstream clients have been
960
    // received.
961
    //
962
    // dec_byte_size_for_pending_downstream_changesets() must be called by
963
    // ServerFile objects when changesets from downstream clients have been
964
    // processed or discarded.
965
    //
966
    // ServerImpl uses this information to keep a running tally (metric
967
    // `upload.pending.bytes`) of the total byte size of pending changesets from
968
    // downstream clients.
969
    //
970
    // These functions must be called on the network thread.
971
    void inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
972
    void dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
973

974
    // Overriding member functions in _impl::ServerHistory::Context
975
    std::mt19937_64& server_history_get_random() noexcept override final;
976

977
private:
978
    Server::Config m_config;
979
    network::Service m_service;
980
    std::mt19937_64 m_random;
981
    const std::size_t m_max_upload_backlog;
982
    const std::string m_root_dir;
983
    const AccessControl m_access_control;
984
    const ProtocolVersionRange m_protocol_version_range;
985

986
    // The reserved files will be closed in situations where the server
987
    // runs out of file descriptors.
988
    std::unique_ptr<File> m_reserved_files[5];
989

990
    // The set of all Realm files known to this server, represented by their
991
    // virtual path.
992
    //
993
    // INVARIANT: If a Realm file is in the servers directory (i.e., it would be
994
    // reported by an invocation of _impl::get_realm_names()), then the
995
    // corresponding virtual path is in `m_realm_names`, assuming no external
996
    // file-system level intervention.
997
    std::set<std::string> m_realm_names;
998

999
    std::unique_ptr<network::ssl::Context> m_ssl_context;
1000
    ServerFileAccessCache m_file_access_cache;
1001
    Worker m_worker;
1002
    std::map<std::string, util::bind_ptr<ServerFile>> m_files; // Key is virtual path
1003
    network::Acceptor m_acceptor;
1004
    std::int_fast64_t m_next_conn_id = 0;
1005
    std::unique_ptr<HTTPConnection> m_next_http_conn;
1006
    network::Endpoint m_next_http_conn_endpoint;
1007
    std::map<std::int_fast64_t, std::unique_ptr<HTTPConnection>> m_http_connections;
1008
    std::map<std::int_fast64_t, std::unique_ptr<SyncConnection>> m_sync_connections;
1009
    ServerProtocol m_server_protocol;
1010
    compression::CompressMemoryArena m_compress_memory_arena;
1011
    MiscBuffers m_misc_buffers;
1012
    int_fast64_t m_current_server_session_ident;
1013
    Optional<network::DeadlineTimer> m_connection_reaper_timer;
1014
    bool m_allow_load_balancing = false;
1015

1016
    util::Mutex m_mutex;
1017

1018
    bool m_stopped = false; // Protected by `m_mutex`
1019

1020
    // m_sync_stopped is used by stop_sync_and_wait_for_backup_completion().
1021
    // When m_sync_stopped is true, the server does not perform any sync.
1022
    bool m_sync_stopped = false;
1023

1024
    std::atomic<bool> m_running{false}; // Debugging facility
1025

1026
    std::size_t m_pending_changesets_from_downstream_byte_size = 0;
1027

1028
    util::CondVar m_wait_or_service_stopped_cond; // Protected by `m_mutex`
1029

1030
    util::ScratchMemory m_scratch_memory;
1031

1032
    void listen();
1033
    void initiate_accept();
1034
    void handle_accept(std::error_code);
1035

1036
    void reap_connections();
1037
    void initiate_connection_reaper_timer(milliseconds_type timeout);
1038
    void do_close_connections();
1039

1040
    static std::size_t determine_max_upload_backlog(Server::Config& config) noexcept
1041
    {
12,256✔
1042
        if (config.max_upload_backlog == 0)
12,256✔
1043
            return 4294967295; // 4GiB - 1 (largest allowable number on a 32-bit platform)
12,256✔
1044
        return config.max_upload_backlog;
×
1045
    }
×
1046

1047
    static ProtocolVersionRange determine_protocol_version_range(Server::Config& config)
1048
    {
12,256✔
1049
        const int actual_min = ServerImplBase::get_oldest_supported_protocol_version();
12,256✔
1050
        const int actual_max = get_current_protocol_version();
12,256✔
1051
        static_assert(actual_min <= actual_max, "");
12,256✔
1052
        int min = actual_min;
12,256✔
1053
        int max = actual_max;
12,256✔
1054
        if (config.max_protocol_version != 0 && config.max_protocol_version < max) {
12,256!
1055
            if (config.max_protocol_version < min)
×
1056
                throw Server::NoSupportedProtocolVersions();
×
1057
            max = config.max_protocol_version;
×
1058
        }
×
1059
        return {min, max};
12,256✔
1060
    }
12,256✔
1061

1062
    void do_recognize_external_change(const std::string& virt_path);
1063

1064
    void do_stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_complete)> completion_handler,
1065
                                                     milliseconds_type timeout);
1066
};
1067

1068
// ============================ SyncConnection ============================
1069

1070
class SyncConnection : public websocket::Config {
1071
public:
1072
    const std::shared_ptr<util::Logger> logger_ptr;
1073
    util::Logger& logger;
1074

1075
    // Clients with sync protocol version 8 or greater support pbs->flx migration
1076
    static constexpr int PBS_FLX_MIGRATION_PROTOCOL_VERSION = 8;
1077
    // Clients with sync protocol version less than 10 do not support log messages
1078
    static constexpr int SERVER_LOG_PROTOCOL_VERSION = 10;
1079

1080
    SyncConnection(ServerImpl& serv, std::int_fast64_t id, std::unique_ptr<network::Socket>&& socket,
1081
                   std::unique_ptr<network::ssl::Stream>&& ssl_stream,
1082
                   std::unique_ptr<network::ReadAheadBuffer>&& read_ahead_buffer, int client_protocol_version,
1083
                   std::string client_user_agent, std::string remote_endpoint, std::string appservices_request_id)
1084
        : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
1085
                                                          serv.logger_ptr)} // Throws
1086
        , logger{*logger_ptr}
1087
        , m_server{serv}
1088
        , m_id{id}
1089
        , m_socket{std::move(socket)}
1090
        , m_ssl_stream{std::move(ssl_stream)}
1091
        , m_read_ahead_buffer{std::move(read_ahead_buffer)}
1092
        , m_websocket{*this}
1093
        , m_client_protocol_version{client_protocol_version}
1094
        , m_client_user_agent{std::move(client_user_agent)}
1095
        , m_remote_endpoint{std::move(remote_endpoint)}
1096
        , m_appservices_request_id{std::move(appservices_request_id)}
1097
    {
17,296✔
1098
        // Make the output buffer stream throw std::bad_alloc if it fails to
8,658✔
1099
        // expand the buffer
8,658✔
1100
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
17,296✔
1101

8,658✔
1102
        network::Service& service = m_server.get_service();
17,296✔
1103
        auto handler = [this](Status status) {
892,322✔
1104
            if (!status.is_ok())
892,322✔
1105
                return;
×
1106
            if (!m_is_sending)
892,322✔
1107
                send_next_message(); // Throws
389,174✔
1108
        };
892,322✔
1109
        m_send_trigger = std::make_unique<Trigger<network::Service>>(&service, std::move(handler)); // Throws
17,296✔
1110
    }
17,296✔
1111

1112
    ~SyncConnection() noexcept;
1113

1114
    ServerImpl& get_server() noexcept
1115
    {
1,054,042✔
1116
        return m_server;
1,054,042✔
1117
    }
1,054,042✔
1118

1119
    ServerProtocol& get_server_protocol() noexcept
1120
    {
1,266,906✔
1121
        return m_server.get_server_protocol();
1,266,906✔
1122
    }
1,266,906✔
1123

1124
    int get_client_protocol_version()
1125
    {
954,520✔
1126
        return m_client_protocol_version;
954,520✔
1127
    }
954,520✔
1128

1129
    const std::string& get_client_user_agent() const noexcept
1130
    {
57,842✔
1131
        return m_client_user_agent;
57,842✔
1132
    }
57,842✔
1133

1134
    const std::string& get_remote_endpoint() const noexcept
1135
    {
57,842✔
1136
        return m_remote_endpoint;
57,842✔
1137
    }
57,842✔
1138

1139
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept final
1140
    {
17,296✔
1141
        return logger_ptr;
17,296✔
1142
    }
17,296✔
1143

1144
    std::mt19937_64& websocket_get_random() noexcept final override
1145
    {
596,916✔
1146
        return m_server.get_random();
596,916✔
1147
    }
596,916✔
1148

1149
    bool websocket_binary_message_received(const char* data, size_t size) final override
1150
    {
674,334✔
1151
        using sf = _impl::SimulatedFailure;
674,334✔
1152
        if (sf::check_trigger(sf::sync_server__read_head)) {
674,334✔
1153
            // Suicide
2,234✔
1154
            read_error(sf::sync_server__read_head);
4,176✔
1155
            return false;
4,176✔
1156
        }
4,176✔
1157
        // After a connection level error has occurred, all incoming messages
312,286✔
1158
        // will be ignored. By continuing to read until end of input, the server
312,286✔
1159
        // is able to know when the client closes the connection, which in
312,286✔
1160
        // general means that is has received the ERROR message.
312,286✔
1161
        if (REALM_LIKELY(!m_is_closing)) {
670,158✔
1162
            m_last_activity_at = steady_clock_now();
669,974✔
1163
            handle_message_received(data, size);
669,974✔
1164
        }
669,974✔
1165
        return true;
670,158✔
1166
    }
670,158✔
1167

1168
    bool websocket_ping_message_received(const char* data, size_t size) final override
1169
    {
×
1170
        if (REALM_LIKELY(!m_is_closing)) {
×
1171
            m_last_activity_at = steady_clock_now();
×
1172
            handle_ping_received(data, size);
×
1173
        }
×
1174
        return true;
×
1175
    }
×
1176

1177
    void async_write(const char* data, size_t size, websocket::WriteCompletionHandler handler) final override
1178
    {
596,926✔
1179
        if (m_ssl_stream) {
596,926✔
1180
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
632✔
1181
        }
632✔
1182
        else {
596,294✔
1183
            m_socket->async_write(data, size, std::move(handler)); // Throws
596,294✔
1184
        }
596,294✔
1185
    }
596,926✔
1186

1187
    void async_read(char* buffer, size_t size, websocket::ReadCompletionHandler handler) final override
1188
    {
2,035,924✔
1189
        if (m_ssl_stream) {
2,035,924✔
1190
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
1,572✔
1191
        }
1,572✔
1192
        else {
2,034,352✔
1193
            m_socket->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
2,034,352✔
1194
        }
2,034,352✔
1195
    }
2,035,924✔
1196

1197
    void async_read_until(char* buffer, size_t size, char delim,
1198
                          websocket::ReadCompletionHandler handler) final override
1199
    {
×
1200
        if (m_ssl_stream) {
×
1201
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1202
                                           std::move(handler)); // Throws
×
1203
        }
×
1204
        else {
×
1205
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1206
                                       std::move(handler)); // Throws
×
1207
        }
×
1208
    }
×
1209

1210
    void websocket_read_error_handler(std::error_code ec) final override
1211
    {
5,702✔
1212
        read_error(ec);
5,702✔
1213
    }
5,702✔
1214

1215
    void websocket_write_error_handler(std::error_code ec) final override
1216
    {
×
1217
        write_error(ec);
×
1218
    }
×
1219

1220
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*,
1221
                                           const std::string_view*) final override
1222
    {
1223
        // WebSocket class has already logged a message for this error
1224
        close_due_to_error(ec); // Throws
×
1225
    }
1226

1227
    void websocket_protocol_error_handler(std::error_code ec) final override
1228
    {
×
1229
        logger.error("WebSocket protocol error (%1): %2", ec, ec.message()); // Throws
×
1230
        close_due_to_error(ec);                                              // Throws
×
1231
    }
1232

1233
    void websocket_handshake_completion_handler(const HTTPHeaders&) final override
1234
    {
1235
        // This is not called since we handle HTTP request in handle_request_for_sync()
1236
        REALM_TERMINATE("websocket_handshake_completion_handler should not have been called");
1237
    }
1238

1239
    int_fast64_t get_id() const noexcept
1240
    {
×
1241
        return m_id;
×
1242
    }
1243

1244
    network::Socket& get_socket() noexcept
1245
    {
×
1246
        return *m_socket;
×
1247
    }
1248

1249
    void initiate();
1250

1251
    // Commits suicide
1252
    template <class... Params>
1253
    void terminate(Logger::Level, const char* log_message, Params... log_params);
1254

1255
    // Commits suicide
1256
    void terminate_if_dead(SteadyTimePoint now);
1257

1258
    void enlist_to_send(Session*) noexcept;
1259

1260
    // Sessions should get the output_buffer and insert a message, after which
1261
    // they call initiate_write_output_buffer().
1262
    OutputBuffer& get_output_buffer()
69,580✔
1263
    {
596,930✔
1264
        m_output_buffer.reset();
596,930✔
1265
        return m_output_buffer;
596,930✔
1266
    }
527,350✔
1267

1268
    // More advanced memory strategies can be implemented if needed.
69,340✔
1269
    void release_output_buffer() {}
525,824✔
1270

1271
    // When this function is called, the connection will initiate a write with
1272
    // its output_buffer. Sessions use this method.
1273
    void initiate_write_output_buffer();
1274

1275
    void initiate_pong_output_buffer();
1276

1277
    void handle_protocol_error(Status status);
1278

1279
    void receive_bind_message(session_ident_type, std::string path, std::string signed_user_token,
1280
                              bool need_client_file_ident, bool is_subserver);
1281

1282
    void receive_ident_message(session_ident_type, file_ident_type client_file_ident,
1283
                               salt_type client_file_ident_salt, version_type scan_server_version,
1284
                               version_type scan_client_version, version_type latest_server_version,
1285
                               salt_type latest_server_version_salt);
1286

1287
    void receive_upload_message(session_ident_type, version_type progress_client_version,
1288
                                version_type progress_server_version, version_type locked_server_version,
1289
                                const UploadChangesets&);
1290

1291
    void receive_mark_message(session_ident_type, request_ident_type);
1292

1293
    void receive_unbind_message(session_ident_type);
1294

1295
    void receive_ping(milliseconds_type timestamp, milliseconds_type rtt);
1296

1297
    void receive_error_message(session_ident_type, int error_code, std::string_view error_body);
1298

1299
    void protocol_error(ProtocolError, Session* = nullptr);
1300

1301
    void initiate_soft_close();
1302

1303
    void discard_session(session_ident_type) noexcept;
1304

1305
    void send_log_message(util::Logger::Level level, const std::string&& message, session_ident_type sess_ident = 0,
1306
                          std::optional<std::string> co_id = std::nullopt);
1307

1308
private:
1309
    ServerImpl& m_server;
1310
    const int_fast64_t m_id;
1311
    std::unique_ptr<network::Socket> m_socket;
1312
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1313
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1314

1315
    websocket::Socket m_websocket;
1316
    std::unique_ptr<char[]> m_input_body_buffer;
1317
    OutputBuffer m_output_buffer;
1318
    std::map<session_ident_type, std::unique_ptr<Session>> m_sessions;
1319

1320
    // The protocol version in use by the connected client.
1321
    const int m_client_protocol_version;
1322

1323
    // The user agent description passed by the client.
1324
    const std::string m_client_user_agent;
1325

1326
    const std::string m_remote_endpoint;
1327

1328
    const std::string m_appservices_request_id;
1329

1330
    // A queue of sessions that have enlisted for an opportunity to send a
1331
    // message. Sessions will be served in the order that they enlist. A session
1332
    // can only occur once in this queue (linked list). If the queue is not
1333
    // empty, and no message is currently being written to the socket, the first
1334
    // session is taken out of the queue, and then granted an opportunity to
1335
    // send a message.
1336
    //
1337
    // Sessions will never be destroyed while in this queue. This is ensured
1338
    // because the connection owns the sessions that are associated with it, and
1339
    // the connection only removes a session from m_sessions at points in time
1340
    // where that session is guaranteed to not be in m_sessions_enlisted_to_send
1341
    // (Connection::send_next_message() and Connection::~Connection()).
1342
    SessionQueue m_sessions_enlisted_to_send;
1343

1344
    Session* m_receiving_session = nullptr;
1345

1346
    bool m_is_sending = false;
1347
    bool m_is_closing = false;
1348

1349
    bool m_send_pong = false;
1350
    bool m_sending_pong = false;
1351

1352
    std::unique_ptr<Trigger<network::Service>> m_send_trigger;
1353

1354
    milliseconds_type m_last_ping_timestamp = 0;
1355

1356
    // If `m_is_closing` is true, this is the time at which `m_is_closing` was
1357
    // set to true (initiation of soft close). Otherwise, if no messages have
1358
    // been received from the client, this is the time at which the connection
1359
    // object was initiated (completion of WebSocket handshake). Otherwise this
1360
    // is the time at which the last message was received from the client.
1361
    SteadyTimePoint m_last_activity_at;
1362

1363
    // These are initialized by do_initiate_soft_close().
1364
    //
1365
    // With recent versions of the protocol (when the version is greater than,
1366
    // or equal to 23), `m_error_session_ident` is always zero.
1367
    ProtocolError m_error_code = {};
1368
    session_ident_type m_error_session_ident = 0;
1369

1370
    struct LogMessage {
1371
        session_ident_type sess_ident;
1372
        util::Logger::Level level;
1373
        std::string message;
1374
        std::optional<std::string> co_id;
1375
    };
1376

1377
    std::mutex m_log_mutex;
1378
    std::queue<LogMessage> m_log_messages;
1379

1380
    static std::string make_logger_prefix(int_fast64_t id)
2,014✔
1381
    {
17,294✔
1382
        std::ostringstream out;
17,294✔
1383
        out.imbue(std::locale::classic());
17,294✔
1384
        out << "Sync Connection[" << id << "]: "; // Throws
17,294✔
1385
        return out.str();                         // Throws
17,294✔
1386
    }
15,280✔
1387

1388
    // The return value of handle_message_received() designates whether
1389
    // message processing should continue. If the connection object is
1390
    // destroyed during execution of handle_message_received(), the return
1391
    // value must be false.
1392
    void handle_message_received(const char* data, size_t size);
1393

1394
    void handle_ping_received(const char* data, size_t size);
1395

1396
    void send_next_message();
1397
    void send_pong(milliseconds_type timestamp);
1398
    void send_log_message(const LogMessage& log_msg);
1399

1400
    void handle_write_output_buffer();
1401
    void handle_pong_output_buffer();
1402

1403
    void initiate_write_error(ProtocolError, session_ident_type);
1404
    void handle_write_error(std::error_code ec);
1405

1406
    void do_initiate_soft_close(ProtocolError, session_ident_type);
1407
    void read_error(std::error_code);
1408
    void write_error(std::error_code);
1409

1410
    void close_due_to_close_by_client(std::error_code);
1411
    void close_due_to_error(std::error_code);
1412

1413
    void terminate_sessions();
1414

1415
    void bad_session_ident(const char* message_type, session_ident_type);
1416
    void message_after_unbind(const char* message_type, session_ident_type);
1417
    void message_before_ident(const char* message_type, session_ident_type);
1418
};
1419

1420

1421
inline void SyncConnection::read_error(std::error_code ec)
1,186✔
1422
{
9,878✔
1423
    REALM_ASSERT(ec != util::error::operation_aborted);
9,878✔
1424
    if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
9,058✔
1425
        // Suicide
3,496✔
1426
        close_due_to_close_by_client(ec); // Throws
5,702✔
1427
        return;
5,702✔
1428
    }
5,592✔
1429
    if (ec == util::MiscExtErrors::delim_not_found) {
3,638✔
1430
        logger.error("Input message head delimited not found"); // Throws
×
1431
        protocol_error(ProtocolError::limits_exceeded);         // Throws
×
1432
        return;
×
1433
    }
334✔
1434

2,438✔
1435
    logger.error("Reading failed: %1", ec.message()); // Throws
3,972✔
1436

2,234✔
1437
    // Suicide
2,438✔
1438
    close_due_to_error(ec); // Throws
4,176✔
1439
}
3,638✔
1440

1441
inline void SyncConnection::write_error(std::error_code ec)
1442
{
×
1443
    REALM_ASSERT(ec != util::error::operation_aborted);
×
1444
    if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1445
        // Suicide
1446
        close_due_to_close_by_client(ec); // Throws
×
1447
        return;
×
1448
    }
×
1449
    logger.error("Writing failed: %1", ec.message()); // Throws
1450

1451
    // Suicide
1452
    close_due_to_error(ec); // Throws
×
1453
}
1454

1455

1456
// ============================ HTTPConnection ============================
1457

1458
// The literal name of the HTTP `User-Agent` header field. Declared `const`
// because it is a fixed name that should never be modified at runtime.
// NOTE(review): presumably used as the lookup key for the client's user agent
// in incoming request headers -- confirm against the uses further down.
const std::string g_user_agent = "User-Agent";
1459

1460
class HTTPConnection {
1461
public:
1462
    const std::shared_ptr<Logger> logger_ptr;
1463
    util::Logger& logger;
1464

1465
    HTTPConnection(ServerImpl& serv, int_fast64_t id, bool is_ssl)
1466
        : logger_ptr{std::make_shared<PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
1467
                                                    serv.logger_ptr)} // Throws
1468
        , logger{*logger_ptr}
1469
        , m_server{serv}
1470
        , m_id{id}
1471
        , m_socket{new network::Socket{serv.get_service()}} // Throws
1472
        , m_read_ahead_buffer{new network::ReadAheadBuffer} // Throws
1473
        , m_http_server{*this, logger_ptr}
3,218✔
1474
    {
28,298✔
1475
        // Make the output buffer stream throw std::bad_alloc if it fails to
14,482✔
1476
        // expand the buffer
16,080✔
1477
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
28,298✔
1478

16,080✔
1479
        if (is_ssl) {
26,726✔
1480
            using namespace network::ssl;
432✔
1481
            Context& ssl_context = serv.get_ssl_context();
432✔
1482
            m_ssl_stream = std::make_unique<Stream>(*m_socket, ssl_context,
432✔
1483
                                                    Stream::server); // Throws
432✔
1484
        }
3,602✔
1485
    }
26,678✔
1486

1487
    /// Returns the server that owns this connection.
    ServerImpl& get_server() noexcept
    {
        return m_server;
    }
1491

1492
    int_fast64_t get_id() const noexcept
2,066✔
1493
    {
17,642✔
1494
        return m_id;
17,642✔
1495
    }
15,576✔
1496

1497
    /// Returns the underlying socket. The socket pointer is dereferenced
    /// unconditionally, so it must still be held by this connection.
    network::Socket& get_socket() noexcept
    {
        return *m_socket;
    }
39,750✔
1501

1502
    template <class H>
1503
    void async_write(const char* data, size_t size, H handler)
2,034✔
1504
    {
17,474✔
1505
        if (m_ssl_stream) {
15,454✔
1506
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
126✔
1507
        }
2,132✔
1508
        else {
17,348✔
1509
            m_socket->async_write(data, size, std::move(handler)); // Throws
17,348✔
1510
        }
17,362✔
1511
    }
15,440✔
1512

1513
    template <class H>
1514
    void async_read(char* buffer, size_t size, H handler)
8✔
1515
    {
72!
1516
        if (m_ssl_stream) {
64!
1517
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer,
×
1518
                                     std::move(handler)); // Throws
×
1519
        }
8✔
1520
        else {
72✔
1521
            m_socket->async_read(buffer, size, *m_read_ahead_buffer,
72✔
1522
                                 std::move(handler)); // Throws
72✔
1523
        }
72✔
1524
    }
64✔
1525

1526
    template <class H>
1527
    void async_read_until(char* buffer, size_t size, char delim, H handler)
18,208✔
1528
    {
156,270!
1529
        if (m_ssl_stream) {
138,188!
1530
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
1,134✔
1531
                                           std::move(handler)); // Throws
1,134✔
1532
        }
19,090✔
1533
        else {
155,136✔
1534
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
155,136✔
1535
                                       std::move(handler)); // Throws
155,136✔
1536
        }
155,262✔
1537
    }
138,062✔
1538

1539
    void initiate(std::string remote_endpoint)
2,066✔
1540
    {
17,640✔
1541
        m_last_activity_at = steady_clock_now();
17,640✔
1542
        m_remote_endpoint = std::move(remote_endpoint);
16,670✔
1543

9,792✔
1544
        logger.detail("Connection from %1", m_remote_endpoint); // Throws
16,670✔
1545

9,792✔
1546
        if (m_ssl_stream) {
15,598✔
1547
            initiate_ssl_handshake(); // Throws
216✔
1548
        }
2,234✔
1549
        else {
17,424✔
1550
            initiate_http(); // Throws
17,424✔
1551
        }
17,448✔
1552
    }
15,574✔
1553

1554
    void respond_200_ok()
1555
    {
×
1556
        handle_text_response(HTTPStatus::Ok, "OK"); // Throws
×
1557
    }
1558

1559
    void respond_404_not_found()
1560
    {
×
1561
        handle_text_response(HTTPStatus::NotFound, "Not found"); // Throws
×
1562
    }
1563

1564
    void respond_503_service_unavailable()
1565
    {
×
1566
        handle_text_response(HTTPStatus::ServiceUnavailable, "Service unavailable"); // Throws
×
1567
    }
1568

1569
    // Commits suicide
1570
    template <class... Params>
1571
    void terminate(Logger::Level log_level, const char* log_message, Params... log_params)
50✔
1572
    {
340✔
1573
        logger.log(log_level, log_message, log_params...); // Throws
340✔
1574
        m_ssl_stream.reset();
340✔
1575
        m_socket.reset();
340✔
1576
        m_server.remove_http_connection(m_id); // Suicide
340✔
1577
    }
290✔
1578

1579
    // Commits suicide
1580
    void terminate_if_dead(SteadyTimePoint now)
4✔
1581
    {
12✔
1582
        milliseconds_type time = steady_duration(m_last_activity_at, now);
12✔
1583
        const Server::Config& config = m_server.get_config();
12✔
1584
        if (m_is_sending) {
8✔
1585
            if (time >= config.http_response_timeout) {
6✔
1586
                // Suicide
1587
                terminate(Logger::Level::detail,
×
1588
                          "HTTP connection closed (request timeout)"); // Throws
×
UNCOV
1589
            }
×
1590
        }
10✔
1591
        else {
6✔
1592
            if (time >= config.http_request_timeout) {
2✔
1593
                // Suicide
1594
                terminate(Logger::Level::detail,
×
1595
                          "HTTP connection closed (response timeout)"); // Throws
×
1596
            }
4✔
1597
        }
6✔
1598
    }
8✔
1599

1600
    std::string get_appservices_request_id() const
2,034✔
1601
    {
17,470✔
1602
        return m_appservices_request_id.to_string();
17,470✔
1603
    }
15,436✔
1604

1605
private:
1606
    ServerImpl& m_server;
1607
    const int_fast64_t m_id;
1608
    const ObjectId m_appservices_request_id = ObjectId::gen();
1609
    std::unique_ptr<network::Socket> m_socket;
1610
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1611
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1612
    HTTPServer<HTTPConnection> m_http_server;
1613
    OutputBuffer m_output_buffer;
1614
    bool m_is_sending = false;
1615
    SteadyTimePoint m_last_activity_at;
1616
    std::string m_remote_endpoint;
1617
    int m_negotiated_protocol_version = 0;
1618

1619
    void initiate_ssl_handshake()
24✔
1620
    {
214✔
1621
        auto handler = [this](std::error_code ec) {
210✔
1622
            if (ec != util::error::operation_aborted)
210✔
1623
                handle_ssl_handshake(ec); // Throws
210✔
1624
        };
212✔
1625
        m_ssl_stream->async_handshake(std::move(handler)); // Throws
216✔
1626
    }
192✔
1627

1628
    void handle_ssl_handshake(std::error_code ec)
22✔
1629
    {
210✔
1630
        if (ec) {
196✔
1631
            logger.error("SSL handshake error (%1): %2", ec, ec.message()); // Throws
84✔
1632
            close_due_to_error(ec);                                         // Throws
84✔
1633
            return;
84✔
1634
        }
90✔
1635
        initiate_http(); // Throws
126✔
1636
    }
112✔
1637

1638
    void initiate_http()
2,056✔
1639
    {
17,552✔
1640
        logger.debug("Connection initiates HTTP receipt");
16,586✔
1641

9,734✔
1642
        auto handler = [this](HTTPRequest request, std::error_code ec) {
17,552✔
1643
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
16,586✔
1644
                return;
9,734✔
1645
            if (REALM_UNLIKELY(ec == HTTPParserError::MalformedRequest)) {
15,496✔
1646
                logger.error("Malformed HTTP request");
×
1647
                close_due_to_error(ec); // Throws
×
1648
                return;
×
1649
            }
2,056✔
1650
            if (REALM_UNLIKELY(ec == HTTPParserError::BadRequest)) {
15,504✔
1651
                logger.error("Bad HTTP request");
72✔
1652
                const char* body = "The HTTP request was corrupted";
72✔
1653
                handle_400_bad_request(body); // Throws
72✔
1654
                return;
72✔
1655
            }
2,112✔
1656
            if (REALM_UNLIKELY(ec)) {
15,454✔
1657
                read_error(ec); // Throws
76✔
1658
                return;
76✔
1659
            }
2,080✔
1660
            handle_http_request(std::move(request)); // Throws
17,404✔
1661
        };
17,434✔
1662
        m_http_server.async_receive_request(std::move(handler)); // Throws
17,552✔
1663
    }
15,496✔
1664

1665
    void handle_http_request(const HTTPRequest& request)
2,026✔
1666
    {
17,404✔
1667
        StringData path = request.path;
16,450✔
1668

9,666✔
1669
        logger.debug("HTTP request received, request = %1", request);
16,450✔
1670

9,666✔
1671
        m_is_sending = true;
17,404✔
1672
        m_last_activity_at = steady_clock_now();
16,450✔
1673

8,712✔
1674
        // FIXME: When thinking of this function as a switching device, it seem
8,712✔
1675
        // wrong that it requires a `%2F` after `/realm-sync/`. If `%2F` is
8,712✔
1676
        // supposed to be mandatory, then that check ought to be delegated to
8,712✔
1677
        // handle_request_for_sync(), as that will yield a sharper separation of
8,712✔
1678
        // concerns.
9,666✔
1679
        if (path == "/realm-sync" || path.begins_with("/realm-sync?") || path.begins_with("/realm-sync/%2F")) {
17,392✔
1680
            handle_request_for_sync(request); // Throws
17,296✔
1681
        }
15,294✔
1682
        else {
108✔
1683
            handle_404_not_found(request); // Throws
108✔
1684
        }
2,122✔
1685
    }
15,378✔
1686

1687
    void handle_request_for_sync(const HTTPRequest& request)
2,014✔
1688
    {
17,296✔
1689
        if (m_server.is_sync_stopped()) {
15,282✔
1690
            logger.debug("Attempt to create a sync connection to a server that has been "
×
1691
                         "stopped"); // Throws
×
1692
            handle_503_service_unavailable(request, "The server does not accept sync "
×
1693
                                                    "connections"); // Throws
×
1694
            return;
×
1695
        }
1,066✔
1696

9,606✔
1697
        util::Optional<std::string> sec_websocket_protocol = websocket::read_sec_websocket_protocol(request);
16,348✔
1698

8,658✔
1699
        // Figure out whether there are any protocol versions supported by both
8,658✔
1700
        // the client and the server, and if so, choose the newest one of them.
9,606✔
1701
        MiscBuffers& misc_buffers = m_server.get_misc_buffers();
17,296✔
1702
        using ProtocolVersionRanges = MiscBuffers::ProtocolVersionRanges;
17,296✔
1703
        ProtocolVersionRanges& protocol_version_ranges = misc_buffers.protocol_version_ranges;
17,296✔
1704
        {
17,296✔
1705
            protocol_version_ranges.clear();
17,296✔
1706
            util::MemoryInputStream in;
17,296✔
1707
            in.imbue(std::locale::classic());
17,296✔
1708
            in.unsetf(std::ios_base::skipws);
17,296✔
1709
            std::string_view value;
17,296✔
1710
            if (sec_websocket_protocol)
17,296✔
1711
                value = *sec_websocket_protocol;
17,294✔
1712
            HttpListHeaderValueParser parser{value};
17,296✔
1713
            std::string_view elem;
39,450✔
1714
            while (parser.next(elem)) {
195,100✔
1715
                // FIXME: Use std::string_view::begins_with() in C++20.
105,652✔
1716
                const StringData protocol{elem};
190,230✔
1717
                std::string_view prefix;
190,230✔
1718
                if (protocol.begins_with(get_pbs_websocket_protocol_prefix()))
190,232✔
1719
                    prefix = get_pbs_websocket_protocol_prefix();
2,147,651,729✔
1720
                else if (protocol.begins_with(get_old_pbs_websocket_protocol_prefix()))
6,442,450,947✔
1721
                    prefix = get_old_pbs_websocket_protocol_prefix();
22,152✔
1722
                if (!prefix.empty()) {
190,240✔
1723
                    auto parse_version = [&](std::string_view str) {
190,238✔
1724
                        in.set_buffer(str.data(), str.data() + str.size());
190,238✔
1725
                        int version = 0;
190,238✔
1726
                        in >> version;
190,238✔
1727
                        if (REALM_LIKELY(in && in.eof() && version >= 0))
190,240✔
1728
                            return version;
2,147,651,741✔
1729
                        return -1;
10,737,418,243✔
1730
                    };
8,589,956,748✔
1731
                    int min, max;
190,230✔
1732
                    std::string_view range = elem.substr(prefix.size());
190,230✔
1733
                    auto i = range.find('-');
190,230✔
1734
                    if (i != std::string_view::npos) {
168,078✔
1735
                        min = parse_version(range.substr(0, i));
×
1736
                        max = parse_version(range.substr(i + 1));
×
1737
                    }
22,152✔
1738
                    else {
190,230✔
1739
                        min = parse_version(range);
190,230✔
1740
                        max = min;
190,230✔
1741
                    }
190,232✔
1742
                    if (REALM_LIKELY(min >= 0 && max >= 0 && min <= max)) {
190,246✔
1743
                        protocol_version_ranges.emplace_back(min, max); // Throws
190,246✔
1744
                        continue;
190,246✔
1745
                    }
2,147,651,739✔
1746
                    logger.error("Protocol version negotiation failed: Client sent malformed "
12,884,901,882✔
1747
                                 "specification of supported protocol versions: '%1'",
12,884,901,882✔
1748
                                 elem); // Throws
12,884,901,882✔
1749
                    handle_400_bad_request("Protocol version negotiation failed: Malformed "
12,884,901,882✔
1750
                                           "specification of supported protocol "
12,884,901,882✔
1751
                                           "versions\n"); // Throws
12,884,901,882✔
1752
                    return;
12,884,901,882✔
1753
                }
10,737,418,235✔
1754
                logger.warn("Unrecognized protocol token in HTTP response header "
6,442,450,951✔
1755
                            "Sec-WebSocket-Protocol: '%1'",
6,442,450,951✔
1756
                            elem); // Throws
6,442,450,951✔
1757
            }
6,442,452,967✔
1758
            if (protocol_version_ranges.empty()) {
15,296✔
1759
                logger.error("Protocol version negotiation failed: Client did not send a "
×
1760
                             "specification of supported protocol versions"); // Throws
×
1761
                handle_400_bad_request("Protocol version negotiation failed: Missing specification "
×
1762
                                       "of supported protocol versions\n"); // Throws
×
1763
                return;
×
1764
            }
2,016✔
1765
        }
17,312✔
1766
        {
17,312✔
1767
            ProtocolVersionRange server_range = m_server.get_protocol_version_range();
17,312✔
1768
            int server_min = server_range.first;
17,312✔
1769
            int server_max = server_range.second;
17,312✔
1770
            int best_match = 0;
17,312✔
1771
            int overall_client_min = std::numeric_limits<int>::max();
17,312✔
1772
            int overall_client_max = std::numeric_limits<int>::min();
37,450✔
1773
            for (const auto& range : protocol_version_ranges) {
190,212✔
1774
                int client_min = range.first;
190,212✔
1775
                int client_max = range.second;
190,212✔
1776
                if (client_max >= server_min && client_min <= server_max) {
179,784✔
1777
                    // Overlap
105,666✔
1778
                    int version = std::min(client_max, server_max);
190,212✔
1779
                    if (version > best_match) {
170,072✔
1780
                        best_match = version;
17,296✔
1781
                    }
37,436✔
1782
                }
190,212✔
1783
                if (client_min < overall_client_min)
190,212✔
1784
                    overall_client_min = client_min;
190,208✔
1785
                if (client_max > overall_client_max)
170,072✔
1786
                    overall_client_max = client_max;
37,436✔
1787
            }
170,074✔
1788
            Formatter& formatter = misc_buffers.formatter;
17,312✔
1789
            if (REALM_UNLIKELY(best_match == 0)) {
15,296✔
1790
                const char* elaboration = "No version supported by both client and server";
×
1791
                const char* identifier_hint = nullptr;
×
1792
                if (overall_client_max < server_min) {
×
1793
                    // Client is too old
1794
                    elaboration = "Client is too old for server";
×
1795
                    identifier_hint = "CLIENT_TOO_OLD";
×
1796
                }
×
1797
                else if (overall_client_min > server_max) {
×
1798
                    // Client is too new
1799
                    elaboration = "Client is too new for server";
×
1800
                    identifier_hint = "CLIENT_TOO_NEW";
×
1801
                }
×
1802
                auto format_ranges = [&](const auto& list) {
×
1803
                    bool nonfirst = false;
×
1804
                    for (auto range : list) {
×
1805
                        if (nonfirst)
×
1806
                            formatter << ", "; // Throws
×
1807
                        int min = range.first, max = range.second;
×
1808
                        REALM_ASSERT(min <= max);
×
1809
                        formatter << min;
×
1810
                        if (max != min)
×
1811
                            formatter << "-" << max;
×
1812
                        nonfirst = true;
×
1813
                    }
×
1814
                };
×
1815
                using Range = ProtocolVersionRange;
×
1816
                formatter.reset();
×
1817
                format_ranges(protocol_version_ranges); // Throws
×
1818
                logger.error("Protocol version negotiation failed: %1 "
×
1819
                             "(client supports: %2)",
×
1820
                             elaboration, std::string_view(formatter.data(), formatter.size())); // Throws
×
1821
                formatter.reset();
×
1822
                formatter << "Protocol version negotiation failed: "
×
1823
                             ""
×
1824
                          << elaboration << ".\n\n";                                   // Throws
×
1825
                formatter << "Server supports: ";                                      // Throws
×
1826
                format_ranges(std::initializer_list<Range>{{server_min, server_max}}); // Throws
×
1827
                formatter << "\n";                                                     // Throws
×
1828
                formatter << "Client supports: ";                                      // Throws
×
1829
                format_ranges(protocol_version_ranges);                                // Throws
×
1830
                formatter << "\n\n";                                                   // Throws
×
1831
                formatter << "REALM_SYNC_PROTOCOL_MISMATCH";                           // Throws
×
1832
                if (identifier_hint)
×
1833
                    formatter << ":" << identifier_hint;                      // Throws
×
1834
                formatter << "\n";                                            // Throws
×
1835
                handle_400_bad_request({formatter.data(), formatter.size()}); // Throws
×
1836
                return;
×
1837
            }
2,016✔
1838
            m_negotiated_protocol_version = best_match;
17,312✔
1839
            logger.debug("Received: Sync HTTP request (negotiated_protocol_version=%1)",
17,312✔
1840
                         m_negotiated_protocol_version); // Throws
17,312✔
1841
            formatter.reset();
17,312✔
1842
        }
16,362✔
1843

9,614✔
1844
        std::string sec_websocket_protocol_2;
17,312✔
1845
        {
17,312✔
1846
            std::string_view prefix =
17,312✔
1847
                m_negotiated_protocol_version < SyncConnection::PBS_FLX_MIGRATION_PROTOCOL_VERSION
16,362✔
1848
                    ? get_old_pbs_websocket_protocol_prefix()
9,614✔
1849
                    : get_pbs_websocket_protocol_prefix();
17,312✔
1850
            std::ostringstream out;
17,312✔
1851
            out.imbue(std::locale::classic());
17,312✔
1852
            out << prefix << m_negotiated_protocol_version; // Throws
17,312✔
1853
            sec_websocket_protocol_2 = std::move(out).str();
17,312✔
1854
        }
16,362✔
1855

9,614✔
1856
        std::error_code ec;
17,312✔
1857
        util::Optional<HTTPResponse> response =
17,312✔
1858
            websocket::make_http_response(request, sec_websocket_protocol_2, ec); // Throws
16,362✔
1859

9,614✔
1860
        if (ec) {
15,296✔
1861
            if (ec == websocket::HttpError::bad_request_header_upgrade) {
×
1862
                logger.error("There must be a header of the form 'Upgrade: websocket'");
×
1863
            }
×
1864
            else if (ec == websocket::HttpError::bad_request_header_connection) {
×
1865
                logger.error("There must be a header of the form 'Connection: Upgrade'");
×
1866
            }
×
1867
            else if (ec == websocket::HttpError::bad_request_header_websocket_version) {
×
1868
                logger.error("There must be a header of the form 'Sec-WebSocket-Version: 13'");
×
1869
            }
×
1870
            else if (ec == websocket::HttpError::bad_request_header_websocket_key) {
×
1871
                logger.error("The header Sec-WebSocket-Key is missing");
×
1872
            }
1873

1874
            logger.error("The HTTP request with the error is:\n%1", request);
×
1875
            logger.error("Check the proxy configuration and make sure that the "
×
1876
                         "HTTP request is a valid Websocket request.");
×
1877
            close_due_to_error(ec);
×
1878
            return;
×
1879
        }
2,016✔
1880
        REALM_ASSERT(response);
17,312✔
1881
        add_common_http_response_headers(*response);
16,362✔
1882

9,614✔
1883
        std::string user_agent;
17,312✔
1884
        {
17,312✔
1885
            auto i = request.headers.find(g_user_agent);
17,312✔
1886
            if (i != request.headers.end())
17,310✔
1887
                user_agent = i->second; // Throws (copy)
17,298✔
1888
        }
16,362✔
1889

9,614✔
1890
        auto handler = [protocol_version = m_negotiated_protocol_version, user_agent = std::move(user_agent),
17,310✔
1891
                        this](std::error_code ec) {
16,350✔
1892
            // If the operation is aborted, the socket object may have been destroyed.
9,606✔
1893
            if (ec != util::error::operation_aborted) {
17,290✔
1894
                if (ec) {
15,278✔
1895
                    write_error(ec);
×
1896
                    return;
×
1897
                }
1,066✔
1898

9,604✔
1899
                std::unique_ptr<SyncConnection> sync_conn = std::make_unique<SyncConnection>(
17,290✔
1900
                    m_server, m_id, std::move(m_socket), std::move(m_ssl_stream), std::move(m_read_ahead_buffer),
17,290✔
1901
                    protocol_version, std::move(user_agent), std::move(m_remote_endpoint),
17,290✔
1902
                    get_appservices_request_id()); // Throws
17,290✔
1903
                SyncConnection& sync_conn_ref = *sync_conn;
17,290✔
1904
                m_server.add_sync_connection(m_id, std::move(sync_conn));
17,290✔
1905
                m_server.remove_http_connection(m_id);
17,290✔
1906
                sync_conn_ref.initiate();
17,290✔
1907
            }
17,292✔
1908
        };
17,294✔
1909
        m_http_server.async_send_response(*response, std::move(handler));
17,312✔
1910
    }
15,296✔
1911

1912
    /// Sends a response with status `http_status`, the common response
    /// headers, `Connection: close`, and `body` (with a matching
    /// `Content-Length`) when `body` is not empty. After the response has
    /// been written successfully the connection terminates itself.
    void handle_text_response(HTTPStatus http_status, std::string_view body)
    {
        HTTPResponse response;
        response.status = http_status;
        add_common_http_response_headers(response);
        response.headers["Connection"] = "close";

        if (!body.empty()) {
            response.headers["Content-Length"] = util::to_string(body.size());
            response.body = std::string(body); // Throws
        }

        auto on_sent = [this](std::error_code ec) {
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
                return;
            if (REALM_UNLIKELY(ec)) {
                write_error(ec);
                return;
            }
            terminate(Logger::Level::detail, "HTTP connection closed"); // Throws
        };
        m_http_server.async_send_response(response, std::move(on_sent));
    }
160✔
1937

1938
    void handle_400_bad_request(std::string_view body)
8✔
1939
    {
72✔
1940
        logger.detail("400 Bad Request");
72✔
1941
        handle_text_response(HTTPStatus::BadRequest, body); // Throws
72✔
1942
    }
64✔
1943

1944
    void handle_404_not_found(const HTTPRequest&)
12✔
1945
    {
108✔
1946
        logger.detail("404 Not Found"); // Throws
108✔
1947
        handle_text_response(HTTPStatus::NotFound,
108✔
1948
                             "Realm sync server\n\nPage not found\n"); // Throws
108✔
1949
    }
96✔
1950

1951
    void handle_503_service_unavailable(const HTTPRequest&, std::string_view message)
1952
    {
×
1953
        logger.debug("503 Service Unavailable");                       // Throws
×
1954
        handle_text_response(HTTPStatus::ServiceUnavailable, message); // Throws
×
1955
    }
1956

1957
    /// Adds the headers shared by all responses: `Server`, and
    /// `X-Appservices-Request-Id` when the negotiated protocol version is
    /// below `SyncConnection::SERVER_LOG_PROTOCOL_VERSION`.
    void add_common_http_response_headers(HTTPResponse& response)
    {
        response.headers["Server"] = "RealmSync/" REALM_VERSION_STRING; // Throws

        const bool include_request_id =
            m_negotiated_protocol_version < SyncConnection::SERVER_LOG_PROTOCOL_VERSION;
        if (!include_request_id)
            return;

        // This isn't a real X-Appservices-Request-Id, but it should be enough to test with
        response.headers["X-Appservices-Request-Id"] = get_appservices_request_id();
    }
15,440✔
1965

1966
    void read_error(std::error_code ec)
22✔
1967
    {
76✔
1968
        REALM_ASSERT(ec != util::error::operation_aborted);
76!
1969
        if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
68!
1970
            // Suicide
28✔
1971
            close_due_to_close_by_client(ec); // Throws
76✔
1972
            return;
76✔
1973
        }
54!
1974
        if (ec == util::MiscExtErrors::delim_not_found) {
×
1975
            logger.error("Input message head delimited not found"); // Throws
×
1976
            close_due_to_error(ec);                                 // Throws
×
1977
            return;
×
1978
        }
1979

1980
        logger.error("Reading failed: %1", ec.message()); // Throws
1981

1982
        // Suicide
1983
        close_due_to_error(ec); // Throws
×
1984
    }
1985

1986
    void write_error(std::error_code ec)
1987
    {
×
1988
        REALM_ASSERT(ec != util::error::operation_aborted);
×
1989
        if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1990
            // Suicide
1991
            close_due_to_close_by_client(ec); // Throws
×
1992
            return;
×
1993
        }
×
1994
        logger.error("Writing failed: %1", ec.message()); // Throws
1995

1996
        // Suicide
1997
        close_due_to_error(ec); // Throws
×
1998
    }
1999

2000
    void close_due_to_close_by_client(std::error_code ec)
22✔
2001
    {
76✔
2002
        auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
68✔
2003
        // Suicide
28✔
2004
        terminate(log_level, "HTTP connection closed by client: %1", ec.message()); // Throws
76✔
2005
    }
54✔
2006

2007
    void close_due_to_error(std::error_code ec)
8✔
2008
    {
82✔
2009
        // Suicide
56✔
2010
        terminate(Logger::Level::error, "HTTP connection closed due to error: %1",
84✔
2011
                  ec.message()); // Throws
84✔
2012
    }
76✔
2013

2014
    /// Builds the logger prefix for the connection with identifier `id`, of
    /// the form `HTTP Connection[<id>]: `.
    ///
    /// The identifier is rendered as plain decimal digits with
    /// std::to_string(), which avoids constructing and imbuing a string
    /// stream for every connection.
    static std::string make_logger_prefix(int_fast64_t id)
    {
        std::string prefix = "HTTP Connection[";
        prefix += std::to_string(id); // Throws
        prefix += "]: ";              // Throws
        return prefix;
    }
26,678✔
2021
};
2022

2023

2024
class DownloadHistoryEntryHandler : public ServerHistory::HistoryEntryHandler {
2025
public:
2026
    std::size_t num_changesets = 0;
2027
    std::size_t accum_original_size = 0;
2028
    std::size_t accum_compacted_size = 0;
2029

2030
    DownloadHistoryEntryHandler(ServerProtocol& protocol, OutputBuffer& buffer, util::Logger& logger) noexcept
2031
        : m_protocol{protocol}
2032
        , m_buffer{buffer}
2033
        , m_logger{logger}
41,658✔
2034
    {
364,336✔
2035
    }
322,678✔
2036

2037
    void handle(version_type server_version, const HistoryEntry& entry, size_t original_size) override
41,376✔
2038
    {
376,514✔
2039
        version_type client_version = entry.remote_version;
376,514✔
2040
        ServerProtocol::ChangesetInfo info{server_version, client_version, entry, original_size};
376,514✔
2041
        m_protocol.insert_single_changeset_download_message(m_buffer, info, m_logger); // Throws
376,514✔
2042
        ++num_changesets;
376,514✔
2043
        accum_original_size += original_size;
376,514✔
2044
        accum_compacted_size += entry.changeset.size();
376,514✔
2045
    }
335,138✔
2046

2047
private:
2048
    ServerProtocol& m_protocol;
2049
    OutputBuffer& m_buffer;
2050
    util::Logger& m_logger;
2051
};
2052

2053

2054
// ============================ Session ============================
2055

2056
//                        Need cli-   Send     IDENT     UNBIND              ERROR
2057
//   Protocol             ent file    IDENT    message   message   Error     message
2058
//   state                identifier  message  received  received  occurred  sent
2059
// ---------------------------------------------------------------------------------
2060
//   AllocatingIdent      yes         yes      no        no        no        no
2061
//   SendIdent            no          yes      no        no        no        no
2062
//   WaitForIdent         no          no       no        no        no        no
2063
//   WaitForUnbind        maybe       no       yes       no        no        no
2064
//   SendError            maybe       maybe    maybe     no        yes       no
2065
//   WaitForUnbindErr     maybe       maybe    maybe     no        yes       yes
2066
//   SendUnbound          maybe       maybe    maybe     yes       maybe     no
2067
//
2068
//
2069
//   Condition                      Expression
2070
// ----------------------------------------------------------
2071
//   Need client file identifier    need_client_file_ident()
2072
//   Send IDENT message             must_send_ident_message()
2073
//   IDENT message received         ident_message_received()
2074
//   UNBIND message received        unbind_message_received()
2075
//   Error occurred                 error_occurred()
2076
//   ERROR message sent             m_error_message_sent
2077
//
2078
//
2079
//   Protocol
2080
//   state                Will send              Can receive
2081
// -----------------------------------------------------------------------
2082
//   AllocatingIdent      none                   UNBIND
2083
//   SendIdent            IDENT                  UNBIND
2084
//   WaitForIdent         none                   IDENT, UNBIND
2085
//   WaitForUnbind        DOWNLOAD, TRANSACT,    UPLOAD, TRANSACT, MARK,
2086
//                        MARK, ALLOC            ALLOC, UNBIND
2087
//   SendError            ERROR                  any
2088
//   WaitForUnbindErr     none                   any
2089
//   SendUnbound          UNBOUND                none
2090
//
2091
class Session final : private FileIdentReceiver {
2092
public:
2093
    util::PrefixLogger logger;
2094

2095
    Session(SyncConnection& conn, session_ident_type session_ident)
2096
        : logger{util::LogCategory::server, make_logger_prefix(session_ident), conn.logger_ptr} // Throws
2097
        , m_connection{conn}
2098
        , m_session_ident{session_ident}
6,940✔
2099
    {
58,100✔
2100
    }
51,160✔
2101

2102
    // A session must no longer be enlisted to send when it is destroyed. The
    // destructor detaches the session from its server file.
    ~Session() noexcept
    {
        REALM_ASSERT(!is_enlisted_to_send());

        detach_from_server_file();
    }
51,164✔
2107

2108
    // Returns the connection that this session was created on.
    SyncConnection& get_connection() noexcept
    {
        return m_connection;
    }
322,828✔
2112

2113
    // Returns the (optional) encryption key taken from the configuration of
    // the server that owns this session's connection.
    const Optional<std::array<char, 64>>& get_encryption_key()
    {
        ServerImpl& server = m_connection.get_server();
        return server.get_config().encryption_key;
    }
2117

2118
    // Returns the identifier this session was constructed with.
    session_ident_type get_session_ident() const noexcept
    {
        return m_session_ident;
    }
1,272✔
2122

2123
    // Convenience accessor: forwards to the owning connection's
    // get_server_protocol().
    ServerProtocol& get_server_protocol() noexcept
    {
        SyncConnection& conn = m_connection;
        return conn.get_server_protocol();
    }
469,410✔
2127

2128
    bool need_client_file_ident() const noexcept
8,720✔
2129
    {
71,022✔
2130
        return (m_file_ident_request != 0);
71,022✔
2131
    }
62,302✔
2132

2133
    bool must_send_ident_message() const noexcept
6,204✔
2134
    {
47,884✔
2135
        return m_send_ident_message;
47,884✔
2136
    }
41,680✔
2137

2138
    bool ident_message_received() const noexcept
361,056✔
2139
    {
3,140,060✔
2140
        return m_client_file_ident != 0;
3,140,060✔
2141
    }
2,779,004✔
2142

2143
    bool unbind_message_received() const noexcept
366,788✔
2144
    {
3,186,830✔
2145
        return m_unbind_message_received;
3,186,830✔
2146
    }
2,820,042✔
2147

2148
    bool error_occurred() const noexcept
354,434✔
2149
    {
3,084,854✔
2150
        return int(m_error_code) != 0;
3,084,854✔
2151
    }
2,730,420✔
2152

2153
    bool relayed_alloc_request_in_progress() const noexcept
2154
    {
×
2155
        return (need_client_file_ident() || m_allocated_file_ident.ident != 0);
×
2156
    }
2157

2158
    // Returns the file identifier (always a nonzero value) of the client side
2159
    // file if ident_message_received() returns true. Otherwise it returns zero.
2160
    file_ident_type get_client_file_ident() const noexcept
2161
    {
×
2162
        return m_client_file_ident;
×
2163
    }
2164

2165
    void initiate()
6,940✔
2166
    {
58,102✔
2167
        logger.detail("Session initiated", m_session_ident); // Throws
58,102✔
2168
    }
51,162✔
2169

2170
    void terminate()
5,996✔
2171
    {
49,538✔
2172
        logger.detail("Session terminated", m_session_ident); // Throws
49,538✔
2173
    }
43,542✔
2174

2175
    // Initiate the deactivation process, if it has not been initiated already
2176
    // by the client.
2177
    //
2178
    // IMPORTANT: This function must not be called with protocol versions
2179
    // earlier than 23.
2180
    //
2181
    // The deactivation process will eventually lead to termination of the
2182
    // session.
2183
    //
2184
    // The session will detach itself from the server file when the deactivation
2185
    // process is initiated, regardless of whether it is initiated by the
2186
    // client, or by calling this function.
2187
    void initiate_deactivation(ProtocolError error_code)
78✔
2188
    {
714✔
2189
        REALM_ASSERT(is_session_level_error(error_code));
714✔
2190
        REALM_ASSERT(!error_occurred()); // Must only be called once
672✔
2191

346✔
2192
        // If the UNBIND message has been received, then the client has
346✔
2193
        // initiated the deactivation process already.
388✔
2194
        if (REALM_LIKELY(!unbind_message_received())) {
714✔
2195
            detach_from_server_file();
714✔
2196
            m_error_code = error_code;
672✔
2197
            // Protocol state is now SendError
388✔
2198
            ensure_enlisted_to_send();
714✔
2199
            return;
714✔
2200
        }
672✔
2201
        // Protocol state was SendUnbound, and remains unchanged
388✔
2202
    }
636✔
2203

2204
    bool is_enlisted_to_send() const noexcept
285,996✔
2205
    {
2,486,358✔
2206
        return m_next != nullptr;
2,486,358✔
2207
    }
2,200,362✔
2208

2209
    void ensure_enlisted_to_send() noexcept
55,160✔
2210
    {
483,944✔
2211
        if (!is_enlisted_to_send())
481,616✔
2212
            enlist_to_send();
466,432✔
2213
    }
428,784✔
2214

2215
    void enlist_to_send() noexcept
114,784✔
2216
    {
996,150✔
2217
        m_connection.enlist_to_send(this);
996,150✔
2218
    }
881,366✔
2219

2220
    // Overriding memeber function in FileIdentReceiver
2221
    void receive_file_ident(SaltedFileIdent file_ident) override final
1,258✔
2222
    {
10,884✔
2223
        // Protocol state must be AllocatingIdent or WaitForUnbind
5,808✔
2224
        if (!ident_message_received()) {
11,570✔
2225
            REALM_ASSERT(need_client_file_ident());
11,570✔
2226
            REALM_ASSERT(m_send_ident_message);
11,570✔
2227
        }
10,312✔
2228
        else {
×
2229
            REALM_ASSERT(!m_send_ident_message);
×
2230
        }
1,258✔
2231
        REALM_ASSERT(!unbind_message_received());
11,570✔
2232
        REALM_ASSERT(!error_occurred());
11,570✔
2233
        REALM_ASSERT(!m_error_message_sent);
10,884✔
2234

5,808✔
2235
        m_file_ident_request = 0;
11,570✔
2236
        m_allocated_file_ident = file_ident;
10,884✔
2237

5,122✔
2238
        // If the protocol state was AllocatingIdent, it is now SendIdent,
5,122✔
2239
        // otherwise it continues to be WaitForUnbind.
5,122✔
2240

5,808✔
2241
        logger.debug("Acquired outbound salted file identifier (%1, %2)", file_ident.ident,
11,570✔
2242
                     file_ident.salt); // Throws
10,884✔
2243

5,808✔
2244
        ensure_enlisted_to_send();
11,570✔
2245
    }
10,312✔
2246

2247
    // Called by the associated connection object when this session is granted
2248
    // an opportunity to initiate the sending of a message.
2249
    //
2250
    // This function may lead to the destruction of the session object
2251
    // (suicide).
2252
    void send_message()
114,580✔
2253
    {
994,514✔
2254
        if (REALM_LIKELY(!unbind_message_received())) {
990,402✔
2255
            if (REALM_LIKELY(!error_occurred())) {
960,558✔
2256
                if (REALM_LIKELY(ident_message_received())) {
904,360✔
2257
                    // State is WaitForUnbind.
543,712✔
2258
                    bool relayed_alloc = (m_allocated_file_ident.ident != 0);
948,346✔
2259
                    if (REALM_LIKELY(!relayed_alloc)) {
894,046✔
2260
                        // Send DOWNLOAD or MARK.
543,712✔
2261
                        continue_history_scan(); // Throws
894,044✔
2262
                        // Session object may have been
489,410✔
2263
                        // destroyed at this point (suicide)
543,712✔
2264
                        return;
948,346✔
2265
                    }
839,218✔
2266
                    send_alloc_message(); // Throws
2,147,483,649✔
2267
                    return;
2,147,483,649✔
2268
                }
2,147,484,223✔
2269
                // State is SendIdent
5,810✔
2270
                send_ident_message(); // Throws
11,578✔
2271
                return;
11,578✔
2272
            }
10,352✔
2273
            // State is SendError
388✔
2274
            send_error_message(); // Throws
712✔
2275
            return;
712✔
2276
        }
1,852✔
2277
        // State is SendUnbound
14,634✔
2278
        send_unbound_message(); // Throws
33,878✔
2279
        terminate();            // Throws
33,878✔
2280
        m_connection.discard_session(m_session_ident);
30,984✔
2281
        // This session is now destroyed!
14,634✔
2282
    }
29,766✔
2283

2284
    bool receive_bind_message(std::string path, std::string signed_user_token, bool need_client_file_ident,
2285
                              bool is_subserver, ProtocolError& error)
6,938✔
2286
    {
58,098✔
2287
        if (logger.would_log(util::Logger::Level::info)) {
51,482✔
2288
            logger.detail("Received: BIND(server_path=%1, signed_user_token='%2', "
2,822✔
2289
                          "need_client_file_ident=%3, is_subserver=%4)",
2,822✔
2290
                          path, short_token_fmt(signed_user_token), int(need_client_file_ident),
2,822✔
2291
                          int(is_subserver)); // Throws
2,822✔
2292
        }
5,284✔
2293

28,674✔
2294
        ServerImpl& server = m_connection.get_server();
58,098✔
2295
        _impl::VirtualPathComponents virt_path_components =
58,098✔
2296
            _impl::parse_virtual_path(server.get_root_dir(), path); // Throws
53,944✔
2297

28,674✔
2298
        if (!virt_path_components.is_valid) {
51,188✔
2299
            logger.error("Bad virtual path (message_type='bind', path='%1', "
252✔
2300
                         "signed_user_token='%2')",
252✔
2301
                         path,
252✔
2302
                         short_token_fmt(signed_user_token)); // Throws
252✔
2303
            error = ProtocolError::illegal_realm_path;
252✔
2304
            return false;
252✔
2305
        }
2,994✔
2306

24,394✔
2307
        // The user has proper permissions at this stage.
24,394✔
2308

28,534✔
2309
        m_server_file = server.get_or_create_file(path); // Throws
53,706✔
2310

28,534✔
2311
        m_server_file->add_unidentified_session(this); // Throws
53,706✔
2312

28,534✔
2313
        logger.info("Client info: (path='%1', from=%2, protocol=%3) %4", path, m_connection.get_remote_endpoint(),
57,846✔
2314
                    m_connection.get_client_protocol_version(),
57,846✔
2315
                    m_connection.get_client_user_agent()); // Throws
53,706✔
2316

28,534✔
2317
        m_is_subserver = is_subserver;
57,846✔
2318
        if (REALM_LIKELY(!need_client_file_ident)) {
52,672✔
2319
            // Protocol state is now WaitForUnbind
17,642✔
2320
            return true;
37,704✔
2321
        }
33,662✔
2322

10,092✔
2323
        // FIXME: We must make a choice about client file ident for read only
10,092✔
2324
        // sessions. They should have a special read-only client file ident.
10,892✔
2325
        file_ident_type proxy_file = 0; // No proxy
20,142✔
2326
        ClientType client_type = (is_subserver ? ClientType::subserver : ClientType::regular);
20,142✔
2327
        m_file_ident_request = m_server_file->request_file_ident(*this, proxy_file, client_type); // Throws
20,142✔
2328
        m_send_ident_message = true;
19,342✔
2329
        // Protocol state is now AllocatingIdent
10,092✔
2330

10,892✔
2331
        return true;
20,142✔
2332
    }
18,308✔
2333

2334
    bool receive_ident_message(file_ident_type client_file_ident, salt_type client_file_ident_salt,
2335
                               version_type scan_server_version, version_type scan_client_version,
2336
                               version_type latest_server_version, salt_type latest_server_version_salt,
2337
                               ProtocolError& error)
6,204✔
2338
    {
43,898✔
2339
        // Protocol state must be WaitForIdent
22,682✔
2340
        REALM_ASSERT(!need_client_file_ident());
47,884✔
2341
        REALM_ASSERT(!m_send_ident_message);
47,884✔
2342
        REALM_ASSERT(!ident_message_received());
47,884✔
2343
        REALM_ASSERT(!unbind_message_received());
47,884✔
2344
        REALM_ASSERT(!error_occurred());
47,884✔
2345
        REALM_ASSERT(!m_error_message_sent);
43,898✔
2346

22,682✔
2347
        logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
47,884✔
2348
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
47,884✔
2349
                     "latest_server_version_salt=%6)",
47,884✔
2350
                     client_file_ident, client_file_ident_salt, scan_server_version, scan_client_version,
47,884✔
2351
                     latest_server_version, latest_server_version_salt); // Throws
43,898✔
2352

22,682✔
2353
        SaltedFileIdent client_file_ident_2 = {client_file_ident, client_file_ident_salt};
47,884✔
2354
        DownloadCursor download_progress = {scan_server_version, scan_client_version};
47,884✔
2355
        SaltedVersion server_version_2 = {latest_server_version, latest_server_version_salt};
47,884✔
2356
        ClientType client_type = (m_is_subserver ? ClientType::subserver : ClientType::regular);
47,884✔
2357
        UploadCursor upload_threshold = {0, 0};
47,884✔
2358
        version_type locked_server_version = 0;
47,884✔
2359
        BootstrapError error_2 =
47,884✔
2360
            m_server_file->bootstrap_client_session(client_file_ident_2, download_progress, server_version_2,
47,884✔
2361
                                                    client_type, upload_threshold, locked_server_version,
47,884✔
2362
                                                    logger); // Throws
47,884✔
2363
        switch (error_2) {
47,850✔
2364
            case BootstrapError::no_error:
47,594✔
2365
                break;
41,424✔
2366
            case BootstrapError::client_file_expired:
✔
2367
                logger.warn("Client (%1) expired", client_file_ident); // Throws
×
2368
                error = ProtocolError::client_file_expired;
×
2369
                return false;
✔
2370
            case BootstrapError::bad_client_file_ident:
✔
2371
                logger.error("Bad client file ident (%1) in IDENT message",
×
2372
                             client_file_ident); // Throws
×
2373
                error = ProtocolError::bad_client_file_ident;
×
2374
                return false;
4✔
2375
            case BootstrapError::bad_client_file_ident_salt:
36✔
2376
                logger.error("Bad client file identifier salt (%1) in IDENT message",
36✔
2377
                             client_file_ident_salt); // Throws
36✔
2378
                error = ProtocolError::diverging_histories;
36✔
2379
                return false;
32✔
2380
            case BootstrapError::bad_download_server_version:
✔
2381
                logger.error("Bad download progress server version in IDENT message"); // Throws
×
2382
                error = ProtocolError::bad_server_version;
×
2383
                return false;
4✔
2384
            case BootstrapError::bad_download_client_version:
36✔
2385
                logger.error("Bad download progress client version in IDENT message"); // Throws
36✔
2386
                error = ProtocolError::bad_client_version;
36✔
2387
                return false;
52✔
2388
            case BootstrapError::bad_server_version:
180✔
2389
                logger.error("Bad server version (message_type='ident')"); // Throws
180✔
2390
                error = ProtocolError::bad_server_version;
180✔
2391
                return false;
164✔
2392
            case BootstrapError::bad_server_version_salt:
36✔
2393
                logger.error("Bad server version salt in IDENT message"); // Throws
36✔
2394
                error = ProtocolError::diverging_histories;
36✔
2395
                return false;
32✔
2396
            case BootstrapError::bad_client_type:
✔
2397
                logger.error("Bad client type (%1) in IDENT message", int(client_type)); // Throws
×
2398
                error = ProtocolError::bad_client_file_ident; // FIXME: Introduce new protocol-level error
2399
                                                              // `bad_client_type`.
2400
                return false;
6,170✔
2401
        }
43,624✔
2402

18,550✔
2403
        // Make sure there is no other session currently associcated with the
18,550✔
2404
        // same client-side file
22,520✔
2405
        if (Session* other_sess = m_server_file->get_identified_session(client_file_ident)) {
41,424✔
2406
            SyncConnection& other_conn = other_sess->get_connection();
2407
            // It is a protocol violation if the other session is associated
2408
            // with the same connection
×
2409
            if (&other_conn == &m_connection) {
×
2410
                logger.error("Client file already bound in other session associated with "
×
2411
                             "the same connection"); // Throws
×
2412
                error = ProtocolError::bound_in_other_session;
×
2413
                return false;
×
2414
            }
2415
            // When the other session is associated with a different connection
2416
            // (`other_conn`), the clash may be due to the server not yet having
2417
            // realized that the other connection has been closed by the
2418
            // client. If so, the other connention is a "zombie". In the
2419
            // interest of getting rid of zombie connections as fast as
2420
            // possible, we shall assume that a clash with a session in another
2421
            // connection is always due to that other connection being a
2422
            // zombie. And when such a situation is detected, we want to close
2423
            // the zombie connection immediately.
2424
            auto log_level = Logger::Level::detail;
×
2425
            other_conn.terminate(log_level,
×
2426
                                 "Sync connection closed (superseded session)"); // Throws
×
2427
        }
2,200✔
2428

22,520✔
2429
        logger.info("Bound to client file (client_file_ident=%1)", client_file_ident); // Throws
43,624✔
2430

22,520✔
2431
        send_log_message(util::Logger::Level::debug, util::format("Session %1 bound to client file ident %2",
47,594✔
2432
                                                                  m_session_ident, client_file_ident));
43,624✔
2433

22,520✔
2434
        m_server_file->identify_session(this, client_file_ident); // Throws
43,624✔
2435

22,520✔
2436
        m_client_file_ident = client_file_ident;
47,594✔
2437
        m_download_progress = download_progress;
47,594✔
2438
        m_upload_threshold = upload_threshold;
47,594✔
2439
        m_locked_server_version = locked_server_version;
43,624✔
2440

22,520✔
2441
        ServerImpl& server = m_connection.get_server();
47,594✔
2442
        const Server::Config& config = server.get_config();
47,594✔
2443
        m_disable_download = (config.disable_download_for.count(client_file_ident) != 0);
43,624✔
2444

22,520✔
2445
        if (REALM_UNLIKELY(config.session_bootstrap_callback)) {
41,424✔
2446
            config.session_bootstrap_callback(m_server_file->get_virt_path(),
×
2447
                                              client_file_ident); // Throws
×
2448
        }
2,200✔
2449

18,550✔
2450
        // Protocol  state is now WaitForUnbind
22,520✔
2451
        enlist_to_send();
47,594✔
2452
        return true;
47,594✔
2453
    }
41,424✔
2454

2455
    bool receive_upload_message(version_type progress_client_version, version_type progress_server_version,
2456
                                version_type locked_server_version, const UploadChangesets& upload_changesets,
2457
                                ProtocolError& error)
45,732✔
2458
    {
384,246✔
2459
        // Protocol state must be WaitForUnbind
225,336✔
2460
        REALM_ASSERT(!m_send_ident_message);
407,420✔
2461
        REALM_ASSERT(ident_message_received());
407,420✔
2462
        REALM_ASSERT(!unbind_message_received());
407,420✔
2463
        REALM_ASSERT(!error_occurred());
407,420✔
2464
        REALM_ASSERT(!m_error_message_sent);
384,246✔
2465

225,336✔
2466
        logger.detail("Received: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
407,420✔
2467
                      "locked_server_version=%3, num_changesets=%4)",
407,420✔
2468
                      progress_client_version, progress_server_version, locked_server_version,
407,420✔
2469
                      upload_changesets.size()); // Throws
384,246✔
2470

202,162✔
2471
        // We are unable to reproduce the cursor object for the upload progress
202,162✔
2472
        // when the protocol version is less than 29, because the client does
202,162✔
2473
        // not provide the required information. When the protocol version is
202,162✔
2474
        // less than 25, we can always get a consistent cursor by taking it from
202,162✔
2475
        // the changeset that was uploaded last, but in protocol versions 25,
202,162✔
2476
        // 26, 27, and 28, things are more complicated. Here, we receive new
202,162✔
2477
        // values for `last_integrated_server_version` which we cannot afford to
202,162✔
2478
        // ignore, but we do not know what client versions they correspond
202,162✔
2479
        // to. Fortunately, we can produce a cursor that works, and is mutually
202,162✔
2480
        // consistent with previous cursors, by simply bumping
202,162✔
2481
        // `upload_progress.client_version` when
202,162✔
2482
        // `upload_progress.last_integrated_server_version` grows.
202,162✔
2483
        //
202,162✔
2484
        // To see that this scheme works, consider the last changeset, A, that
202,162✔
2485
        // will have already been uploaded and integrated at the beginning of
202,162✔
2486
        // the next session, and the first changeset, B, that follows A in the
202,162✔
2487
        // client side history, and is not upload skippable (of local origin and
202,162✔
2488
        // nonempty). We then need to show that A will be skipped, if uploaded
202,162✔
2489
        // in the next session, but B will not.
202,162✔
2490
        //
202,162✔
2491
        // Let V be the client version produced by A, and let T be the value of
202,162✔
2492
        // `upload_progress.client_version` as determined in this session, which
202,162✔
2493
        // is used as threshold in the next session. Then we know that A is
202,162✔
2494
        // skipped during the next session if V is less than, or equal to T. If
202,162✔
2495
        // the protocol version is at least 29, the protocol requires that T is
202,162✔
2496
        // greater than, or equal to V. If the protocol version is less than 25,
202,162✔
2497
        // T will be equal to V. Finally, if the protocol version is 25, 26, 27,
202,162✔
2498
        // or 28, we construct T such that it is always greater than, or equal
202,162✔
2499
        // to V, so in all cases, A will be skipped during the next session.
202,162✔
2500
        //
202,162✔
2501
        // Let W be the client version on which B is based. We then know that B
202,162✔
2502
        // will be retained if, and only if W is greater than, or equal to T. If
202,162✔
2503
        // the protocol version is at least 29, we know that T is less than, or
202,162✔
2504
        // equal to W, since B is not integrated until the next session. If the
202,162✔
2505
        // protocol version is less than 25, we know that T is V. Since V must
202,162✔
2506
        // be less than, or equal to W, we again know that T is less than, or
202,162✔
2507
        // equal to W. Finally, if the protocol version is 25, 26, 27, or 28, we
202,162✔
2508
        // construct T such that it is equal to V + N, where N is the number of
202,162✔
2509
        // observed increments in `last_integrated_server_version` since the
202,162✔
2510
        // client version produced by A. For each of these observed increments,
202,162✔
2511
        // there must have been a distinct new client version, but all these
202,162✔
2512
        // client versions must be less than, or equal to W, since B is not
202,162✔
2513
        // integrated until the next session. Therefore, we know that T = V + N
202,162✔
2514
        // is less than, or equal to W. So, in all cases, B will not be skipped
202,162✔
2515
        // during the next session.
225,336✔
2516
        int protocol_version = m_connection.get_client_protocol_version();
407,420✔
2517
        static_cast<void>(protocol_version); // No protocol diversion (yet)
384,246✔
2518

225,336✔
2519
        UploadCursor upload_progress;
407,420✔
2520
        upload_progress = {progress_client_version, progress_server_version};
384,246✔
2521

202,162✔
2522
        // `upload_progress.client_version` must be nondecreasing across the
202,162✔
2523
        // session.
225,336✔
2524
        bool good_1 = (upload_progress.client_version >= m_upload_progress.client_version);
407,420✔
2525
        if (REALM_UNLIKELY(!good_1)) {
361,688✔
2526
            logger.error("Decreasing client version in upload progress (%1 < %2)", upload_progress.client_version,
×
2527
                         m_upload_progress.client_version); // Throws
×
2528
            error = ProtocolError::bad_client_version;
×
2529
            return false;
×
2530
        }
22,558✔
2531
        // `upload_progress.last_integrated_server_version` must be a version
202,162✔
2532
        // that the client can have heard about.
225,336✔
2533
        bool good_2 = (upload_progress.last_integrated_server_version <= m_download_progress.server_version);
407,420✔
2534
        if (REALM_UNLIKELY(!good_2)) {
361,688✔
2535
            logger.error("Bad last integrated server version in upload progress (%1 > %2)",
×
2536
                         upload_progress.last_integrated_server_version,
×
2537
                         m_download_progress.server_version); // Throws
×
2538
            error = ProtocolError::bad_server_version;
×
2539
            return false;
×
2540
        }
22,558✔
2541

202,162✔
2542
        // `upload_progress` must be consistent.
225,336✔
2543
        if (REALM_UNLIKELY(!is_consistent(upload_progress))) {
361,688✔
2544
            logger.error("Upload progress is inconsistent (%1, %2)", upload_progress.client_version,
×
2545
                         upload_progress.last_integrated_server_version); // Throws
×
2546
            error = ProtocolError::bad_server_version;
×
2547
            return false;
×
2548
        }
22,558✔
2549
        // `upload_progress` and `m_upload_threshold` must be mutually
202,162✔
2550
        // consistent.
225,336✔
2551
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_threshold))) {
361,688✔
2552
            logger.error("Upload progress (%1, %2) is mutually inconsistent with "
×
2553
                         "threshold (%3, %4)",
×
2554
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2555
                         m_upload_threshold.client_version,
×
2556
                         m_upload_threshold.last_integrated_server_version); // Throws
×
2557
            error = ProtocolError::bad_server_version;
×
2558
            return false;
×
2559
        }
22,558✔
2560
        // `upload_progress` and `m_upload_progress` must be mutually
202,162✔
2561
        // consistent.
225,336✔
2562
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_progress))) {
361,688✔
2563
            logger.error("Upload progress (%1, %2) is mutually inconsistent with previous "
×
2564
                         "upload progress (%3, %4)",
×
2565
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2566
                         m_upload_progress.client_version,
×
2567
                         m_upload_progress.last_integrated_server_version); // Throws
×
2568
            error = ProtocolError::bad_server_version;
×
2569
            return false;
×
2570
        }
22,558✔
2571

225,336✔
2572
        version_type locked_server_version_2 = locked_server_version;
384,246✔
2573

202,162✔
2574
        // `locked_server_version_2` must be nondecreasing over the lifetime of
202,162✔
2575
        // the client-side file.
225,336✔
2576
        if (REALM_UNLIKELY(locked_server_version_2 < m_locked_server_version)) {
361,688✔
2577
            logger.error("Decreasing locked server version (%1 < %2)", locked_server_version_2,
×
2578
                         m_locked_server_version); // Throws
×
2579
            error = ProtocolError::bad_server_version;
×
2580
            return false;
×
2581
        }
22,558✔
2582
        // `locked_server_version_2` must be a version that the client can have
202,162✔
2583
        // heard about.
225,336✔
2584
        if (REALM_UNLIKELY(locked_server_version_2 > m_download_progress.server_version)) {
361,688✔
2585
            logger.error("Bad locked server version (%1 > %2)", locked_server_version_2,
×
2586
                         m_download_progress.server_version); // Throws
×
2587
            error = ProtocolError::bad_server_version;
×
2588
            return false;
×
2589
        }
22,558✔
2590

225,336✔
2591
        std::size_t num_previously_integrated_changesets = 0;
407,420✔
2592
        if (!upload_changesets.empty()) {
384,594✔
2593
            UploadCursor up = m_upload_progress;
222,998✔
2594
            for (const ServerProtocol::UploadChangeset& uc : upload_changesets) {
312,928✔
2595
                // `uc.upload_cursor.client_version` must be increasing across
160,660✔
2596
                // all the changesets in this UPLOAD message, and all must be
160,660✔
2597
                // greater than upload_progress.client_version of previous
160,660✔
2598
                // UPLOAD message.
179,676✔
2599
                if (REALM_UNLIKELY(uc.upload_cursor.client_version <= up.client_version)) {
294,818✔
2600
                    logger.error("Nonincreasing client version in upload cursor of uploaded "
×
2601
                                 "changeset (%1 <= %2)",
×
2602
                                 uc.upload_cursor.client_version,
×
2603
                                 up.client_version); // Throws
×
2604
                    error = ProtocolError::bad_client_version;
×
2605
                    return false;
×
2606
                }
18,110✔
2607
                // `uc.upload_progress` must be consistent.
179,676✔
2608
                if (REALM_UNLIKELY(!is_consistent(uc.upload_cursor))) {
294,818✔
2609
                    logger.error("Upload cursor of uploaded changeset is inconsistent (%1, %2)",
×
2610
                                 uc.upload_cursor.client_version,
×
2611
                                 uc.upload_cursor.last_integrated_server_version); // Throws
×
2612
                    error = ProtocolError::bad_server_version;
×
2613
                    return false;
×
2614
                }
18,110✔
2615
                // `uc.upload_progress` must be mutually consistent with
160,660✔
2616
                // previous upload cursor.
179,676✔
2617
                if (REALM_UNLIKELY(!are_mutually_consistent(uc.upload_cursor, up))) {
294,818✔
2618
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2619
                                 "inconsistent with previous upload cursor (%3, %4)",
×
2620
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2621
                                 up.client_version, up.last_integrated_server_version); // Throws
×
2622
                    error = ProtocolError::bad_server_version;
×
2623
                    return false;
×
2624
                }
18,110✔
2625
                // `uc.upload_progress` must be mutually consistent with
160,660✔
2626
                // threshold, that is, for changesets that have not previously
160,660✔
2627
                // been integrated, it is important that the specified value of
160,660✔
2628
                // `last_integrated_server_version` is greater than, or equal to
160,660✔
2629
                // the reciprocal history base version.
179,676✔
2630
                bool consistent_with_threshold = are_mutually_consistent(uc.upload_cursor, m_upload_threshold);
331,944✔
2631
                if (REALM_UNLIKELY(!consistent_with_threshold)) {
294,818✔
2632
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2633
                                 "inconsistent with threshold (%3, %4)",
×
2634
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2635
                                 m_upload_threshold.client_version,
×
2636
                                 m_upload_threshold.last_integrated_server_version); // Throws
×
2637
                    error = ProtocolError::bad_server_version;
×
2638
                    return false;
×
2639
                }
37,126✔
2640
                bool previously_integrated = (uc.upload_cursor.client_version <= m_upload_threshold.client_version);
331,944✔
2641
                if (previously_integrated)
297,466✔
2642
                    ++num_previously_integrated_changesets;
56,080✔
2643
                up = uc.upload_cursor;
331,944✔
2644
            }
306,790✔
2645
            // `upload_progress.client_version` must be greater than, or equal
108,006✔
2646
            // to client versions produced by each of the changesets in this
108,006✔
2647
            // UPLOAD message.
118,940✔
2648
            if (REALM_UNLIKELY(up.client_version > upload_progress.client_version)) {
185,872✔
2649
                logger.error("Upload progress less than client version produced by uploaded "
×
2650
                             "changeset (%1 > %2)",
×
2651
                             up.client_version,
×
2652
                             upload_progress.client_version); // Throws
×
2653
                error = ProtocolError::bad_client_version;
×
2654
                return false;
×
2655
            }
11,972✔
2656
            // The upload cursor of last uploaded changeset must be mutually
108,006✔
2657
            // consistent with the reported upload progress.
118,940✔
2658
            if (REALM_UNLIKELY(!are_mutually_consistent(up, upload_progress))) {
185,872✔
2659
                logger.error("Upload cursor (%1, %2) of last uploaded changeset is mutually "
×
2660
                             "inconsistent with upload progress (%3, %4)",
×
2661
                             up.client_version, up.last_integrated_server_version, upload_progress.client_version,
×
2662
                             upload_progress.last_integrated_server_version); // Throws
×
2663
                error = ProtocolError::bad_server_version;
×
2664
                return false;
×
2665
            }
45,732✔
2666
        }
384,246✔
2667

202,162✔
2668
        // FIXME: Part of a very poor man's substitute for a proper backpressure
202,162✔
2669
        // scheme.
225,336✔
2670
        if (REALM_UNLIKELY(!m_server_file->can_add_changesets_from_downstream())) {
361,688✔
2671
            logger.debug("Terminating uploading session because buffer is full"); // Throws
2672
            // Using this exact error code, because it causes `try_again` flag
2673
            // to be set to true, which causes the client to wait for about 5
2674
            // minutes before trying to connect again.
2675
            error = ProtocolError::connection_closed;
×
2676
            return false;
×
2677
        }
22,558✔
2678

225,336✔
2679
        m_upload_progress = upload_progress;
384,246✔
2680

225,336✔
2681
        bool have_real_upload_progress = (upload_progress.client_version > m_upload_threshold.client_version);
407,420✔
2682
        bool bump_locked_server_version = (locked_server_version_2 > m_locked_server_version);
384,246✔
2683

225,336✔
2684
        std::size_t num_changesets_to_integrate = upload_changesets.size() - num_previously_integrated_changesets;
407,420✔
2685
        REALM_ASSERT(have_real_upload_progress || num_changesets_to_integrate == 0);
384,246✔
2686

225,336✔
2687
        bool have_anything_to_do = (have_real_upload_progress || bump_locked_server_version);
407,420✔
2688
        if (!have_anything_to_do)
361,932✔
2689
            return true;
24,066✔
2690

224,168✔
2691
        if (!have_real_upload_progress)
360,018✔
2692
            upload_progress = m_upload_threshold;
22,396✔
2693

224,168✔
2694
        if (num_previously_integrated_changesets > 0) {
360,748✔
2695
            logger.detail("Ignoring %1 previously integrated changesets",
6,474✔
2696
                          num_previously_integrated_changesets); // Throws
6,474✔
2697
        }
51,232✔
2698
        if (num_changesets_to_integrate > 0) {
382,582✔
2699
            logger.detail("Initiate integration of %1 remote changesets",
206,002✔
2700
                          num_changesets_to_integrate); // Throws
206,002✔
2701
        }
205,834✔
2702

224,168✔
2703
        REALM_ASSERT(m_server_file);
405,506✔
2704
        ServerFile& file = *m_server_file;
405,506✔
2705
        std::size_t offset = num_previously_integrated_changesets;
405,506✔
2706
        file.add_changesets_from_downstream(m_client_file_ident, upload_progress, locked_server_version_2,
405,506✔
2707
                                            upload_changesets.data() + offset, num_changesets_to_integrate); // Throws
382,414✔
2708

224,168✔
2709
        m_locked_server_version = locked_server_version_2;
405,506✔
2710
        return true;
405,506✔
2711
    }
360,018✔
2712

2713
    /// Handle a MARK message received from the client.
    ///
    /// Records `request_ident` as the pending download completion request and
    /// makes sure this session is enlisted to send. The `ProtocolError`
    /// out-parameter is never written, and the function always returns true.
    bool receive_mark_message(request_ident_type request_ident, ProtocolError&)
    {
        // Preconditions: protocol state must be WaitForUnbind.
        REALM_ASSERT(!m_send_ident_message);
        REALM_ASSERT(ident_message_received());
        REALM_ASSERT(!unbind_message_received());
        REALM_ASSERT(!error_occurred());
        REALM_ASSERT(!m_error_message_sent);

        logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

        // Remember the request; it is answered from continue_history_scan()
        // once there is nothing more to download.
        m_download_completion_request = request_ident;
        ensure_enlisted_to_send();

        return true;
    }
106,056✔
2729

2730
    // If the deactivation process has thereby been completed (an ERROR message
2731
    // was already sent), this function terminates the session and removes it
2732
    // from the connection (m_connection.discard_session()).
2733
    //
2734
    // CAUTION: This function may commit suicide!
2735
    void receive_unbind_message()
4,148✔
2736
    {
31,298✔
2737
        // Protocol state may be anything but SendUnbound
14,822✔
2738
        REALM_ASSERT(!m_unbind_message_received);
31,298✔
2739

14,822✔
2740
        logger.detail("Received: UNBIND"); // Throws
31,298✔
2741

14,822✔
2742
        detach_from_server_file();
34,214✔
2743
        m_unbind_message_received = true;
31,298✔
2744

11,906✔
2745
        // Detect completion of the deactivation process
14,822✔
2746
        if (m_error_message_sent) {
30,080✔
2747
            // Deactivation process completed
140✔
2748
            terminate(); // Throws
248✔
2749
            m_connection.discard_session(m_session_ident);
234✔
2750
            // This session is now destroyed!
140✔
2751
            return;
248✔
2752
        }
1,438✔
2753

11,780✔
2754
        // Protocol state is now SendUnbound
14,682✔
2755
        ensure_enlisted_to_send();
33,966✔
2756
    }
29,846✔
2757

2758
    /// Handle an ERROR message received from the client.
    ///
    /// All three arguments are ignored; the event is only logged. Must not be
    /// called after UNBIND has been received.
    void receive_error_message(session_ident_type, int, std::string_view)
    {
        REALM_ASSERT(!m_unbind_message_received);
        logger.detail("Received: ERROR"); // Throws
    }
2764

2765
private:
2766
    SyncConnection& m_connection;
2767

2768
    const session_ident_type m_session_ident;
2769

2770
    // Not null if, and only if this session is in
2771
    // m_connection.m_sessions_enlisted_to_send.
2772
    Session* m_next = nullptr;
2773

2774
    // Becomes nonnull when the BIND message is received, if no error occurs. Is
2775
    // reset to null when the deactivation process is initiated, either when the
2776
    // UNBIND message is received, or when initiate_deactivation() is called.
2777
    util::bind_ptr<ServerFile> m_server_file;
2778

2779
    bool m_disable_download = false;
2780
    bool m_is_subserver = false;
2781

2782
    using file_ident_request_type = ServerFile::file_ident_request_type;
2783

2784
    // When nonzero, this session has an outstanding request for a client file
2785
    // identifier.
2786
    file_ident_request_type m_file_ident_request = 0;
2787

2788
    // Payload for next outgoing ALLOC message.
2789
    SaltedFileIdent m_allocated_file_ident = {0, 0};
2790

2791
    // Zero until the session receives an IDENT message from the client.
2792
    file_ident_type m_client_file_ident = 0;
2793

2794
    // Zero until initiate_deactivation() is called.
2795
    ProtocolError m_error_code = {};
2796

2797
    // The current point of progression of the download process. Set to (<server
2798
    // version>, <client version>) of the IDENT message when the IDENT message
2799
    // is received. At the time of return from continue_history_scan(), it
2800
    // points to the latest server version such that all preceding changesets in
2801
    // the server-side history have been downloaded, are currently being
2802
    // downloaded, or are *download excluded*.
2803
    DownloadCursor m_download_progress = {0, 0};
2804

2805
    request_ident_type m_download_completion_request = 0;
2806

2807
    // Records the progress of the upload process. Used to check that the client
2808
    // uploads changesets in order. Also, when m_upload_progress >
2809
    // m_upload_threshold, m_upload_progress works as a cache of the persisted
2810
    // version of the upload progress.
2811
    UploadCursor m_upload_progress = {0, 0};
2812

2813
    // Initialized on reception of the IDENT message. Specifies the actual
2814
    // upload progress (as recorded on the server-side) at the beginning of the
2815
    // session, and it remains fixed throughout the session.
2816
    //
2817
    // m_upload_threshold includes the progress resulting from the received
2818
    // changesets that have not yet been integrated (only relevant for
2819
    // synchronous backup).
2820
    UploadCursor m_upload_threshold = {0, 0};
2821

2822
    // Works partially as a cache of the persisted value, and partially as a way
2823
    // of checking that the client respects that it can never decrease.
2824
    version_type m_locked_server_version = 0;
2825

2826
    bool m_send_ident_message = false;
2827
    bool m_unbind_message_received = false;
2828
    bool m_error_message_sent = false;
2829

2830
    /// m_one_download_message_sent denotes whether at least one DOWNLOAD message
2831
    /// has been sent in the current session. The variable is used to ensure
2832
    /// that a DOWNLOAD message is always sent in a session. The received
2833
    /// DOWNLOAD message is needed by the client to ensure that its current
2834
    /// download progress is up to date.
2835
    bool m_one_download_message_sent = false;
2836

2837
    static std::string make_logger_prefix(session_ident_type session_ident)
6,940✔
2838
    {
58,100✔
2839
        std::ostringstream out;
58,100✔
2840
        out.imbue(std::locale::classic());
58,100✔
2841
        out << "Session[" << session_ident << "]: "; // Throws
58,100✔
2842
        return out.str();                            // Throws
58,100✔
2843
    }
51,160✔
2844

2845
    // Scan the history for changesets to be downloaded.
2846
    // If the history is longer than the end point of the previous scan,
2847
    // a DOWNLOAD message will be sent.
2848
    // A MARK message is sent if no DOWNLOAD message is sent, and the client has
2849
    // requested to be notified about download completion.
2850
    // In case neither a DOWNLOAD nor a MARK is sent, no message is sent.
2851
    //
2852
    // This function may lead to the destruction of the session object
2853
    // (suicide).
2854
    void continue_history_scan()
109,128✔
2855
    {
894,056✔
2856
        // Protocol state must be WaitForUnbind
543,710✔
2857
        REALM_ASSERT(!m_send_ident_message);
948,358✔
2858
        REALM_ASSERT(ident_message_received());
948,358✔
2859
        REALM_ASSERT(!unbind_message_received());
948,358✔
2860
        REALM_ASSERT(!error_occurred());
948,358✔
2861
        REALM_ASSERT(!m_error_message_sent);
948,358✔
2862
        REALM_ASSERT(!is_enlisted_to_send());
894,056✔
2863

543,710✔
2864
        SaltedVersion last_server_version = m_server_file->get_salted_sync_version();
948,358✔
2865
        REALM_ASSERT(last_server_version.version >= m_download_progress.server_version);
894,056✔
2866

543,710✔
2867
        ServerImpl& server = m_connection.get_server();
948,358✔
2868
        const Server::Config& config = server.get_config();
948,358✔
2869
        if (REALM_UNLIKELY(m_disable_download))
894,056✔
2870
            return;
489,408✔
2871

543,710✔
2872
        bool have_more_to_scan =
948,358✔
2873
            (last_server_version.version > m_download_progress.server_version || !m_one_download_message_sent);
948,358✔
2874
        if (have_more_to_scan) {
880,886✔
2875
            m_server_file->register_client_access(m_client_file_ident);     // Throws
364,324✔
2876
            const ServerHistory& history = m_server_file->access().history; // Throws
364,324✔
2877
            const char* body;
364,324✔
2878
            std::size_t uncompressed_body_size;
364,324✔
2879
            std::size_t compressed_body_size = 0;
364,324✔
2880
            bool body_is_compressed = false;
364,324✔
2881
            version_type end_version = last_server_version.version;
364,324✔
2882
            DownloadCursor download_progress;
364,324✔
2883
            UploadCursor upload_progress = {0, 0};
364,324✔
2884
            std::uint_fast64_t downloadable_bytes = 0;
364,324✔
2885
            std::size_t num_changesets;
364,324✔
2886
            std::size_t accum_original_size;
364,324✔
2887
            std::size_t accum_compacted_size;
364,324✔
2888
            ServerProtocol& protocol = get_server_protocol();
364,324✔
2889
            bool disable_download_compaction = config.disable_download_compaction;
364,324!
2890
            bool enable_cache = (config.enable_download_bootstrap_cache && m_download_progress.server_version == 0 &&
344,412!
2891
                                 m_upload_progress.client_version == 0 && m_upload_threshold.client_version == 0);
213,604!
2892
            DownloadCache& cache = m_server_file->get_download_cache();
364,324!
2893
            bool fetch_from_cache = (enable_cache && cache.body && end_version == cache.end_version);
364,324!
2894
            if (fetch_from_cache) {
322,668✔
2895
                body = cache.body.get();
×
2896
                uncompressed_body_size = cache.uncompressed_body_size;
×
2897
                compressed_body_size = cache.compressed_body_size;
×
2898
                body_is_compressed = cache.body_is_compressed;
×
2899
                download_progress = cache.download_progress;
×
2900
                downloadable_bytes = cache.downloadable_bytes;
×
2901
                num_changesets = cache.num_changesets;
×
2902
                accum_original_size = cache.accum_original_size;
×
2903
                accum_compacted_size = cache.accum_compacted_size;
×
2904
            }
41,656✔
2905
            else {
344,412✔
2906
                // Discard the old cached DOWNLOAD body before generating a new
193,692✔
2907
                // one to be cached. This can make a big difference because the
193,692✔
2908
                // size of that body can be very large (10GiB has been seen in a
193,692✔
2909
                // real-world case).
213,604✔
2910
                if (enable_cache)
322,668✔
2911
                    cache.body = {};
21,744✔
2912

213,604✔
2913
                OutputBuffer& out = server.get_misc_buffers().download_message;
364,324✔
2914
                out.reset();
364,324✔
2915
                download_progress = m_download_progress;
364,326✔
2916
                auto fetch_and_compress = [&](std::size_t max_download_size) {
364,342✔
2917
                    DownloadHistoryEntryHandler handler{protocol, out, logger};
364,338✔
2918
                    std::uint_fast64_t cumulative_byte_size_current;
364,338✔
2919
                    std::uint_fast64_t cumulative_byte_size_total;
364,338✔
2920
                    bool not_expired = history.fetch_download_info(
364,338✔
2921
                        m_client_file_ident, download_progress, end_version, upload_progress, handler,
364,338✔
2922
                        cumulative_byte_size_current, cumulative_byte_size_total, disable_download_compaction,
364,338✔
2923
                        max_download_size); // Throws
364,338✔
2924
                    REALM_ASSERT(upload_progress.client_version >= download_progress.last_integrated_client_version);
364,338✔
2925
                    SyncConnection& conn = get_connection();
364,338✔
2926
                    if (REALM_UNLIKELY(!not_expired)) {
322,680✔
2927
                        logger.debug("History scanning failed: Client file entry "
×
2928
                                     "expired during session"); // Throws
×
2929
                        conn.protocol_error(ProtocolError::client_file_expired, this);
2930
                        // Session object may have been destroyed at this point
2931
                        // (suicide).
2932
                        return false;
×
2933
                    }
21,746✔
2934

213,606✔
2935
                    downloadable_bytes = cumulative_byte_size_total - cumulative_byte_size_current;
364,338✔
2936
                    uncompressed_body_size = out.size();
364,338✔
2937
                    BinaryData uncompressed = {out.data(), uncompressed_body_size};
364,338✔
2938
                    body = uncompressed.data();
364,338✔
2939
                    std::size_t max_uncompressed = 1024;
364,338✔
2940
                    if (uncompressed.size() > max_uncompressed) {
326,900✔
2941
                        compression::CompressMemoryArena& arena = server.get_compress_memory_arena();
38,008✔
2942
                        std::vector<char>& buffer = server.get_misc_buffers().compress;
38,008✔
2943
                        compression::allocate_and_compress(arena, uncompressed, buffer); // Throws
38,008✔
2944
                        if (buffer.size() < uncompressed.size()) {
38,008✔
2945
                            body = buffer.data();
38,008✔
2946
                            compressed_body_size = buffer.size();
38,008✔
2947
                            body_is_compressed = true;
38,008✔
2948
                        }
38,008✔
2949
                    }
75,446✔
2950
                    num_changesets = handler.num_changesets;
364,338✔
2951
                    accum_original_size = handler.accum_original_size;
364,338✔
2952
                    accum_compacted_size = handler.accum_compacted_size;
364,338✔
2953
                    return true;
364,338✔
2954
                };
364,336✔
2955
                if (enable_cache) {
322,668✔
2956
                    std::size_t max_download_size = std::numeric_limits<size_t>::max();
×
2957
                    if (!fetch_and_compress(max_download_size)) { // Throws
×
2958
                        // Session object may have been destroyed at this point
2959
                        // (suicide).
2960
                        return;
×
2961
                    }
×
2962
                    REALM_ASSERT(upload_progress.client_version == 0);
×
2963
                    std::size_t body_size = (body_is_compressed ? compressed_body_size : uncompressed_body_size);
×
2964
                    cache.body = std::make_unique<char[]>(body_size); // Throws
×
2965
                    std::copy(body, body + body_size, cache.body.get());
×
2966
                    cache.uncompressed_body_size = uncompressed_body_size;
×
2967
                    cache.compressed_body_size = compressed_body_size;
×
2968
                    cache.body_is_compressed = body_is_compressed;
×
2969
                    cache.end_version = end_version;
×
2970
                    cache.download_progress = download_progress;
×
2971
                    cache.downloadable_bytes = downloadable_bytes;
×
2972
                    cache.num_changesets = num_changesets;
×
2973
                    cache.accum_original_size = accum_original_size;
×
2974
                    cache.accum_compacted_size = accum_compacted_size;
×
2975
                }
41,656✔
2976
                else {
364,324✔
2977
                    std::size_t max_download_size = config.max_download_size;
364,324✔
2978
                    if (!fetch_and_compress(max_download_size)) { // Throws
322,668✔
2979
                        // Session object may have been destroyed at this point
2980
                        // (suicide).
2981
                        return;
×
2982
                    }
41,656✔
2983
                }
364,324✔
2984
            }
344,412✔
2985

213,604✔
2986
            OutputBuffer& out = m_connection.get_output_buffer();
364,324✔
2987
            protocol.make_download_message(
364,324✔
2988
                m_connection.get_client_protocol_version(), out, m_session_ident, download_progress.server_version,
364,324✔
2989
                download_progress.last_integrated_client_version, last_server_version.version,
364,324✔
2990
                last_server_version.salt, upload_progress.client_version,
364,324✔
2991
                upload_progress.last_integrated_server_version, downloadable_bytes, num_changesets, body,
364,324✔
2992
                uncompressed_body_size, compressed_body_size, body_is_compressed, logger); // Throws
344,412✔
2993

213,608✔
2994
            if (!disable_download_compaction) {
364,348✔
2995
                std::size_t saved = accum_original_size - accum_compacted_size;
355,008✔
2996
                double saved_2 = (accum_original_size == 0 ? 0 : std::round(saved * 100.0 / accum_original_size));
292,416✔
2997
                logger.detail("Download compaction: Saved %1 bytes (%2%%)", saved, saved_2); // Throws
364,342✔
2998
            }
344,426✔
2999

213,604✔
3000
            m_download_progress = download_progress;
364,324✔
3001
            logger.debug("Setting of m_download_progress.server_version = %1",
364,324✔
3002
                         m_download_progress.server_version); // Throws
364,324✔
3003
            send_download_message();
364,324✔
3004
            m_one_download_message_sent = true;
344,412✔
3005

213,604✔
3006
            enlist_to_send();
364,324✔
3007
        }
390,140✔
3008
        else if (m_download_completion_request) {
522,566✔
3009
            // Send a MARK message
62,038✔
3010
            request_ident_type request_ident = m_download_completion_request;
120,132✔
3011
            send_mark_message(request_ident);  // Throws
120,132✔
3012
            m_download_completion_request = 0; // Request handled
120,132✔
3013
            enlist_to_send();
120,132✔
3014
        }
215,136✔
3015
    }
839,230✔
3016

3017
    void send_ident_message()
1,258✔
3018
    {
10,884✔
3019
        // Protocol state must be SendIdent
5,808✔
3020
        REALM_ASSERT(!need_client_file_ident());
11,570✔
3021
        REALM_ASSERT(m_send_ident_message);
11,570✔
3022
        REALM_ASSERT(!ident_message_received());
11,570✔
3023
        REALM_ASSERT(!unbind_message_received());
11,570✔
3024
        REALM_ASSERT(!error_occurred());
11,570✔
3025
        REALM_ASSERT(!m_error_message_sent);
10,884✔
3026

5,808✔
3027
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
10,884✔
3028

5,808✔
3029
        file_ident_type client_file_ident = m_allocated_file_ident.ident;
11,570✔
3030
        salt_type client_file_ident_salt = m_allocated_file_ident.salt;
10,884✔
3031

5,808✔
3032
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident,
11,570✔
3033
                     client_file_ident_salt); // Throws
10,884✔
3034

5,808✔
3035
        ServerProtocol& protocol = get_server_protocol();
11,570✔
3036
        OutputBuffer& out = m_connection.get_output_buffer();
11,570✔
3037
        int protocol_version = m_connection.get_client_protocol_version();
11,570✔
3038
        protocol.make_ident_message(protocol_version, out, m_session_ident, client_file_ident,
11,570✔
3039
                                    client_file_ident_salt); // Throws
11,570✔
3040
        m_connection.initiate_write_output_buffer();         // Throws
10,884✔
3041

5,808✔
3042
        m_allocated_file_ident.ident = 0; // Consumed
11,570✔
3043
        m_send_ident_message = false;
10,884✔
3044
        // Protocol state is now WaitForStateRequest or WaitForIdent
5,808✔
3045
    }
10,312✔
3046

3047
    void send_download_message()
41,660✔
3048
    {
364,344✔
3049
        m_connection.initiate_write_output_buffer(); // Throws
364,344✔
3050
    }
322,684✔
3051

3052
    /// Format a MARK message for `request_ident` into the connection's output
    /// buffer and initiate writing of that buffer.
    void send_mark_message(request_ident_type request_ident)
    {
        logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws

        ServerProtocol& protocol = get_server_protocol();
        OutputBuffer& buffer = m_connection.get_output_buffer();
        protocol.make_mark_message(buffer, m_session_ident, request_ident); // Throws
        m_connection.initiate_write_output_buffer();                        // Throws
    }
106,008✔
3061

3062
    void send_alloc_message()
3063
    {
3064
        // Protocol state must be WaitForUnbind
×
3065
        REALM_ASSERT(!m_send_ident_message);
×
3066
        REALM_ASSERT(ident_message_received());
×
3067
        REALM_ASSERT(!unbind_message_received());
×
3068
        REALM_ASSERT(!error_occurred());
×
3069
        REALM_ASSERT(!m_error_message_sent);
×
3070

×
3071
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
×
3072

3073
        // Relayed allocations are only allowed from protocol version 23 (old protocol).
3074
        REALM_ASSERT(false);
3075

3076
        file_ident_type file_ident = m_allocated_file_ident.ident;
3077

3078
        logger.debug("Sending: ALLOC(file_ident=%1)", file_ident); // Throws
3079

3080
        ServerProtocol& protocol = get_server_protocol();
×
3081
        OutputBuffer& out = m_connection.get_output_buffer();
×
3082
        protocol.make_alloc_message(out, m_session_ident, file_ident); // Throws
×
3083
        m_connection.initiate_write_output_buffer();                   // Throws
3084

3085
        m_allocated_file_ident.ident = 0; // Consumed
3086

3087
        // Other messages may be waiting to be sent.
3088
        enlist_to_send();
×
3089
    }
3090

3091
    void send_unbound_message()
4,112✔
3092
    {
30,986✔
3093
        // Protocol state must be SendUnbound
14,638✔
3094
        REALM_ASSERT(unbind_message_received());
33,882✔
3095
        REALM_ASSERT(!m_error_message_sent);
30,986✔
3096

14,638✔
3097
        logger.debug("Sending: UNBOUND"); // Throws
30,986✔
3098

14,638✔
3099
        ServerProtocol& protocol = get_server_protocol();
33,882✔
3100
        OutputBuffer& out = m_connection.get_output_buffer();
33,882✔
3101
        protocol.make_unbound_message(out, m_session_ident); // Throws
33,882✔
3102
        m_connection.initiate_write_output_buffer();         // Throws
33,882✔
3103
    }
29,770✔
3104

3105
    void send_error_message()
78✔
3106
    {
666✔
3107
        // Protocol state must be SendError
386✔
3108
        REALM_ASSERT(!unbind_message_received());
708✔
3109
        REALM_ASSERT(error_occurred());
708✔
3110
        REALM_ASSERT(!m_error_message_sent);
666✔
3111

386✔
3112
        REALM_ASSERT(is_session_level_error(m_error_code));
666✔
3113

386✔
3114
        ProtocolError error_code = m_error_code;
708✔
3115
        const char* message = get_protocol_error_message(int(error_code));
708✔
3116
        std::size_t message_size = std::strlen(message);
708✔
3117
        bool try_again = determine_try_again(error_code);
666✔
3118

386✔
3119
        logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3)", int(error_code), message_size,
708✔
3120
                      try_again); // Throws
666✔
3121

386✔
3122
        ServerProtocol& protocol = get_server_protocol();
708✔
3123
        OutputBuffer& out = m_connection.get_output_buffer();
708✔
3124
        int protocol_version = m_connection.get_client_protocol_version();
708✔
3125
        protocol.make_error_message(protocol_version, out, error_code, message, message_size, try_again,
708✔
3126
                                    m_session_ident); // Throws
708✔
3127
        m_connection.initiate_write_output_buffer();  // Throws
666✔
3128

386✔
3129
        m_error_message_sent = true;
666✔
3130
        // Protocol state is now WaitForUnbindErr
386✔
3131
    }
630✔
3132

3133
    /// Deliver a log message for this session.
    ///
    /// When the client's protocol version is below
    /// SyncConnection::SERVER_LOG_PROTOCOL_VERSION, the message is written to
    /// this session's own logger. Otherwise it is handed to the connection
    /// together with the session identifier.
    ///
    /// NOTE(review): `message` is a const rvalue reference, so the std::move()
    /// below yields a const rvalue and cannot actually move the string --
    /// confirm whether `std::string&&` was intended.
    void send_log_message(util::Logger::Level level, const std::string&& message)
    {
        const bool client_supports_log_messages =
            !(m_connection.get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION);
        if (!client_supports_log_messages) {
            logger.log(level, message.c_str());
            return;
        }

        m_connection.send_log_message(level, std::move(message), m_session_ident);
    }
41,424✔
3141

3142
    // Idempotent
3143
    void detach_from_server_file() noexcept
11,168✔
3144
    {
93,036✔
3145
        if (!m_server_file)
86,124✔
3146
            return;
37,844✔
3147
        ServerFile& file = *m_server_file;
57,848✔
3148
        if (ident_message_received()) {
57,108✔
3149
            file.remove_identified_session(m_client_file_ident);
47,596✔
3150
        }
42,164✔
3151
        else {
10,252✔
3152
            file.remove_unidentified_session(this);
10,252✔
3153
        }
16,424✔
3154
        if (m_file_ident_request != 0)
51,514✔
3155
            file.cancel_file_ident_request(m_file_ident_request);
14,912✔
3156
        m_server_file.reset();
57,848✔
3157
    }
50,936✔
3158

3159
    friend class SessionQueue;
3160
};
3161

3162

3163
// ============================ SessionQueue implementation ============================
3164

3165
// Append `sess` to the queue. The queue is a circular singly linked list
// threaded through Session::m_next, where `m_back->m_next` is the front. The
// session must not already be linked (its m_next must be null).
void SessionQueue::push_back(Session* sess) noexcept
{
    REALM_ASSERT(!sess->m_next);

    // In an empty queue the new session is its own successor; otherwise it
    // takes over the old back's link to the front.
    Session* front = (m_back ? m_back->m_next : sess);
    sess->m_next = front;
    if (m_back)
        m_back->m_next = sess;

    m_back = sess;
}
881,324✔
3177

3178

3179
// Remove and return the session at the front of the queue, or return null if
// the queue is empty. The returned session's m_next is reset to null.
Session* SessionQueue::pop_front() noexcept
{
    if (!m_back)
        return nullptr;

    Session* front = m_back->m_next;
    if (front == m_back) {
        // That was the only session in the queue.
        m_back = nullptr;
    }
    else {
        // Unlink the front by letting the back point to its successor.
        m_back->m_next = front->m_next;
    }
    front->m_next = nullptr;
    return front;
}
1,280,688✔
3194

3195

3196
void SessionQueue::clear() noexcept
3,244✔
3197
{
27,570✔
3198
    if (m_back) {
24,444✔
3199
        Session* sess = m_back;
1,034✔
3200
        for (;;) {
1,588✔
3201
            Session* next = sess->m_next;
1,588✔
3202
            sess->m_next = nullptr;
1,588✔
3203
            if (next == m_back)
1,512✔
3204
                break;
916✔
3205
            sess = next;
630✔
3206
        }
672✔
3207
        m_back = nullptr;
958✔
3208
    }
4,084✔
3209
}
24,326✔
3210

3211

3212
// ============================ ServerFile implementation ============================
3213

3214
ServerFile::ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path,
3215
                       std::string real_path, bool disable_sync_to_disk)
3216
    : logger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.logger_ptr}               // Throws
3217
    , wlogger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.get_worker().logger_ptr} // Throws
3218
    , m_server{server}
3219
    , m_file{cache, real_path, virt_path, false, disable_sync_to_disk} // Throws
3220
    , m_worker_file{server.get_worker().get_file_access_cache(), real_path, virt_path, true, disable_sync_to_disk}
1,092✔
3221
{
9,760✔
3222
}
8,668✔
3223

3224

3225
// Destruction is only legal once no session (identified or not) is still
// registered with this file and `m_file_ident_request` is zero.
ServerFile::~ServerFile() noexcept
{
    REALM_ASSERT(m_unidentified_sessions.empty());
    REALM_ASSERT(m_identified_sessions.empty());
    REALM_ASSERT(m_file_ident_request == 0);
}
8,668✔
3231

3232

3233
void ServerFile::initialize()
1,092✔
3234
{
9,760✔
3235
    const ServerHistory& history = access().history; // Throws
9,760✔
3236
    file_ident_type partial_file_ident = 0;
9,760✔
3237
    version_type partial_progress_reference_version = 0;
9,760✔
3238
    bool has_upstream_sync_status;
9,760✔
3239
    history.get_status(m_version_info, has_upstream_sync_status, partial_file_ident,
9,760✔
3240
                       partial_progress_reference_version); // Throws
9,760✔
3241
    REALM_ASSERT(!has_upstream_sync_status);
9,760✔
3242
    REALM_ASSERT(partial_file_ident == 0);
9,760✔
3243
}
8,668✔
3244

3245

1,090✔
3246
// Currently a no-op.
void ServerFile::activate() {}
8,668✔
3247

3248

3249
// This function must be called only after a completed invocation of
3250
// initialize(). Both functions must only ever be called by the network event
3251
// loop thread.
93,312✔
3252
// Currently a no-op: the client file identifier parameter is unnamed and
// unused.
void ServerFile::register_client_access(file_ident_type) {}
724,082✔
3253

3254

3255
auto ServerFile::request_file_ident(FileIdentReceiver& receiver, file_ident_type proxy_file, ClientType client_type)
3256
    -> file_ident_request_type
1,836✔
3257
{
20,148✔
3258
    auto request = ++m_last_file_ident_request;
20,148✔
3259
    m_file_ident_requests[request] = {&receiver, proxy_file, client_type}; // Throws
19,348✔
3260

10,898✔
3261
    on_work_added(); // Throws
20,148✔
3262
    return request;
20,148✔
3263
}
18,312✔
3264

3265

3266
// Cancels a pending file identifier request.
//
// The request must exist and must still have a receiver. The entry itself is
// not removed here; only its receiver is cleared. unblock_work() discards
// entries whose receiver is null.
void ServerFile::cancel_file_ident_request(file_ident_request_type request) noexcept
{
    auto i = m_file_ident_requests.find(request);
    REALM_ASSERT(i != m_file_ident_requests.end());

    FileIdentRequestInfo& info = i->second;
    REALM_ASSERT(info.receiver);
    info.receiver = nullptr;
}
8,000✔
3274

3275

3276
// Registers `sess` as a session that has not yet been associated with a client
// file identifier. The session must not already be registered as such.
void ServerFile::add_unidentified_session(Session* sess)
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 0);
    m_unidentified_sessions.insert(sess); // Throws
}
50,936✔
3281

3282

3283
// Moves `sess` from the set of unidentified sessions to the map of identified
// sessions, keyed by `client_file_ident`.
//
// The session must currently be unidentified, and no other session may be
// registered under the same client file identifier.
void ServerFile::identify_session(Session* sess, file_ident_type client_file_ident)
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 0);

    // Insert first (may throw), so that a failure leaves the session
    // registered as unidentified.
    m_identified_sessions[client_file_ident] = sess; // Throws
    m_unidentified_sessions.erase(sess);
}
41,422✔
3291

3292

3293
// Unregisters a session that was added with add_unidentified_session() and has
// not been identified since. The session must currently be registered.
void ServerFile::remove_unidentified_session(Session* sess) noexcept
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);
    m_unidentified_sessions.erase(sess);
}
9,516✔
3298

3299

3300
// Unregisters the session registered under `client_file_ident`. Exactly one
// session must currently be registered under that identifier.
void ServerFile::remove_identified_session(file_ident_type client_file_ident) noexcept
{
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 1);
    m_identified_sessions.erase(client_file_ident);
}
41,422✔
3305

3306

3307
// Looks up the session registered under `client_file_ident`.
//
// Returns null when no session is registered under that identifier.
Session* ServerFile::get_identified_session(file_ident_type client_file_ident) noexcept
{
    const auto pos = m_identified_sessions.find(client_file_ident);
    const bool found = (pos != m_identified_sessions.end());
    return found ? pos->second : nullptr;
}
3314

3315
bool ServerFile::can_add_changesets_from_downstream() const noexcept
45,732✔
3316
{
407,412✔
3317
    return (m_blocked_changesets_from_downstream_byte_size < m_server.get_max_upload_backlog());
407,412✔
3318
}
361,680✔
3319

3320

3321
void ServerFile::add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
3322
                                                version_type locked_server_version, const UploadChangeset* changesets,
3323
                                                std::size_t num_changesets)
45,486✔
3324
{
405,498✔
3325
    register_client_access(client_file_ident); // Throws
382,408✔
3326

224,156✔
3327
    bool dirty = false;
382,408✔
3328

224,156✔
3329
    IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident]; // Throws
405,498✔
3330
    std::size_t num_bytes = 0;
439,976✔
3331
    for (std::size_t i = 0; i < num_changesets; ++i) {
670,344✔
3332
        const UploadChangeset& uc = changesets[i];
310,332✔
3333
        auto& changesets = list.changesets;
310,332✔
3334
        changesets.emplace_back(client_file_ident, uc.origin_timestamp, uc.origin_file_ident, uc.upload_cursor,
310,332✔
3335
                                uc.changeset); // Throws
310,332✔
3336
        num_bytes += uc.changeset.size();
310,332✔
3337
        dirty = true;
310,332✔
3338
    }
298,250✔
3339

224,156✔
3340
    REALM_ASSERT(upload_progress.client_version >= list.upload_progress.client_version);
405,498✔
3341
    REALM_ASSERT(are_mutually_consistent(upload_progress, list.upload_progress));
405,500✔
3342
    if (upload_progress.client_version > list.upload_progress.client_version) {
405,520✔
3343
        list.upload_progress = upload_progress;
405,514✔
3344
        dirty = true;
405,514✔
3345
    }
382,424✔
3346

224,156✔
3347
    REALM_ASSERT(locked_server_version >= list.locked_server_version);
405,498✔
3348
    if (locked_server_version > list.locked_server_version) {
398,806✔
3349
        list.locked_server_version = locked_server_version;
346,878✔
3350
        dirty = true;
346,878✔
3351
    }
330,480✔
3352

224,158✔
3353
    if (REALM_LIKELY(dirty)) {
405,522✔
3354
        if (num_changesets > 0) {
382,592✔
3355
            on_changesets_from_downstream_added(num_changesets, num_bytes); // Throws
205,998✔
3356
        }
206,360✔
3357
        else {
199,518✔
3358
            on_work_added(); // Throws
199,518✔
3359
        }
222,080✔
3360
    }
405,516✔
3361
}
360,012✔
3362

3363

3364
// Bootstraps a session for the client file `client_file_ident`.
//
// Returns `BootstrapError::bad_server_version` when `server_version` is ahead
// of the sync version currently exposed by this file. Otherwise the result of
// `ServerHistory::bootstrap_client_session()` is returned.
//
// On success, `upload_progress` and `locked_server_version` are set by the
// history and then overridden by the values recorded with any changesets
// still buffered for the same client file. The changesets currently owned by
// the work unit are considered first, then those buffered since.
BootstrapError ServerFile::bootstrap_client_session(SaltedFileIdent client_file_ident,
                                                    DownloadCursor download_progress, SaltedVersion server_version,
                                                    ClientType client_type, UploadCursor& upload_progress,
                                                    version_type& locked_server_version, Logger& logger)
{
    // The Realm file may contain a later snapshot than the one reflected by
    // `m_version_info.sync_version`, but if so, the client cannot "legally"
    // know about it.
    if (server_version.version > m_version_info.sync_version.version)
        return BootstrapError::bad_server_version;

    const ServerHistory& hist = access().history; // Throws
    const BootstrapError error =
        hist.bootstrap_client_session(client_file_ident, download_progress, server_version, client_type,
                                      upload_progress, locked_server_version, logger); // Throws

    // FIXME: Rather than taking previously buffered changesets from the same
    // client file into account when determining the upload progress, and then
    // allowing for an error during the integration of those changesets to be
    // reported to, and terminate the new session, consider to instead postpone
    // the bootstrapping of the new session until all previously buffered
    // changesets from same client file have been fully processed.

    if (error != BootstrapError::no_error)
        return error;

    register_client_access(client_file_ident.ident); // Throws

    // If upload, or releasing of server versions, progressed further during
    // previous sessions than the persisted points, take that into account.
    auto adopt_buffered_progress = [&](const auto& buffered) {
        auto pos = buffered.find(client_file_ident.ident);
        if (pos == buffered.end())
            return;
        const IntegratableChangesetList& list = pos->second;
        REALM_ASSERT(list.upload_progress.client_version >= upload_progress.client_version);
        upload_progress = list.upload_progress;
        REALM_ASSERT(list.locked_server_version >= locked_server_version);
        locked_server_version = list.locked_server_version;
    };
    adopt_buffered_progress(m_work.changesets_from_downstream);
    adopt_buffered_progress(m_changesets_from_downstream);

    return error;
}
41,514✔
3411

3412
// NOTE: This function is executed by the worker thread
3413
// Executes the current work unit (`m_work`): allocates file identifiers and /
// or integrates buffered changesets, as applicable. It then accounts for the
// time spent and posts group_postprocess_stage_1() to the server's network
// service.
void ServerFile::worker_process_work_unit(WorkerState& state)
{
    const SteadyTimePoint start_time = steady_clock_now();
    // Nothing in this function adds to the parallel time, so it stays zero
    // and all elapsed time is accounted as sequential.
    const milliseconds_type parallel_time = 0;

    wlogger.debug("Work unit execution started"); // Throws

    if (m_work.has_primary_work) {
        const bool must_allocate_file_idents = !m_work.file_ident_alloc_slots.empty();
        if (REALM_UNLIKELY(must_allocate_file_idents))
            worker_allocate_file_identifiers(); // Throws

        const bool must_integrate = !m_work.changesets_from_downstream.empty();
        if (must_integrate)
            worker_integrate_changes_from_downstream(state); // Throws
    }

    wlogger.debug("Work unit execution completed"); // Throws

    const milliseconds_type time = steady_duration(start_time);
    const milliseconds_type seq_time = time - parallel_time;
    m_server.m_seq_time.fetch_add(seq_time, std::memory_order_relaxed);
    m_server.m_par_time.fetch_add(parallel_time, std::memory_order_relaxed);

    // Pass control back to the network event loop thread
    m_server.get_service().post([this](Status) {
        // FIXME: The safety of capturing `this` here, relies on the fact
        // that ServerFile objects currently are not destroyed until the
        // server object is destroyed.
        group_postprocess_stage_1(); // Throws
        // Suicide may have happened at this point
    }); // Throws
}
306,754✔
3446

3447

3448
// Accounts for `num_changesets` newly buffered changesets of combined size
// `num_bytes`, then signals that there is new work for this file.
void ServerFile::on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes)
{
    m_num_changesets_from_downstream += num_changesets;

    const bool has_payload = (num_bytes > 0);
    if (has_payload) {
        // Keep the per-file and the server-wide byte counters in step.
        m_blocked_changesets_from_downstream_byte_size += num_bytes;
        get_server().inc_byte_size_for_pending_downstream_changesets(num_bytes); // Throws
    }

    on_work_added(); // Throws
}
183,434✔
3459

3460

3461
void ServerFile::on_work_added()
47,326✔
3462
{
425,660✔
3463
    if (m_has_blocked_work)
386,404✔
3464
        return;
109,892✔
3465
    m_has_blocked_work = true;
326,998✔
3466
    // Reference file
192,672✔
3467
    if (m_has_work_in_progress)
318,184✔
3468
        return;
117,262✔
3469
    group_unblock_work(); // Throws
247,976✔
3470
}
219,206✔
3471

3472

3473
void ServerFile::group_unblock_work()
39,210✔
3474
{
346,534✔
3475
    REALM_ASSERT(!m_has_work_in_progress);
346,534✔
3476
    if (REALM_LIKELY(!m_server.is_sync_stopped())) {
346,544✔
3477
        unblock_work(); // Throws
346,526✔
3478
        const Work& work = m_work;
346,526✔
3479
        if (REALM_LIKELY(work.has_primary_work)) {
346,514✔
3480
            logger.trace("Work unit unblocked"); // Throws
346,046✔
3481
            m_has_work_in_progress = true;
346,046✔
3482
            Worker& worker = m_server.get_worker();
346,046✔
3483
            worker.enqueue(this); // Throws
346,046✔
3484
        }
346,060✔
3485
    }
346,526✔
3486
}
307,324✔
3487

3488

3489
void ServerFile::unblock_work()
39,210✔
3490
{
346,524✔
3491
    REALM_ASSERT(m_has_blocked_work);
326,580✔
3492

192,350✔
3493
    m_work.reset();
326,580✔
3494

172,406✔
3495
    // Discard requests for file identifiers whose receiver is no longer
172,406✔
3496
    // waiting.
192,350✔
3497
    {
346,524✔
3498
        auto i = m_file_ident_requests.begin();
346,524✔
3499
        auto end = m_file_ident_requests.end();
348,356✔
3500
        while (i != end) {
327,156✔
3501
            auto j = i++;
19,842✔
3502
            const FileIdentRequestInfo& info = j->second;
19,842✔
3503
            if (!info.receiver)
18,460✔
3504
                m_file_ident_requests.erase(j);
6,626✔
3505
        }
57,220✔
3506
    }
346,524✔
3507
    std::size_t n = m_file_ident_requests.size();
346,524✔
3508
    if (n > 0) {
308,644✔
3509
        m_work.file_ident_alloc_slots.resize(n); // Throws
14,136✔
3510
        std::size_t i = 0;
14,188✔
3511
        for (const auto& pair : m_file_ident_requests) {
14,600✔
3512
            const FileIdentRequestInfo& info = pair.second;
14,600✔
3513
            FileIdentAllocSlot& slot = m_work.file_ident_alloc_slots[i];
14,600✔
3514
            slot.proxy_file = info.proxy_file;
14,600✔
3515
            slot.client_type = info.client_type;
14,600✔
3516
            ++i;
14,600✔
3517
        }
14,548✔
3518
        m_work.has_primary_work = true;
14,136✔
3519
    }
32,072✔
3520

172,406✔
3521
    // FIXME: `ServerFile::m_changesets_from_downstream` and
172,406✔
3522
    // `Work::changesets_from_downstream` should be renamed to something else,
172,406✔
3523
    // as it may contain kinds of data other than changesets.
172,406✔
3524

192,350✔
3525
    using std::swap;
346,524✔
3526
    swap(m_changesets_from_downstream, m_work.changesets_from_downstream);
346,524✔
3527
    m_work.have_changesets_from_downstream = (m_num_changesets_from_downstream > 0);
346,524✔
3528
    bool has_changesets = !m_work.changesets_from_downstream.empty();
346,524✔
3529
    if (has_changesets) {
345,186✔
3530
        m_work.has_primary_work = true;
331,952✔
3531
    }
313,346✔
3532

172,406✔
3533
    // Keep track of the size of pending changesets
192,350✔
3534
    REALM_ASSERT(m_unblocked_changesets_from_downstream_byte_size == 0);
346,524✔
3535
    m_unblocked_changesets_from_downstream_byte_size = m_blocked_changesets_from_downstream_byte_size;
346,524✔
3536
    m_blocked_changesets_from_downstream_byte_size = 0;
326,580✔
3537

192,350✔
3538
    m_num_changesets_from_downstream = 0;
346,524✔
3539
    m_has_blocked_work = false;
346,524✔
3540
}
307,314✔
3541

3542

3543
void ServerFile::resume_download() noexcept
22,192✔
3544
{
212,476✔
3545
    for (const auto& entry : m_identified_sessions) {
317,508✔
3546
        Session& sess = *entry.second;
317,508✔
3547
        sess.ensure_enlisted_to_send();
317,508✔
3548
    }
304,126✔
3549
}
176,902✔
3550

3551

3552
void ServerFile::recognize_external_change()
4,798✔
3553
{
43,194✔
3554
    VersionInfo prev_version_info = m_version_info;
43,194✔
3555
    const ServerHistory& history = access().history;       // Throws
43,194✔
3556
    bool has_upstream_status;                              // Dummy
43,194✔
3557
    sync::file_ident_type partial_file_ident;              // Dummy
43,194✔
3558
    sync::version_type partial_progress_reference_version; // Dummy
43,194✔
3559
    history.get_status(m_version_info, has_upstream_status, partial_file_ident,
43,194✔
3560
                       partial_progress_reference_version); // Throws
40,796✔
3561

23,998✔
3562
    REALM_ASSERT(m_version_info.realm_version >= prev_version_info.realm_version);
43,194✔
3563
    REALM_ASSERT(m_version_info.sync_version.version >= prev_version_info.sync_version.version);
43,194✔
3564
    if (m_version_info.sync_version.version > prev_version_info.sync_version.version) {
43,194✔
3565
        REALM_ASSERT(m_version_info.realm_version > prev_version_info.realm_version);
43,184✔
3566
        resume_download();
43,184✔
3567
    }
43,184✔
3568
}
38,396✔
3569

3570

3571
// NOTE: This function is executed by the worker thread
3572
void ServerFile::worker_allocate_file_identifiers()
1,328✔
3573
{
14,096✔
3574
    Work& work = m_work;
14,096✔
3575
    REALM_ASSERT(!work.file_ident_alloc_slots.empty());
14,096✔
3576
    ServerHistory& hist = worker_access().history;                                      // Throws
14,096✔
3577
    hist.allocate_file_identifiers(m_work.file_ident_alloc_slots, m_work.version_info); // Throws
14,096✔
3578
    m_work.produced_new_realm_version = true;
14,096✔
3579
}
12,768✔
3580

3581

3582
// Returns true when, and only when this function produces a new sync version
3583
// (adds a new entry to the sync history).
3584
//
3585
// NOTE: This function is executed by the worker thread
3586
// Integrates the changesets of the current work unit into the history and
// records in the work unit whether a new Realm version and / or a new sync
// version was produced. There must be at least one buffered entry.
bool ServerFile::worker_integrate_changes_from_downstream(WorkerState& state)
{
    REALM_ASSERT(!m_work.changesets_from_downstream.empty());

    // Keep this declaration order: locals are destroyed in reverse order, so
    // `sg_ptr` is released before the history object it may have been created
    // from (see get_client_file_history()).
    std::unique_ptr<ServerHistory> hist_ptr;
    DBRef sg_ptr;
    ServerHistory& hist = get_client_file_history(state, hist_ptr, sg_ptr);

    bool backup_whole_realm = false;
    bool produced_new_realm_version =
        hist.integrate_client_changesets(m_work.changesets_from_downstream, m_work.version_info, backup_whole_realm,
                                         m_work.integration_result, wlogger); // Throws

    // A new sync version exists exactly when at least one changeset was
    // actually integrated.
    bool produced_new_sync_version = !m_work.integration_result.integrated_changesets.empty();
    REALM_ASSERT(!produced_new_sync_version || produced_new_realm_version);

    if (produced_new_realm_version) {
        m_work.produced_new_realm_version = true;
        if (produced_new_sync_version)
            m_work.produced_new_sync_version = true;
    }
    return produced_new_sync_version;
}
294,018✔
3607

3608
// Returns the history object the worker should use for this file.
//
// When `state.use_file_cache` is set, this is the history of the worker's
// cached file access, and `hist_ptr` / `sg_ptr` are left untouched. Otherwise
// a new history and a new DB are created for the file's path and stored in
// `hist_ptr` and `sg_ptr`, which then own them; the DB claims the sync agent
// role.
ServerHistory& ServerFile::get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
                                                   DBRef& sg_ptr)
{
    if (state.use_file_cache) {
        return worker_access().history; // Throws
    }

    hist_ptr = m_server.make_history_for_path(); // Throws

    const std::string& path = m_worker_file.realm_path;
    DBOptions options = m_worker_file.make_shared_group_options(); // Throws
    sg_ptr = DB::create(*hist_ptr, path, options);                 // Throws
    sg_ptr->claim_sync_agent();                                    // Throws

    return *hist_ptr;
}
6,442,450,943✔
3620

3621

3622
// When worker thread finishes work unit.
3623
void ServerFile::group_postprocess_stage_1()
38,784✔
3624
{
342,540✔
3625
    REALM_ASSERT(m_has_work_in_progress);
322,768✔
3626

189,946✔
3627
    group_finalize_work_stage_1(); // Throws
342,540✔
3628
    group_finalize_work_stage_2(); // Throws
342,540✔
3629
    group_postprocess_stage_2();   // Throws
342,540✔
3630
}
303,756✔
3631

3632

3633
void ServerFile::group_postprocess_stage_2()
38,784✔
3634
{
342,540✔
3635
    REALM_ASSERT(m_has_work_in_progress);
342,540✔
3636
    group_postprocess_stage_3(); // Throws
322,768✔
3637
    // Suicide may have happened at this point
189,950✔
3638
}
303,756✔
3639

3640

3641
// When all files, including the reference file, have been backed up.
3642
void ServerFile::group_postprocess_stage_3()
38,784✔
3643
{
342,536✔
3644
    REALM_ASSERT(m_has_work_in_progress);
342,536✔
3645
    m_has_work_in_progress = false;
322,764✔
3646

189,950✔
3647
    logger.trace("Work unit postprocessing complete"); // Throws
342,536✔
3648
    if (m_has_blocked_work)
314,192✔
3649
        group_unblock_work(); // Throws
126,896✔
3650
}
303,752✔
3651

3652

3653
void ServerFile::finalize_work_stage_1()
38,784✔
3654
{
342,544✔
3655
    if (m_unblocked_changesets_from_downstream_byte_size > 0) {
312,996✔
3656
        // Report the byte size of completed downstream changesets.
91,304✔
3657
        std::size_t byte_size = m_unblocked_changesets_from_downstream_byte_size;
156,090✔
3658
        get_server().dec_byte_size_for_pending_downstream_changesets(byte_size); // Throws
156,090✔
3659
        m_unblocked_changesets_from_downstream_byte_size = 0;
156,090✔
3660
    }
157,690✔
3661

170,176✔
3662
    // Deal with errors (bad changesets) pertaining to downstream clients
189,948✔
3663
    std::size_t num_changesets_removed = 0;
342,544✔
3664
    std::size_t num_bytes_removed = 0;
322,784✔
3665
    for (const auto& entry : m_work.integration_result.excluded_client_files) {
151,268✔
3666
        file_ident_type client_file_ident = entry.first;
174✔
3667
        ExtendedIntegrationError error = entry.second;
174✔
3668
        ProtocolError error_2 = ProtocolError::other_session_error;
174✔
3669
        switch (error) {
156✔
3670
            case ExtendedIntegrationError::client_file_expired:
✔
3671
                logger.debug("Changeset integration failed: Client file entry "
×
3672
                             "expired during session"); // Throws
×
3673
                error_2 = ProtocolError::client_file_expired;
×
3674
                break;
✔
3675
            case ExtendedIntegrationError::bad_origin_file_ident:
✔
3676
                error_2 = ProtocolError::bad_origin_file_ident;
×
3677
                break;
18✔
3678
            case ExtendedIntegrationError::bad_changeset:
174✔
3679
                error_2 = ProtocolError::bad_changeset;
174✔
3680
                break;
174✔
3681
        }
174✔
3682
        auto i = m_identified_sessions.find(client_file_ident);
174✔
3683
        if (i != m_identified_sessions.end()) {
174✔
3684
            Session& sess = *i->second;
174✔
3685
            SyncConnection& conn = sess.get_connection();
174✔
3686
            conn.protocol_error(error_2, &sess); // Throws
174✔
3687
        }
174✔
3688
        const IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident];
174✔
3689
        std::size_t num_changesets = list.changesets.size();
174✔
3690
        std::size_t num_bytes = 0;
174✔
3691
        for (const IntegratableChangeset& ic : list.changesets)
156✔
3692
            num_bytes += ic.changeset.size();
18✔
3693
        logger.info("Excluded %1 changesets of combined byte size %2 for client file %3", num_changesets, num_bytes,
174✔
3694
                    client_file_ident); // Throws
174✔
3695
        num_changesets_removed += num_changesets;
174✔
3696
        num_bytes_removed += num_bytes;
174✔
3697
        m_changesets_from_downstream.erase(client_file_ident);
174✔
3698
    }
19,168✔
3699

189,948✔
3700
    REALM_ASSERT(num_changesets_removed <= m_num_changesets_from_downstream);
342,544✔
3701
    REALM_ASSERT(num_bytes_removed <= m_blocked_changesets_from_downstream_byte_size);
322,772✔
3702

189,948✔
3703
    if (num_changesets_removed == 0)
342,538✔
3704
        return;
303,754✔
3705

10✔
3706
    m_num_changesets_from_downstream -= num_changesets_removed;
2,147,483,655✔
3707

4✔
3708
    // The byte size of the blocked changesets must be decremented.
10✔
3709
    if (num_bytes_removed > 0) {
2,147,483,655✔
3710
        m_blocked_changesets_from_downstream_byte_size -= num_bytes_removed;
×
3711
        get_server().dec_byte_size_for_pending_downstream_changesets(num_bytes_removed); // Throws
×
3712
    }
6✔
3713
}
2,147,483,655✔
3714

3715

3716
void ServerFile::finalize_work_stage_2()
38,778✔
3717
{
322,764✔
3718
    // Expose new snapshot to remote peers
189,938✔
3719
    REALM_ASSERT(m_work.produced_new_realm_version || m_work.version_info.realm_version == 0);
342,530✔
3720
    if (m_work.version_info.realm_version > m_version_info.realm_version) {
342,508✔
3721
        REALM_ASSERT(m_work.version_info.sync_version.version >= m_version_info.sync_version.version);
342,204✔
3722
        m_version_info = m_work.version_info;
342,204✔
3723
    }
322,460✔
3724

189,938✔
3725
    bool resume_download_and_upload = m_work.produced_new_sync_version;
322,764✔
3726

170,172✔
3727
    // Deliver allocated file identifiers to requesters
189,938✔
3728
    REALM_ASSERT(m_file_ident_requests.size() >= m_work.file_ident_alloc_slots.size());
342,530✔
3729
    auto begin = m_file_ident_requests.begin();
342,530✔
3730
    auto i = begin;
323,538✔
3731
    for (const FileIdentAllocSlot& slot : m_work.file_ident_alloc_slots) {
160,786✔
3732
        FileIdentRequestInfo& info = i->second;
14,386✔
3733
        REALM_ASSERT(info.proxy_file == slot.proxy_file);
14,386✔
3734
        REALM_ASSERT(info.client_type == slot.client_type);
14,386✔
3735
        if (FileIdentReceiver* receiver = info.receiver) {
14,286✔
3736
            info.receiver = nullptr;
11,570✔
3737
            receiver->receive_file_ident(slot.file_ident); // Throws
11,570✔
3738
        }
11,670✔
3739
        ++i;
14,386✔
3740
    }
51,806✔
3741
    m_file_ident_requests.erase(begin, i);
322,764✔
3742

170,172✔
3743
    // Resume download to downstream clients
189,938✔
3744
    if (resume_download_and_upload) {
321,146✔
3745
        resume_download();
155,912✔
3746
    }
177,296✔
3747
}
303,752✔
3748

3749
// ============================ Worker implementation ============================
3750

3751
Worker::Worker(ServerImpl& server)
3752
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, "Worker: ", server.logger_ptr)}
3753
    // Throws
3754
    , logger(*logger_ptr)
3755
    , m_server{server}
3756
    , m_file_access_cache{server.get_config().max_open_files, logger, *this, server.get_config().encryption_key}
1,152✔
3757
{
12,252✔
3758
    util::seed_prng_nondeterministically(m_random); // Throws
12,252✔
3759
}
11,100✔
3760

3761

3762
// Appends `file` to the worker's queue under `m_mutex` and notifies all
// waiters on `m_cond` (see Worker::run()).
void Worker::enqueue(ServerFile* file)
{
    util::LockGuard lock{m_mutex};
    m_queue.push_back(file); // Throws
    // Notify while still holding the lock, as before.
    m_cond.notify_all();
}
306,882✔
3768

3769

3770
// Gives access to the worker's random number generator (seeded in the
// constructor).
std::mt19937_64& Worker::server_history_get_random() noexcept
{
    return m_random;
}
21,320✔
3774

3775

3776
void Worker::run()
1,096✔
3777
{
50,932✔
3778
    for (;;) {
357,698✔
3779
        ServerFile* file = nullptr;
357,698✔
3780
        {
357,698✔
3781
            util::LockGuard lock{m_mutex};
397,346✔
3782
            for (;;) {
709,848✔
3783
                if (REALM_UNLIKELY(m_stop))
669,672✔
3784
                    return;
397,620✔
3785
                if (!m_queue.empty()) {
658,450✔
3786
                    file = m_queue.front();
345,950✔
3787
                    m_queue.pop_front();
345,950✔
3788
                    break;
345,950✔
3789
                }
346,414✔
3790
                m_cond.wait(lock);
352,146✔
3791
            }
352,776✔
3792
        }
357,098✔
3793
        file->worker_process_work_unit(m_state); // Throws
350,858✔
3794
    }
307,862✔
3795
}
10,656✔
3796

3797

3798
void Worker::stop() noexcept
1,096✔
3799
{
11,752✔
3800
    util::LockGuard lock{m_mutex};
11,752✔
3801
    m_stop = true;
11,752✔
3802
    m_cond.notify_all();
11,752✔
3803
}
10,656✔
3804

3805

3806
// ============================ ServerImpl implementation ============================
3807

3808
ServerImpl::ServerImpl(const std::string& root_dir, util::Optional<sync::PKey> pkey, Server::Config config)
3809
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::server, std::move(config.logger))}
3810
    , logger{*logger_ptr}
3811
    , m_config{std::move(config)}
3812
    , m_max_upload_backlog{determine_max_upload_backlog(config)}
3813
    , m_root_dir{root_dir} // Throws
3814
    , m_access_control{std::move(pkey)}
3815
    , m_protocol_version_range{determine_protocol_version_range(config)}                 // Throws
3816
    , m_file_access_cache{m_config.max_open_files, logger, *this, config.encryption_key} // Throws
3817
    , m_worker{*this}                                                                    // Throws
3818
    , m_acceptor{get_service()}
3819
    , m_server_protocol{}       // Throws
3820
    , m_compress_memory_arena{} // Throws
1,152✔
3821
{
12,254✔
3822
    if (m_config.ssl) {
11,126✔
3823
        m_ssl_context = std::make_unique<network::ssl::Context>();                // Throws
216✔
3824
        m_ssl_context->use_certificate_chain_file(m_config.ssl_certificate_path); // Throws
216✔
3825
        m_ssl_context->use_private_key_file(m_config.ssl_certificate_key_path);   // Throws
216✔
3826
    }
1,344✔
3827
}
11,102✔
3828

3829

3830
// The server must not be destroyed while it is still running; this is
// enforced in release builds too.
ServerImpl::~ServerImpl() noexcept
{
    // The descriptive variable name is kept because it is the expression
    // passed to the assertion.
    bool server_destroyed_while_still_running = m_running;
    REALM_ASSERT_RELEASE(!server_destroyed_while_still_running);
}
11,104✔
3835

3836

3837
void ServerImpl::start()
1,152✔
3838
{
12,256✔
3839
    logger.info("Realm sync server started (%1)", REALM_VER_CHUNK); // Throws
12,256✔
3840
    logger.info("Supported protocol versions: %1-%2 (%3-%4 configured)",
12,256✔
3841
                ServerImplBase::get_oldest_supported_protocol_version(), get_current_protocol_version(),
12,256✔
3842
                m_protocol_version_range.first,
12,256✔
3843
                m_protocol_version_range.second); // Throws
12,256✔
3844
    logger.info("Platform: %1", util::get_platform_info());
12,256✔
3845
    bool is_debug_build = false;
12,256✔
3846
#if REALM_DEBUG
12,256✔
3847
    is_debug_build = true;
12,256✔
3848
#endif
12,256✔
3849
    {
12,256✔
3850
        const char* lead_text = "Build mode";
12,256✔
3851
        if (is_debug_build) {
12,256✔
3852
            logger.info("%1: Debug", lead_text); // Throws
12,256✔
3853
        }
11,104✔
3854
        else {
×
3855
            logger.info("%1: Release", lead_text); // Throws
×
3856
        }
1,152✔
3857
    }
12,256✔
3858
    if (is_debug_build) {
12,256✔
3859
        logger.warn("Build mode is Debug! CAN SEVERELY IMPACT PERFORMANCE - "
12,256✔
3860
                    "NOT RECOMMENDED FOR PRODUCTION"); // Throws
12,256✔
3861
    }
12,256✔
3862
    logger.info("Directory holding persistent state: %1", m_root_dir);        // Throws
12,256✔
3863
    logger.info("Maximum number of open files: %1", m_config.max_open_files); // Throws
12,256✔
3864
    {
12,256✔
3865
        const char* lead_text = "Encryption";
12,256✔
3866
        if (m_config.encryption_key) {
11,108✔
3867
            logger.info("%1: Yes", lead_text); // Throws
36✔
3868
        }
1,180✔
3869
        else {
12,220✔
3870
            logger.info("%1: No", lead_text); // Throws
12,220✔
3871
        }
12,224✔
3872
    }
12,256✔
3873
    logger.info("Log level: %1", logger.get_level_threshold()); // Throws
12,256✔
3874
    {
12,256✔
3875
        const char* lead_text = "Disable sync to disk";
12,256✔
3876
        if (m_config.disable_sync_to_disk) {
11,556✔
3877
            logger.info("%1: All files", lead_text); // Throws
5,956✔
3878
        }
6,204✔
3879
        else {
6,300✔
3880
            logger.info("%1: No", lead_text); // Throws
6,300✔
3881
        }
6,752✔
3882
    }
12,256✔
3883
    if (m_config.disable_sync_to_disk) {
11,556✔
3884
        logger.warn("Testing/debugging feature 'disable sync to disk' enabled - "
5,956✔
3885
                    "never do this in production!"); // Throws
5,956✔
3886
    }
6,656✔
3887
    logger.info("Download compaction: %1",
12,256✔
3888
                (m_config.disable_download_compaction ? "No" : "Yes")); // Throws
12,256✔
3889
    logger.info("Download bootstrap caching: %1",
12,256✔
3890
                (m_config.enable_download_bootstrap_cache ? "Yes" : "No"));                // Throws
12,256✔
3891
    logger.info("Max download size: %1 bytes", m_config.max_download_size);                // Throws
12,256✔
3892
    logger.info("Max upload backlog: %1 bytes", m_max_upload_backlog);                     // Throws
12,256✔
3893
    logger.info("HTTP request timeout: %1 ms", m_config.http_request_timeout);             // Throws
12,256✔
3894
    logger.info("HTTP response timeout: %1 ms", m_config.http_response_timeout);           // Throws
12,256✔
3895
    logger.info("Connection reaper timeout: %1 ms", m_config.connection_reaper_timeout);   // Throws
12,256✔
3896
    logger.info("Connection reaper interval: %1 ms", m_config.connection_reaper_interval); // Throws
12,256✔
3897
    logger.info("Connection soft close timeout: %1 ms", m_config.soft_close_timeout);      // Throws
12,256✔
3898
    logger.debug("Authorization header name: %1", m_config.authorization_header_name);     // Throws
11,628✔
3899

6,288✔
3900
    m_realm_names = _impl::find_realm_files(m_root_dir); // Throws
11,628✔
3901

6,288✔
3902
    initiate_connection_reaper_timer(m_config.connection_reaper_interval); // Throws
11,628✔
3903

6,288✔
3904
    listen(); // Throws
12,256✔
3905
}
11,104✔
3906

3907

3908
void ServerImpl::run()
1,096✔
3909
{
11,752✔
3910
    auto ta = util::make_temp_assign(m_running, true);
11,152✔
3911

6,008✔
3912
    {
11,752✔
3913
        auto worker_thread = util::make_thread_exec_guard(m_worker, *this); // Throws
11,752✔
3914
        std::string name;
11,752✔
3915
        if (util::Thread::get_name(name)) {
11,256✔
3916
            name += "-worker";
6,344✔
3917
            worker_thread.start_with_signals_blocked(name); // Throws
6,344✔
3918
        }
6,240✔
3919
        else {
5,408✔
3920
            worker_thread.start_with_signals_blocked(); // Throws
5,408✔
3921
        }
5,408✔
3922

6,008✔
3923
        m_service.run(); // Throws
11,152✔
3924

6,008✔
3925
        worker_thread.stop_and_rethrow(); // Throws
11,752✔
3926
    }
11,152✔
3927

6,008✔
3928
    logger.info("Realm sync server stopped");
11,752✔
3929
}
10,656✔
3930

3931

3932
void ServerImpl::stop() noexcept
2,002✔
3933
{
19,904✔
3934
    util::LockGuard lock{m_mutex};
19,904✔
3935
    if (m_stopped)
18,750✔
3936
        return;
7,954✔
3937
    m_stopped = true;
12,256✔
3938
    m_wait_or_service_stopped_cond.notify_all();
12,256✔
3939
    m_service.stop();
12,256✔
3940
}
11,102✔
3941

3942

3943
void ServerImpl::inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
22,562✔
3944
{
205,992✔
3945
    m_pending_changesets_from_downstream_byte_size += byte_size;
205,992✔
3946
    logger.debug("Byte size for pending downstream changesets incremented by "
205,992✔
3947
                 "%1 to reach a total of %2",
205,992✔
3948
                 byte_size,
205,992✔
3949
                 m_pending_changesets_from_downstream_byte_size); // Throws
205,992✔
3950
}
183,430✔
3951

3952

3953
void ServerImpl::dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
17,412✔
3954
{
156,088✔
3955
    REALM_ASSERT(byte_size <= m_pending_changesets_from_downstream_byte_size);
156,088✔
3956
    m_pending_changesets_from_downstream_byte_size -= byte_size;
156,088✔
3957
    logger.debug("Byte size for pending downstream changesets decremented by "
156,088✔
3958
                 "%1 to reach a total of %2",
156,088✔
3959
                 byte_size,
156,088✔
3960
                 m_pending_changesets_from_downstream_byte_size); // Throws
156,088✔
3961
}
138,676✔
3962

3963

3964
std::mt19937_64& ServerImpl::server_history_get_random() noexcept
1,092✔
3965
{
9,760✔
3966
    return get_random();
9,760✔
3967
}
8,668✔
3968

3969

3970
void ServerImpl::listen()
1,152✔
3971
{
12,252✔
3972
    network::Resolver resolver{get_service()};
12,252✔
3973
    network::Resolver::Query query(m_config.listen_address, m_config.listen_port,
12,252✔
3974
                                   network::Resolver::Query::passive | network::Resolver::Query::address_configured);
12,252✔
3975
    network::Endpoint::List endpoints = resolver.resolve(query); // Throws
11,624✔
3976

6,284✔
3977
    auto i = endpoints.begin();
12,252✔
3978
    auto end = endpoints.end();
12,252✔
3979
    for (;;) {
12,256✔
3980
        std::error_code ec;
12,256✔
3981
        m_acceptor.open(i->protocol(), ec);
12,256✔
3982
        if (!ec) {
12,256✔
3983
            using SocketBase = network::SocketBase;
12,256✔
3984
            m_acceptor.set_option(SocketBase::reuse_address(m_config.reuse_address), ec);
12,256✔
3985
            if (!ec) {
12,256✔
3986
                m_acceptor.bind(*i, ec);
12,254✔
3987
                if (!ec)
12,254✔
3988
                    break;
11,102✔
3989
            }
2✔
3990
            m_acceptor.close();
2✔
3991
        }
526!
3992
        if (i + 1 == end) {
5,136!
3993
            for (auto i2 = endpoints.begin(); i2 != i; ++i2) {
×
3994
                // FIXME: We don't have the error code for previous attempts, so
3995
                // can't print a nice message.
3996
                logger.error("Failed to bind to %1:%2", i2->address(),
×
3997
                             i2->port()); // Throws
×
3998
            }
×
3999
            logger.error("Failed to bind to %1:%2: %3", i->address(), i->port(),
×
4000
                         ec.message()); // Throws
×
4001
            throw std::runtime_error("Could not create a listening socket: All endpoints failed");
×
UNCOV
4002
        }
×
4003
    }
526✔
4004

6,284✔
4005
    m_acceptor.listen(m_config.listen_backlog);
11,624✔
4006

6,284✔
4007
    network::Endpoint local_endpoint = m_acceptor.local_endpoint();
12,242✔
4008
    const char* ssl_mode = (m_ssl_context ? "TLS" : "non-TLS");
12,172✔
4009
    logger.info("Listening on %1:%2 (max backlog is %3, %4)", local_endpoint.address(), local_endpoint.port(),
12,252✔
4010
                m_config.listen_backlog, ssl_mode); // Throws
11,624✔
4011

6,284✔
4012
    initiate_accept();
12,252✔
4013
}
11,100✔
4014

4015

4016
void ServerImpl::initiate_accept()
3,218✔
4017
{
29,270✔
4018
    auto handler = [this](std::error_code ec) {
22,776✔
4019
        if (ec != util::error::operation_aborted)
17,640✔
4020
            handle_accept(ec);
17,642✔
4021
    };
18,792✔
4022
    bool is_ssl = bool(m_ssl_context);
29,898✔
4023
    m_next_http_conn.reset(new HTTPConnection(*this, ++m_next_conn_id, is_ssl));                            // Throws
29,898✔
4024
    m_acceptor.async_accept(m_next_http_conn->get_socket(), m_next_http_conn_endpoint, std::move(handler)); // Throws
29,898✔
4025
}
26,680✔
4026

4027

4028
void ServerImpl::handle_accept(std::error_code ec)
2,066✔
4029
{
17,642✔
4030
    if (ec) {
15,576✔
4031
        if (ec != util::error::connection_aborted) {
×
4032
            REALM_ASSERT(ec != util::error::operation_aborted);
×
4033

4034
            // We close the reserved files to get a few extra file descriptors.
×
4035
            for (size_t i = 0; i < sizeof(m_reserved_files) / sizeof(m_reserved_files[0]); ++i) {
×
4036
                m_reserved_files[i].reset();
×
4037
            }
4038

4039
            // FIXME: There are probably errors that need to be treated
4040
            // specially, and not cause the server to "crash".
4041

×
4042
            if (ec == make_basic_system_error_code(EMFILE)) {
×
4043
                logger.error("Failed to accept a connection due to the file descriptor limit, "
×
4044
                             "consider increasing the limit in your system config"); // Throws
×
4045
                throw OutOfFilesError(ec);
×
4046
            }
×
4047
            else {
×
4048
                throw std::system_error(ec);
×
4049
            }
×
4050
        }
×
4051
        logger.debug("Skipping aborted connection"); // Throws
×
4052
    }
2,066✔
4053
    else {
17,642✔
4054
        HTTPConnection& conn = *m_next_http_conn;
17,642✔
4055
        if (m_config.tcp_no_delay)
17,304✔
4056
            conn.get_socket().set_option(network::SocketBase::no_delay(true));       // Throws
15,140✔
4057
        m_http_connections.emplace(conn.get_id(), std::move(m_next_http_conn));      // Throws
17,642✔
4058
        Formatter& formatter = m_misc_buffers.formatter;
17,642✔
4059
        formatter.reset();
17,642✔
4060
        formatter << "[" << m_next_http_conn_endpoint.address() << "]:" << m_next_http_conn_endpoint.port(); // Throws
17,642✔
4061
        std::string remote_endpoint = {formatter.data(), formatter.size()};                                  // Throws
17,642✔
4062
        conn.initiate(std::move(remote_endpoint));                                                           // Throws
17,642✔
4063
    }
17,642✔
4064
    initiate_accept(); // Throws
17,642✔
4065
}
15,576✔
4066

4067

4068
void ServerImpl::remove_http_connection(std::int_fast64_t conn_id) noexcept
2,064✔
4069
{
17,636✔
4070
    m_http_connections.erase(conn_id);
17,636✔
4071
}
15,572✔
4072

4073

4074
void ServerImpl::add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn)
2,012✔
4075
{
17,288✔
4076
    m_sync_connections.emplace(connection_id, std::move(sync_conn));
17,288✔
4077
}
15,276✔
4078

4079

4080
void ServerImpl::remove_sync_connection(int_fast64_t connection_id)
1,190✔
4081
{
9,914✔
4082
    m_sync_connections.erase(connection_id);
9,914✔
4083
}
8,724✔
4084

4085

4086
// Posts a task to the event loop service that stores `timeout` as the new
// connection reaper timeout in the server configuration.
void ServerImpl::set_connection_reaper_timeout(milliseconds_type timeout)
{
    auto apply_timeout = [this, timeout](Status) {
        m_config.connection_reaper_timeout = timeout;
    };
    get_service().post(std::move(apply_timeout));
}
32✔
4092

4093

4094
void ServerImpl::close_connections()
20✔
4095
{
180✔
4096
    get_service().post([this](Status) {
180✔
4097
        do_close_connections(); // Throws
180✔
4098
    });
180✔
4099
}
160✔
4100

4101

4102
bool ServerImpl::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
72✔
4103
{
648✔
4104
    return _impl::map_virt_to_real_realm_path(m_root_dir, virt_path, real_path); // Throws
648✔
4105
}
576✔
4106

4107

4108
void ServerImpl::recognize_external_change(const std::string& virt_path)
4,800✔
4109
{
43,200✔
4110
    std::string virt_path_2 = virt_path; // Throws (copy)
43,200✔
4111
    get_service().post([this, virt_path = std::move(virt_path_2)](Status) {
43,200✔
4112
        do_recognize_external_change(virt_path); // Throws
43,200✔
4113
    });                                          // Throws
43,200✔
4114
}
38,400✔
4115

4116

4117
void ServerImpl::stop_sync_and_wait_for_backup_completion(
4118
    util::UniqueFunction<void(bool did_backup)> completion_handler, milliseconds_type timeout)
4119
{
×
4120
    logger.info("stop_sync_and_wait_for_backup_completion() called with "
×
4121
                "timeout = %1",
×
4122
                timeout); // Throws
4123

4124
    get_service().post([this, completion_handler = std::move(completion_handler), timeout](Status) mutable {
×
4125
        do_stop_sync_and_wait_for_backup_completion(std::move(completion_handler),
×
4126
                                                    timeout); // Throws
×
4127
    });
×
4128
}
4129

4130

4131
// (Re)creates the connection reaper timer and arms it to fire after `timeout`
// milliseconds. Unless the wait was aborted, the handler reaps dead
// connections and re-arms the timer with the same timeout.
void ServerImpl::initiate_connection_reaper_timer(milliseconds_type timeout)
{
    auto on_timer = [this, timeout](Status status) {
        if (status == ErrorCodes::OperationAborted)
            return;
        reap_connections();                        // Throws
        initiate_connection_reaper_timer(timeout); // Throws
    };

    m_connection_reaper_timer.emplace(get_service());
    m_connection_reaper_timer->async_wait(std::chrono::milliseconds(timeout), std::move(on_timer)); // Throws
}
11,950✔
4141

4142

4143
void ServerImpl::reap_connections()
38✔
4144
{
890✔
4145
    logger.debug("Discarding dead connections"); // Throws
890✔
4146
    SteadyTimePoint now = steady_clock_now();
890✔
4147
    {
890✔
4148
        auto end = m_http_connections.end();
890✔
4149
        auto i = m_http_connections.begin();
894✔
4150
        while (i != end) {
864✔
4151
            HTTPConnection& conn = *i->second;
12✔
4152
            ++i;
10✔
4153
            // Suicide
8✔
4154
            conn.terminate_if_dead(now); // Throws
12✔
4155
        }
46✔
4156
    }
890✔
4157
    {
890✔
4158
        auto end = m_sync_connections.end();
890✔
4159
        auto i = m_sync_connections.begin();
920✔
4160
        while (i != end) {
1,690✔
4161
            SyncConnection& conn = *i->second;
838✔
4162
            ++i;
828✔
4163
            // Suicide
790✔
4164
            conn.terminate_if_dead(now); // Throws
838✔
4165
        }
846✔
4166
    }
890✔
4167
}
852✔
4168

4169

4170
void ServerImpl::do_close_connections()
20✔
4171
{
180✔
4172
    for (auto& entry : m_sync_connections) {
180✔
4173
        SyncConnection& conn = *entry.second;
180✔
4174
        conn.initiate_soft_close(); // Throws
180✔
4175
    }
180✔
4176
}
160✔
4177

4178

4179
void ServerImpl::do_recognize_external_change(const std::string& virt_path)
4,800✔
4180
{
43,200✔
4181
    auto i = m_files.find(virt_path);
43,200✔
4182
    if (i == m_files.end())
38,402✔
4183
        return;
4,802✔
4184
    ServerFile& file = *i->second;
43,194✔
4185
    file.recognize_external_change();
43,194✔
4186
}
38,396✔
4187

4188

4189
void ServerImpl::do_stop_sync_and_wait_for_backup_completion(
4190
    util::UniqueFunction<void(bool did_complete)> completion_handler, milliseconds_type timeout)
4191
{
×
4192
    static_cast<void>(timeout);
×
4193
    if (m_sync_stopped)
×
4194
        return;
×
4195
    do_close_connections(); // Throws
×
4196
    m_sync_stopped = true;
×
4197
    bool completion_reached = false;
×
4198
    completion_handler(completion_reached); // Throws
×
4199
}
4200

4201

4202
// ============================ SyncConnection implementation ============================
4203

4204
// Clears the list of sessions enlisted to send before clearing the session
// table itself.
SyncConnection::~SyncConnection() noexcept
{
    // Order matters here only in so far as the original does it this way:
    // the send list is emptied first, then the sessions are dropped.
    m_sessions_enlisted_to_send.clear();
    m_sessions.clear();
}
15,282✔
4209

4210

4211
void SyncConnection::initiate()
2,014✔
4212
{
17,296✔
4213
    m_last_activity_at = steady_clock_now();
17,296✔
4214
    logger.debug("Sync Connection initiated");
17,296✔
4215
    m_websocket.initiate_server_websocket_after_handshake();
17,296✔
4216
    send_log_message(util::Logger::Level::info, "Client connection established with server", 0,
17,296✔
4217
                     m_appservices_request_id);
17,296✔
4218
}
15,282✔
4219

4220

4221
template <class... Params>
4222
void SyncConnection::terminate(Logger::Level log_level, const char* log_message, Params... log_params)
1,190✔
4223
{
9,914✔
4224
    terminate_sessions();                              // Throws
9,914✔
4225
    logger.log(log_level, log_message, log_params...); // Throws
9,914✔
4226
    m_websocket.stop();
9,914✔
4227
    m_ssl_stream.reset();
9,914✔
4228
    m_socket.reset();
9,426✔
4229
    // Suicide
5,954✔
4230
    m_server.remove_sync_connection(m_id);
9,914✔
4231
}
8,724✔
4232

4233

4234
// Terminates the connection if it has been inactive for too long as of `now`.
// While a soft close is in progress the soft close timeout applies, otherwise
// the connection reaper timeout applies.
void SyncConnection::terminate_if_dead(SteadyTimePoint now)
{
    const milliseconds_type inactive_for = steady_duration(m_last_activity_at, now);
    const Server::Config& config = m_server.get_config();

    if (!m_is_closing) {
        if (inactive_for >= config.connection_reaper_timeout) {
            // Suicide
            terminate(Logger::Level::detail,
                      "Sync connection closed (no heartbeat)"); // Throws
        }
        return;
    }

    if (inactive_for >= config.soft_close_timeout) {
        // Suicide
        terminate(Logger::Level::detail,
                  "Sync connection closed (timeout during soft close)"); // Throws
    }
}
808✔
4253

4254

4255
// Appends `sess` to the list of sessions waiting to send and fires the send
// trigger. Preconditions (asserted): the send trigger exists, the connection
// is not closing, and the session is not already enlisted.
void SyncConnection::enlist_to_send(Session* sess) noexcept
{
    REALM_ASSERT(m_send_trigger);
    REALM_ASSERT(!m_is_closing);
    REALM_ASSERT(!sess->is_enlisted_to_send());

    auto& send_list = m_sessions_enlisted_to_send;
    send_list.push_back(sess);
    m_send_trigger->trigger();
}
881,374✔
4263

4264

4265
// Logs `status` as an error and reports the corresponding protocol error:
// SyncProtocolInvariantFailed -> bad_syntax, LimitExceeded ->
// limits_exceeded, anything else -> other_error.
void SyncConnection::handle_protocol_error(Status status)
{
    logger.error("%1", status);

    const auto code = status.code();
    if (code == ErrorCodes::SyncProtocolInvariantFailed) {
        protocol_error(ProtocolError::bad_syntax); // Throws
    }
    else if (code == ErrorCodes::LimitExceeded) {
        protocol_error(ProtocolError::limits_exceeded); // Throws
    }
    else {
        protocol_error(ProtocolError::other_error);
    }
}
4280

4281
void SyncConnection::receive_bind_message(session_ident_type session_ident, std::string path,
4282
                                          std::string signed_user_token, bool need_client_file_ident,
4283
                                          bool is_subserver)
6,940✔
4284
{
58,094✔
4285
    auto p = m_sessions.emplace(session_ident, nullptr); // Throws
58,094✔
4286
    bool was_inserted = p.second;
58,094✔
4287
    if (REALM_UNLIKELY(!was_inserted)) {
51,154✔
4288
        logger.error("Overlapping reuse of session identifier %1 in BIND message",
×
4289
                     session_ident);                           // Throws
×
4290
        protocol_error(ProtocolError::reuse_of_session_ident); // Throws
×
4291
        return;
×
4292
    }
6,940✔
4293
    try {
58,094✔
4294
        p.first->second.reset(new Session(*this, session_ident)); // Throws
58,094✔
4295
    }
53,940✔
4296
    catch (...) {
21,730✔
4297
        m_sessions.erase(p.first);
×
4298
        throw;
×
4299
    }
2,786✔
4300

28,678✔
4301
    Session& sess = *p.first->second;
58,102✔
4302
    sess.initiate(); // Throws
58,102✔
4303
    ProtocolError error;
58,102✔
4304
    bool success =
58,102✔
4305
        sess.receive_bind_message(std::move(path), std::move(signed_user_token), need_client_file_ident, is_subserver,
58,102✔
4306
                                  error); // Throws
58,102✔
4307
    if (REALM_UNLIKELY(!success))         // Throws
53,962✔
4308
        protocol_error(error, &sess);     // Throws
28,790✔
4309
}
51,162✔
4310

4311

4312
void SyncConnection::receive_ident_message(session_ident_type session_ident, file_ident_type client_file_ident,
4313
                                           salt_type client_file_ident_salt, version_type scan_server_version,
4314
                                           version_type scan_client_version, version_type latest_server_version,
4315
                                           salt_type latest_server_version_salt)
6,220✔
4316
{
48,026✔
4317
    auto i = m_sessions.find(session_ident);
48,026✔
4318
    if (REALM_UNLIKELY(i == m_sessions.end())) {
41,806✔
4319
        bad_session_ident("IDENT", session_ident); // Throws
×
4320
        return;
×
4321
    }
6,220✔
4322
    Session& sess = *i->second;
48,026✔
4323
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
41,806✔
4324
        message_after_unbind("IDENT", session_ident); // Throws
×
4325
        return;
×
4326
    }
6,220✔
4327
    if (REALM_UNLIKELY(sess.error_occurred())) {
41,814✔
4328
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
72✔
4329
        // messages, other than UNBIND, must be ignored.
80✔
4330
        return;
144✔
4331
    }
6,332✔
4332
    if (REALM_UNLIKELY(sess.must_send_ident_message())) {
41,678✔
4333
        logger.error("Received IDENT message before IDENT message was sent"); // Throws
×
4334
        protocol_error(ProtocolError::bad_message_order);                     // Throws
×
4335
        return;
×
4336
    }
6,204✔
4337
    if (REALM_UNLIKELY(sess.ident_message_received())) {
41,678✔
4338
        logger.error("Received second IDENT message for session"); // Throws
×
4339
        protocol_error(ProtocolError::bad_message_order);          // Throws
×
4340
        return;
×
4341
    }
2,218✔
4342

22,680✔
4343
    ProtocolError error = {};
47,882✔
4344
    bool success = sess.receive_ident_message(client_file_ident, client_file_ident_salt, scan_server_version,
47,882✔
4345
                                              scan_client_version, latest_server_version, latest_server_version_salt,
47,882✔
4346
                                              error); // Throws
47,882✔
4347
    if (REALM_UNLIKELY(!success))                     // Throws
43,912✔
4348
        protocol_error(error, &sess);                 // Throws
22,808✔
4349
}
41,678✔
4350

4351
void SyncConnection::receive_upload_message(session_ident_type session_ident, version_type progress_client_version,
4352
                                            version_type progress_server_version, version_type locked_server_version,
4353
                                            const UploadChangesets& upload_changesets)
45,734✔
4354
{
407,420✔
4355
    auto i = m_sessions.find(session_ident);
407,420✔
4356
    if (REALM_UNLIKELY(i == m_sessions.end())) {
361,686✔
4357
        bad_session_ident("UPLOAD", session_ident); // Throws
×
4358
        return;
×
4359
    }
45,734✔
4360
    Session& sess = *i->second;
407,420✔
4361
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
361,686✔
4362
        message_after_unbind("UPLOAD", session_ident); // Throws
×
4363
        return;
×
4364
    }
45,734✔
4365
    if (REALM_UNLIKELY(sess.error_occurred())) {
361,686✔
4366
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4367
        // messages, other than UNBIND, must be ignored.
4368
        return;
×
4369
    }
45,734✔
4370
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
361,686✔
4371
        message_before_ident("UPLOAD", session_ident); // Throws
×
4372
        return;
×
4373
    }
22,558✔
4374

225,326✔
4375
    ProtocolError error = {};
407,420✔
4376
    bool success = sess.receive_upload_message(progress_client_version, progress_server_version,
407,420✔
4377
                                               locked_server_version, upload_changesets, error); // Throws
407,420✔
4378
    if (REALM_UNLIKELY(!success))                                                                // Throws
384,244✔
4379
        protocol_error(error, &sess);                                                            // Throws
225,326✔
4380
}
361,686✔
4381

4382

4383
// Handles a MARK message for the session identified by `session_ident`.
//
// The message is rejected with a protocol error if the session is unknown, if
// UNBIND has already been received, or if IDENT has not yet been received. It
// is silently ignored if an error has occurred on the session. Otherwise it
// is forwarded to the session, and a failure reported by the session is
// raised as a protocol error for that session.
void SyncConnection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    auto i = m_sessions.find(session_ident);
    if (REALM_UNLIKELY(i == m_sessions.end())) {
        bad_session_ident("MARK", session_ident); // Throws
        return;
    }
    Session& sess = *i->second;
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
        message_after_unbind("MARK", session_ident); // Throws
        return;
    }
    if (REALM_UNLIKELY(sess.error_occurred())) {
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
        // messages, other than UNBIND, must be ignored.
        return;
    }
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
        message_before_ident("MARK", session_ident); // Throws
        return;
    }

    // Value-initialized, as in receive_ident_message() and
    // receive_upload_message(), so that a well-defined value is passed on even
    // if the session returns failure without assigning an error.
    ProtocolError error = {};
    bool success = sess.receive_mark_message(request_ident, error); // Throws
    if (REALM_UNLIKELY(!success))                                   // Throws
        protocol_error(error, &sess);                               // Throws
}
106,058✔
4410

4411

4412
// Handles an UNBIND message for the session identified by `session_ident`.
// An unknown session or a second UNBIND for the same session is reported as a
// protocol error; otherwise the message is forwarded to the session.
void SyncConnection::receive_unbind_message(session_ident_type session_ident)
{
    const auto it = m_sessions.find(session_ident); // Throws
    if (REALM_UNLIKELY(it == m_sessions.end())) {
        bad_session_ident("UNBIND", session_ident); // Throws
        return;
    }

    Session& session = *it->second;
    if (REALM_UNLIKELY(session.unbind_message_received())) {
        message_after_unbind("UNBIND", session_ident); // Throws
        return;
    }

    session.receive_unbind_message(); // Throws
    // NOTE: The session might have gotten destroyed at this time!
}
30,070✔
4428

4429

4430
// Handles a PING message: remembers its timestamp, flags that a PONG must be
// sent, and kicks off sending right away unless a send is already in
// progress.
void SyncConnection::receive_ping(milliseconds_type timestamp, milliseconds_type rtt)
{
    logger.debug("Received: PING(timestamp=%1, rtt=%2)", timestamp, rtt); // Throws

    m_last_ping_timestamp = timestamp;
    m_send_pong = true;

    if (m_is_sending)
        return;
    send_next_message();
}
1,422✔
4438

4439

4440
void SyncConnection::receive_error_message(session_ident_type session_ident, int error_code,
4441
                                           std::string_view error_body)
4442
{
×
4443
    logger.debug("Received: ERROR(error_code=%1, message_size=%2, session_ident=%3)", error_code, error_body.size(),
×
4444
                 session_ident); // Throws
×
4445
    auto i = m_sessions.find(session_ident);
×
4446
    if (REALM_UNLIKELY(i == m_sessions.end())) {
×
4447
        bad_session_ident("ERROR", session_ident);
×
4448
        return;
×
4449
    }
×
4450
    Session& sess = *i->second;
×
4451
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
×
4452
        message_after_unbind("ERROR", session_ident); // Throws
×
4453
        return;
×
4454
    }
4455

4456
    sess.receive_error_message(session_ident, error_code, error_body); // Throws
×
4457
}
4458

4459
void SyncConnection::send_log_message(util::Logger::Level level, const std::string&& message,
4460
                                      session_ident_type sess_ident, std::optional<std::string> co_id)
8,184✔
4461
{
64,890✔
4462
    if (get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
56,706✔
4463
        return logger.log(level, message.c_str());
×
4464
    }
3,266✔
4465

32,126✔
4466
    LogMessage log_msg{sess_ident, level, std::move(message), std::move(co_id)};
64,890✔
4467
    {
64,890✔
4468
        std::lock_guard lock(m_log_mutex);
64,890✔
4469
        m_log_messages.push(std::move(log_msg));
64,890✔
4470
    }
64,890✔
4471
    m_send_trigger->trigger();
64,890✔
4472
}
56,706✔
4473

4474

4475
void SyncConnection::bad_session_ident(const char* message_type, session_ident_type session_ident)
4476
{
×
4477
    logger.error("Bad session identifier in %1 message, session_ident = %2", message_type,
×
4478
                 session_ident);                      // Throws
×
4479
    protocol_error(ProtocolError::bad_session_ident); // Throws
×
4480
}
4481

4482

4483
void SyncConnection::message_after_unbind(const char* message_type, session_ident_type session_ident)
4484
{
×
4485
    logger.error("Received %1 message after UNBIND message, session_ident = %2", message_type,
×
4486
                 session_ident);                      // Throws
×
4487
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4488
}
4489

4490

4491
void SyncConnection::message_before_ident(const char* message_type, session_ident_type session_ident)
4492
{
×
4493
    logger.error("Received %1 message before IDENT message, session_ident = %2", message_type,
×
4494
                 session_ident);                      // Throws
×
4495
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4496
}
4497

4498

4499
void SyncConnection::handle_message_received(const char* data, size_t size)
77,420✔
4500
{
627,496✔
4501
    // parse_message_received() parses the message and calls the
312,202✔
4502
    // proper handler on the SyncConnection object (this).
354,692✔
4503
    get_server_protocol().parse_message_received<SyncConnection>(*this, std::string_view(data, size));
669,986✔
4504
    return;
669,986✔
4505
}
592,566✔
4506

4507

4508
void SyncConnection::handle_ping_received(const char* data, size_t size)
4509
{
4510
    // parse_message_received() parses the message and calls the
4511
    // proper handler on the SyncConnection object (this).
4512
    get_server_protocol().parse_ping_received<SyncConnection>(*this, std::string_view(data, size));
×
4513
    return;
×
4514
}
4515

4516

4517
void SyncConnection::send_next_message()
114,388✔
4518
{
985,932✔
4519
    REALM_ASSERT(!m_is_sending);
985,932✔
4520
    REALM_ASSERT(!m_sending_pong);
985,932✔
4521
    if (m_send_pong) {
871,744✔
4522
        send_pong(m_last_ping_timestamp);
1,622✔
4523
        if (m_sending_pong)
1,622✔
4524
            return;
115,610✔
4525
    }
1,037,662✔
4526
    for (;;) {
1,448,246✔
4527
        Session* sess = m_sessions_enlisted_to_send.pop_front();
1,448,246✔
4528
        if (!sess) {
1,305,480✔
4529
            // No sessions were enlisted to send
247,204✔
4530
            if (REALM_LIKELY(!m_is_closing))
453,750✔
4531
                break; // Check to see if there are any log messages to go out
400,722✔
4532
            // Send a connection level ERROR
98✔
4533
            REALM_ASSERT(!is_session_level_error(m_error_code));
178✔
4534
            initiate_write_error(m_error_code, m_error_session_ident); // Throws
178✔
4535
            return;
178✔
4536
        }
114,734✔
4537
        sess->send_message(); // Throws
936,560✔
4538
        // NOTE: The session might have gotten destroyed at this time!
506,590✔
4539

506,590✔
4540
        // At this point, `m_is_sending` is true if, and only if the session
506,590✔
4541
        // chose to send a message. If it chose to not send a message, we must
506,590✔
4542
        // loop back and give the next session in `m_sessions_enlisted_to_send`
506,590✔
4543
        // a chance.
564,516✔
4544
        if (m_is_sending)
941,146✔
4545
            return;
583,962✔
4546
    }
962,432✔
4547
    {
688,832✔
4548
        std::lock_guard lock(m_log_mutex);
453,510✔
4549
        if (!m_log_messages.empty()) {
408,708✔
4550
            send_log_message(m_log_messages.front());
64,520✔
4551
            m_log_messages.pop();
64,520✔
4552
        }
109,322✔
4553
    }
425,338✔
4554
    // Otherwise, nothing to do
247,070✔
4555
}
400,574✔
4556

4557

4558
// Starts an asynchronous binary websocket write of the current contents of
// `m_output_buffer` and then marks the connection as sending
// (`m_is_sending = true`).
//
// When the write completes without error, handle_write_output_buffer() is
// invoked. A completion that carries an error code is ignored by this handler.
void SyncConnection::initiate_write_output_buffer()
69,360✔
4559
{
595,090✔
4560
    auto handler = [this](std::error_code ec, size_t) {
594,844✔
4561
        if (!ec) {
594,658✔
4562
            handle_write_output_buffer();
593,550✔
4563
        }
593,726✔
4564
    };
558,338✔
4565

328,468✔
4566
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
595,140✔
4567
                                   std::move(handler)); // Throws
595,140✔
4568
    m_is_sending = true;
595,140✔
4569
}
525,780✔
4570

4571

4572
// PONG counterpart of initiate_write_output_buffer(): starts an asynchronous
// binary websocket write of `m_output_buffer` and sets both `m_is_sending` and
// `m_sending_pong`. Both flags are asserted to be clear beforehand.
//
// When the write completes without error, handle_pong_output_buffer() is
// invoked. A completion that carries an error code is ignored by this handler.
void SyncConnection::initiate_pong_output_buffer()
200✔
4573
{
1,622✔
4574
    auto handler = [this](std::error_code ec, size_t) {
1,622✔
4575
        if (!ec) {
1,618✔
4576
            handle_pong_output_buffer();
1,618✔
4577
        }
1,618✔
4578
    };
1,514✔
4579

788✔
4580
    REALM_ASSERT(!m_is_sending);
1,622✔
4581
    REALM_ASSERT(!m_sending_pong);
1,622✔
4582
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
1,622✔
4583
                                   std::move(handler)); // Throws
1,518✔
4584

788✔
4585
    m_is_sending = true;
1,622✔
4586
    m_sending_pong = true;
1,622✔
4587
}
1,422✔
4588

4589

4590
// Sends a PONG message carrying `timestamp`.
//
// Requires that a PONG is pending (`m_send_pong`) and that none is currently
// being written (`m_sending_pong`); both are asserted. The pending flag is
// cleared, the message is built into the buffer returned by
// get_output_buffer(), and the write is started through
// initiate_pong_output_buffer().
void SyncConnection::send_pong(milliseconds_type timestamp)
200✔
4591
{
1,622✔
4592
    REALM_ASSERT(m_send_pong);
1,622✔
4593
    REALM_ASSERT(!m_sending_pong);
1,622✔
4594
    m_send_pong = false;
1,622✔
4595
    logger.debug("Sending: PONG(timestamp=%1)", timestamp); // Throws
1,518✔
4596

788✔
4597
    OutputBuffer& out = get_output_buffer();
1,622✔
4598
    get_server_protocol().make_pong(out, timestamp); // Throws
1,518✔
4599

788✔
4600
    initiate_pong_output_buffer(); // Throws
1,622✔
4601
}
1,422✔
4602

4603
// Serializes `log_msg` (level, message text, session identifier and `co_id`)
// into the buffer returned by get_output_buffer() and starts writing it via
// initiate_write_output_buffer().
void SyncConnection::send_log_message(const LogMessage& log_msg)
8,134✔
4604
{
64,520✔
4605
    OutputBuffer& out = get_output_buffer();
64,520✔
4606
    get_server_protocol().make_log_message(out, log_msg.level, log_msg.message, log_msg.sess_ident,
64,520✔
4607
                                           log_msg.co_id); // Throws
59,630✔
4608

32,004✔
4609
    initiate_write_output_buffer(); // Throws
64,520✔
4610
}
56,386✔
4611

4612

4613
// Called when a write started by initiate_write_output_buffer() has completed
// without error. Releases the output buffer, clears `m_is_sending`, and moves
// on to the next message via send_next_message().
void SyncConnection::handle_write_output_buffer()
69,134✔
4614
{
593,552✔
4615
    release_output_buffer();
593,552✔
4616
    m_is_sending = false;
593,552✔
4617
    send_next_message(); // Throws
593,552✔
4618
}
524,418✔
4619

4620

4621
// Called when a PONG write started by initiate_pong_output_buffer() has
// completed without error. Releases the output buffer, asserts that both
// `m_is_sending` and `m_sending_pong` are still set, clears them, and moves on
// to the next message via send_next_message().
void SyncConnection::handle_pong_output_buffer()
200✔
4622
{
1,618✔
4623
    release_output_buffer();
1,618✔
4624
    REALM_ASSERT(m_is_sending);
1,618✔
4625
    REALM_ASSERT(m_sending_pong);
1,618✔
4626
    m_is_sending = false;
1,618✔
4627
    m_sending_pong = false;
1,618✔
4628
    send_next_message(); // Throws
1,618✔
4629
}
1,418✔
4630

4631

4632
// Builds an ERROR message for `error_code` / `session_ident` and starts
// writing it to the websocket.
//
// The message text comes from get_protocol_error_message() and the try-again
// flag from determine_try_again(). The message is encoded for the protocol
// version reported by get_client_protocol_version(). Unlike the regular write
// path, the completion handler always calls handle_write_error(), whether or
// not the write reported an error. `m_is_sending` is set once the write has
// been initiated.
void SyncConnection::initiate_write_error(ProtocolError error_code, session_ident_type session_ident)
20✔
4633
{
180✔
4634
    const char* message = get_protocol_error_message(int(error_code));
180✔
4635
    std::size_t message_size = std::strlen(message);
180✔
4636
    bool try_again = determine_try_again(error_code);
170✔
4637

100✔
4638
    logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3, session_ident=%4)", int(error_code),
180✔
4639
                  message_size, try_again, session_ident); // Throws
170✔
4640

100✔
4641
    OutputBuffer& out = get_output_buffer();
180✔
4642
    int protocol_version = get_client_protocol_version();
180✔
4643
    get_server_protocol().make_error_message(protocol_version, out, error_code, message, message_size, try_again,
180✔
4644
                                             session_ident); // Throws
170✔
4645

100✔
4646
    auto handler = [this](std::error_code ec, size_t) {
180✔
4647
        handle_write_error(ec); // Throws
180✔
4648
    };
180✔
4649
    m_websocket.async_write_binary(out.data(), out.size(), std::move(handler));
180✔
4650
    m_is_sending = true;
180✔
4651
}
160✔
4652

4653

4654
// Completion handler for the ERROR message written by initiate_write_error().
// Clears `m_is_sending` and asserts that the connection is closing. When
// `m_ssl_stream` is not set, the send side of `m_socket` is shut down; a
// resulting error other than ENOTCONN is thrown as std::system_error.
//
// NOTE(review): `ec` is handed to shutdown() as its error argument, so the
// write result passed in by the caller is presumably overwritten before it is
// ever inspected; when `m_ssl_stream` is set it is not inspected at all.
// Confirm that ignoring the write result is intended.
void SyncConnection::handle_write_error(std::error_code ec)
20✔
4655
{
180✔
4656
    m_is_sending = false;
180✔
4657
    REALM_ASSERT(m_is_closing);
180✔
4658
    if (!m_ssl_stream) {
180✔
4659
        m_socket->shutdown(network::Socket::shutdown_send, ec);
180!
4660
        if (ec && ec != make_basic_system_error_code(ENOTCONN))
160!
4661
            throw std::system_error(ec);
20✔
4662
    }
180✔
4663
}
160✔
4664

4665

4666
// For connection level errors, `sess` is ignored. For session level errors, a
4667
// session must be specified.
4668
//
4669
// If a session is specified, that session object will have been detached from
4670
// the ServerFile object (and possibly destroyed) upon return from
4671
// protocol_error().
4672
//
4673
// If a session is specified for a protocol level error, that session object
4674
// will have been destroyed upon return from protocol_error(). For session level
4675
// errors, the specified session will have been destroyed upon return from
4676
// protocol_error() if, and only if the negotiated protocol version is less than
4677
// 23.
4678
// Summary of what the body below does:
//  - Asserts that the connection is not already closing, that `sess` is
//    non-null for session level errors, and that a non-null `sess` is
//    registered in `m_sessions`.
//  - Logs the error at debug level, on the session's logger for session level
//    errors and on the connection's logger otherwise.
//  - Session level errors: only `sess->initiate_deactivation(error_code)` is
//    called; the connection itself stays open.
//  - All other errors: a soft close of the whole connection is started with a
//    session identifier of 0.
//
// NOTE(review): the destruction guarantees and the protocol version 23
// distinction described in the comment above cannot be confirmed from this
// function body alone - verify against Session::initiate_deactivation().
void SyncConnection::protocol_error(ProtocolError error_code, Session* sess)
78✔
4679
{
714✔
4680
    REALM_ASSERT(!m_is_closing);
714✔
4681
    bool session_level = is_session_level_error(error_code);
714✔
4682
    REALM_ASSERT(!session_level || sess);
714✔
4683
    REALM_ASSERT(!sess || m_sessions.count(sess->get_session_ident()) == 1);
714✔
4684
    if (logger.would_log(util::Logger::Level::debug)) {
636✔
4685
        const char* message = get_protocol_error_message(int(error_code));
×
4686
        Logger& logger_2 = (session_level ? sess->logger : logger);
×
4687
        logger_2.debug("Protocol error: %1 (error_code=%2)", message, int(error_code)); // Throws
×
4688
    }
78✔
4689
    session_ident_type session_ident = (session_level ? sess->get_session_ident() : 0);
714✔
4690
    if (session_level) {
714✔
4691
        sess->initiate_deactivation(error_code); // Throws
714✔
4692
        return;
714✔
4693
    }
636✔
4694
    do_initiate_soft_close(error_code, session_ident); // Throws
×
4695
}
4696

4697

4698
// Puts the connection into the closing state.
//
// Preconditions (all asserted): `error_code` has a protocol error message, it
// is not a session level error, `session_ident` is nonzero exactly when the
// error is session level, `m_send_trigger` is set, and the connection is not
// already closing.
//
// Effects: sets `m_is_closing`, records `error_code` and `session_ident` in
// `m_error_code` / `m_error_session_ident`, drops any pending PONG and all
// enlisted sessions, clears `m_receiving_session`, terminates every session,
// and finally fires `m_send_trigger` (presumably so that send_next_message()
// runs and emits the ERROR message - TODO confirm).
//
// NOTE(review): the in-body comment says this function may be called for
// session specific errors, yet the second assertion after it rejects them. One
// of the two looks stale - confirm.
void SyncConnection::do_initiate_soft_close(ProtocolError error_code, session_ident_type session_ident)
20✔
4699
{
180✔
4700
    REALM_ASSERT(get_protocol_error_message(int(error_code)));
170✔
4701

90✔
4702
    // With recent versions of the protocol (when the version is greater than,
90✔
4703
    // or equal to 23), this function will only be called for connection level
90✔
4704
    // errors, never for session specific errors. However, for the purpose of
90✔
4705
    // emulating earlier protocol versions, this function might be called for
90✔
4706
    // session specific errors too.
100✔
4707
    REALM_ASSERT(is_session_level_error(error_code) == (session_ident != 0));
180✔
4708
    REALM_ASSERT(!is_session_level_error(error_code));
170✔
4709

100✔
4710
    REALM_ASSERT(m_send_trigger);
180✔
4711
    REALM_ASSERT(!m_is_closing);
180✔
4712
    m_is_closing = true;
170✔
4713

100✔
4714
    m_error_code = error_code;
180✔
4715
    m_error_session_ident = session_ident;
170✔
4716

90✔
4717
    // Don't waste time and effort sending any other messages
100✔
4718
    m_send_pong = false;
180✔
4719
    m_sessions_enlisted_to_send.clear();
170✔
4720

100✔
4721
    m_receiving_session = nullptr;
170✔
4722

100✔
4723
    terminate_sessions(); // Throws
170✔
4724

100✔
4725
    m_send_trigger->trigger();
180✔
4726
}
160✔
4727

4728

4729
// Terminates this connection after the client closed it, logging the reason
// taken from `ec`. An `end_of_input` error is logged at `detail` level; any
// other error is logged at `info` level.
void SyncConnection::close_due_to_close_by_client(std::error_code ec)
648✔
4730
{
5,612✔
4731
    auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
4,832✔
4732
    // Suicide
3,496✔
4733
    terminate(log_level, "Sync connection closed by client: %1", ec.message()); // Throws
5,702✔
4734
}
5,054✔
4735

4736

4737
// Terminates this connection because of an error, logging the message of `ec`
// at `error` level.
void SyncConnection::close_due_to_error(std::error_code ec)
538✔
4738
{
3,972✔
4739
    // Suicide
2,438✔
4740
    terminate(Logger::Level::error, "Sync connection closed due to error: %1",
4,176✔
4741
              ec.message()); // Throws
4,176✔
4742
}
3,638✔
4743

4744

4745
// Calls terminate() on every session in `m_sessions`, then empties both the
// list of sessions enlisted to send and `m_sessions` itself.
void SyncConnection::terminate_sessions()
1,210✔
4746
{
10,740✔
4747
    for (auto& entry : m_sessions) {
15,408✔
4748
        Session& sess = *entry.second;
15,408✔
4749
        sess.terminate(); // Throws
15,408✔
4750
    }
14,762✔
4751
    m_sessions_enlisted_to_send.clear();
10,094✔
4752
    m_sessions.clear();
10,094✔
4753
}
8,884✔
4754

4755

4756
// Starts a soft close of the connection with the `connection_closed` error
// and no associated session. Does nothing if the connection is already
// closing.
void SyncConnection::initiate_soft_close()
20✔
4757
{
180✔
4758
    if (!m_is_closing) {
180✔
4759
        session_ident_type session_ident = 0;                                    // Not session specific
180✔
4760
        do_initiate_soft_close(ProtocolError::connection_closed, session_ident); // Throws
180✔
4761
    }
180✔
4762
}
160✔
4763

4764

4765
// Removes the entry for `session_ident` from `m_sessions`.
void SyncConnection::discard_session(session_ident_type session_ident) noexcept
4,142✔
4766
{
34,132✔
4767
    m_sessions.erase(session_ident);
34,132✔
4768
}
29,990✔
4769

4770
} // anonymous namespace
4771

4772

4773
// ============================ sync::Server implementation ============================
4774

4775
// Concrete implementation type held by sync::Server. It derives from
// ServerImpl and adds nothing beyond forwarding its constructor arguments to
// the base class.
class Server::Implementation : public ServerImpl {
4776
public:
4777
    Implementation(const std::string& root_dir, util::Optional<PKey> pkey, Server::Config config)
4778
        : ServerImpl{root_dir, std::move(pkey), std::move(config)} // Throws
1,152✔
4779
    {
12,252✔
4780
    }
12,252✔
4781
    virtual ~Implementation() {}
11,104✔
4782
};
4783

4784

4785
// Constructs the server by allocating its Implementation object from
// `root_dir`, `pkey` and `config`; `pkey` and `config` are moved into it.
Server::Server(const std::string& root_dir, util::Optional<sync::PKey> pkey, Config config)
4786
    : m_impl{new Implementation{root_dir, std::move(pkey), std::move(config)}} // Throws
1,152✔
4787
{
12,246✔
4788
}
11,094✔
4789

4790

4791
// Move constructor: takes over `serv.m_impl` by moving it into this object's
// `m_impl`.
Server::Server(Server&& serv) noexcept
4792
    : m_impl{std::move(serv.m_impl)}
4793
{
×
4794
}
4795

4796

1,152✔
4797
// Destructor with an empty body, defined out of line in this source file.
Server::~Server() noexcept {}
11,104✔
4798

4799

4800
// Forwards to the implementation object's start() overload that takes no
// arguments.
void Server::start()
472✔
4801
{
6,136✔
4802
    m_impl->start(); // Throws
6,136✔
4803
}
5,664✔
4804

4805

4806
// Forwards `listen_address`, `listen_port` and `reuse_address` unchanged to
// the implementation object's start() overload.
void Server::start(const std::string& listen_address, const std::string& listen_port, bool reuse_address)
680✔
4807
{
6,120✔
4808
    m_impl->start(listen_address, listen_port, reuse_address); // Throws
6,120✔
4809
}
5,440✔
4810

4811

4812
// Returns whatever the implementation object's listen_endpoint() returns.
network::Endpoint Server::listen_endpoint() const
1,186✔
4813
{
12,336✔
4814
    return m_impl->listen_endpoint(); // Throws
12,336✔
4815
}
11,150✔
4816

4817

4818
// Forwards to the implementation object's run().
void Server::run()
1,096✔
4819
{
11,752✔
4820
    m_impl->run(); // Throws
11,752✔
4821
}
10,656✔
4822

4823

4824
// Forwards to the implementation object's stop().
void Server::stop() noexcept
2,000✔
4825
{
19,904✔
4826
    m_impl->stop();
19,904✔
4827
}
17,904✔
4828

4829

4830
// Returns the current value of the implementation object's `errors_seen`
// member.
uint_fast64_t Server::errors_seen() const noexcept
680✔
4831
{
6,120✔
4832
    return m_impl->errors_seen;
6,120✔
4833
}
5,440✔
4834

4835

4836
// Forwards to the implementation object's function of the same name, moving
// `completion_handler` into the call and passing `timeout` through unchanged.
void Server::stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
4837
                                                      milliseconds_type timeout)
4838
{
×
4839
    m_impl->stop_sync_and_wait_for_backup_completion(std::move(completion_handler), timeout); // Throws
×
4840
}
4841

4842

4843
// Forwards `timeout` to the implementation object's
// set_connection_reaper_timeout().
void Server::set_connection_reaper_timeout(milliseconds_type timeout)
4✔
4844
{
36✔
4845
    m_impl->set_connection_reaper_timeout(timeout);
36✔
4846
}
32✔
4847

4848

4849
// Forwards to the implementation object's close_connections().
void Server::close_connections()
20✔
4850
{
180✔
4851
    m_impl->close_connections();
180✔
4852
}
160✔
4853

4854

4855
// Forwards `virt_path` and the `real_path` reference to the implementation
// object's map_virtual_to_real_path() and returns its result.
bool Server::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
72✔
4856
{
648✔
4857
    return m_impl->map_virtual_to_real_path(virt_path, real_path); // Throws
648✔
4858
}
576✔
4859

4860

4861
// Forwards `virt_path` to the implementation object's
// recognize_external_change().
void Server::recognize_external_change(const std::string& virt_path)
4,800✔
4862
{
43,200✔
4863
    m_impl->recognize_external_change(virt_path); // Throws
43,200✔
4864
}
38,400✔
4865

4866

4867
// Forwards both reference arguments to the implementation object's
// get_workunit_timers().
void Server::get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
4868
{
×
4869
    m_impl->get_workunit_timers(parallel_section, sequential_section);
×
4870
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc