• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2085

01 Mar 2024 12:26PM UTC coverage: 90.926% (-0.001%) from 90.927%
2085

push

Evergreen

jedelbo
Avoid doing unneeded logger work in Replication

Most of the replication log statements do some work, including memory
allocations, which is then thrown away if the log level is too high, so always
check the log level first. A few places don't actually benefit from this, but
it's easier to consistently check the log level every time.

93986 of 173116 branches covered (54.29%)

63 of 100 new or added lines in 2 files covered. (63.0%)

114 existing lines in 17 files now uncovered.

238379 of 262169 relevant lines covered (90.93%)

6007877.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.76
/src/realm/sync/noinst/server/server.cpp
1
#include <realm/sync/noinst/server/server.hpp>
2

3
#include <realm/binary_data.hpp>
4
#include <realm/impl/simulated_failure.hpp>
5
#include <realm/object_id.hpp>
6
#include <realm/string_data.hpp>
7
#include <realm/sync/changeset.hpp>
8
#include <realm/sync/trigger.hpp>
9
#include <realm/sync/impl/clamped_hex_dump.hpp>
10
#include <realm/sync/impl/clock.hpp>
11
#include <realm/sync/network/http.hpp>
12
#include <realm/sync/network/network_ssl.hpp>
13
#include <realm/sync/network/websocket.hpp>
14
#include <realm/sync/noinst/client_history_impl.hpp>
15
#include <realm/sync/noinst/protocol_codec.hpp>
16
#include <realm/sync/noinst/server/access_control.hpp>
17
#include <realm/sync/noinst/server/server_dir.hpp>
18
#include <realm/sync/noinst/server/server_file_access_cache.hpp>
19
#include <realm/sync/noinst/server/server_impl_base.hpp>
20
#include <realm/sync/transform.hpp>
21
#include <realm/util/base64.hpp>
22
#include <realm/util/bind_ptr.hpp>
23
#include <realm/util/buffer_stream.hpp>
24
#include <realm/util/circular_buffer.hpp>
25
#include <realm/util/compression.hpp>
26
#include <realm/util/file.hpp>
27
#include <realm/util/json_parser.hpp>
28
#include <realm/util/load_file.hpp>
29
#include <realm/util/memory_stream.hpp>
30
#include <realm/util/optional.hpp>
31
#include <realm/util/platform_info.hpp>
32
#include <realm/util/random.hpp>
33
#include <realm/util/safe_int_ops.hpp>
34
#include <realm/util/scope_exit.hpp>
35
#include <realm/util/scratch_allocator.hpp>
36
#include <realm/util/thread.hpp>
37
#include <realm/util/thread_exec_guard.hpp>
38
#include <realm/util/value_reset_guard.hpp>
39
#include <realm/version.hpp>
40

41
#include <algorithm>
42
#include <atomic>
43
#include <cctype>
44
#include <chrono>
45
#include <cmath>
46
#include <condition_variable>
47
#include <cstdint>
48
#include <cstdio>
49
#include <cstring>
50
#include <functional>
51
#include <locale>
52
#include <map>
53
#include <memory>
54
#include <queue>
55
#include <sstream>
56
#include <stdexcept>
57
#include <thread>
58
#include <vector>
59

60
// NOTE: The protocol specification is in `/doc/protocol.md`
61

62

63
// FIXME: Verify that session identifier spoofing cannot be used to get access
64
// to sessions belonging to other network connections in any way.
65
// FIXME: Seems that server must close connection with zero sessions after a
66
// certain timeout.
67

68

69
using namespace realm;
70
using namespace realm::sync;
71
using namespace realm::util;
72

73
// clang-format off
74
using ServerHistory         = _impl::ServerHistory;
75
using ServerProtocol        = _impl::ServerProtocol;
76
using ServerFileAccessCache = _impl::ServerFileAccessCache;
77
using ServerImplBase = _impl::ServerImplBase;
78

79
using IntegratableChangeset = ServerHistory::IntegratableChangeset;
80
using IntegratableChangesetList = ServerHistory::IntegratableChangesetList;
81
using IntegratableChangesets = ServerHistory::IntegratableChangesets;
82
using IntegrationResult = ServerHistory::IntegrationResult;
83
using BootstrapError = ServerHistory::BootstrapError;
84
using ExtendedIntegrationError = ServerHistory::ExtendedIntegrationError;
85
using ClientType = ServerHistory::ClientType;
86
using FileIdentAllocSlot = ServerHistory::FileIdentAllocSlot;
87
using FileIdentAllocSlots = ServerHistory::FileIdentAllocSlots;
88

89
using UploadChangeset = ServerProtocol::UploadChangeset;
90
// clang-format on
91

92

93
using UploadChangesets = std::vector<UploadChangeset>;
94

95
using EventLoopMetricsHandler = network::Service::EventLoopMetricsHandler;
96

97

98
static_assert(std::numeric_limits<session_ident_type>::digits >= 63, "Bad session identifier type");
99
static_assert(std::numeric_limits<file_ident_type>::digits >= 63, "Bad file identifier type");
100
static_assert(std::numeric_limits<version_type>::digits >= 63, "Bad version type");
101
static_assert(std::numeric_limits<timestamp_type>::digits >= 63, "Bad timestamp type");
102

103

104
namespace {
105

106
// Three-state status value. `done` is pinned to 0, so `pending` and
// `in_progress` take the next consecutive values.
enum class SchedStatus { done = 0, pending, in_progress };
107

108
// Websocket protocol prefix of older PBS sync clients (prior to protocol v8).
// Only the Sync Server uses it, to keep supporting those clients.
constexpr std::string_view get_old_pbs_websocket_protocol_prefix() noexcept
{
    constexpr std::string_view prefix{"com.mongodb.realm-sync/"};
    return prefix;
}
×
113

114
// Abbreviates `str` for display. A string longer than `cutoff` characters is
// reduced to "..." followed by its last `cutoff` characters; any other string
// is returned unchanged.
std::string short_token_fmt(const std::string& str, size_t cutoff = 30)
{
    const bool fits = !(str.size() > cutoff);
    if (fits)
        return str;

    std::string shortened = "...";
    shortened.append(str, str.size() - cutoff, std::string::npos);
    return shortened;
}
28✔
123

124

125
// Splits the value of a list-valued HTTP header into its comma-separated
// elements.
//
// Each call to next() yields the next element with leading and trailing HTTP
// linear white space (tab, LF, CR, space) removed. Elements that are empty
// after trimming are skipped. The parser only keeps a view of the string
// passed to the constructor, and yielded elements are views into that string.
class HttpListHeaderValueParser {
public:
    HttpListHeaderValueParser(std::string_view string) noexcept
        : m_string{string}
    {
    }

    // Stores the next nonempty element in `elem` and returns true, or returns
    // false (leaving `elem` untouched) once the input is exhausted.
    bool next(std::string_view& elem) noexcept
    {
        while (m_pos < m_string.size()) {
            const size_type comma = m_string.find(',', m_pos);
            const bool is_last = (comma == std::string_view::npos);
            const size_type stop = is_last ? m_string.size() : comma;

            std::string_view candidate = m_string.substr(m_pos, stop - m_pos);

            // Continue just past the comma, or at the end of the string
            m_pos = is_last ? stop : comma + 1;

            // Exclude leading and trailing white space
            while (!candidate.empty() && is_http_lws(candidate.front()))
                candidate.remove_prefix(1);
            while (!candidate.empty() && is_http_lws(candidate.back()))
                candidate.remove_suffix(1);

            if (!candidate.empty()) {
                elem = candidate;
                return true;
            }
        }
        return false;
    }

private:
    using size_type = std::string_view::size_type;
    const std::string_view m_string;
    size_type m_pos = 0;

    // True for the characters treated as HTTP linear white space.
    static bool is_http_lws(char ch) noexcept
    {
        switch (ch) {
            case '\t':
            case '\n':
            case '\r':
            case ' ':
                return true;
            default:
                return false;
        }
    }
};
167

168

169
// Clock used for measuring elapsed time: the high resolution clock when that
// clock is steady, otherwise std::chrono::steady_clock.
using SteadyClock = std::conditional_t<std::chrono::high_resolution_clock::is_steady,
                                       std::chrono::high_resolution_clock, std::chrono::steady_clock>;
using SteadyTimePoint = SteadyClock::time_point;

// Returns the current time according to SteadyClock.
SteadyTimePoint steady_clock_now() noexcept
{
    const SteadyTimePoint now = SteadyClock::now();
    return now;
}
161,876✔
177

178
// Returns the time elapsed from `start_time` to `end_time` as a whole number
// of milliseconds (the fractional part is discarded by duration_cast). When
// `end_time` is omitted, the current steady clock time is used.
milliseconds_type steady_duration(SteadyTimePoint start_time, SteadyTimePoint end_time = steady_clock_now()) noexcept
{
    using std::chrono::duration_cast;
    using std::chrono::milliseconds;

    const milliseconds elapsed = duration_cast<milliseconds>(end_time - start_time);
    return milliseconds_type(elapsed.count());
}
38,788✔
184

185

186
// Returns true only when `error_code` is ProtocolError::connection_closed.
// NOTE(review): presumably the result is the "try again" indication reported
// along with the error -- confirm against the callers.
bool determine_try_again(ProtocolError error_code) noexcept
{
    const bool connection_was_closed = (error_code == ProtocolError::connection_closed);
    return connection_was_closed;
}
100✔
190

191

192
class ServerFile;
193
class ServerImpl;
194
class HTTPConnection;
195
class SyncConnection;
196
class Session;
197

198

199
using Formatter = util::ResettableExpandableBufferOutputStream;
200
using OutputBuffer = util::ResettableExpandableBufferOutputStream;
201

202
using ProtocolVersionRange = std::pair<int, int>;
203

204
class MiscBuffers {
205
public:
206
    Formatter formatter;
207
    OutputBuffer download_message;
208

209
    using ProtocolVersionRanges = std::vector<ProtocolVersionRange>;
210
    ProtocolVersionRanges protocol_version_ranges;
211

212
    std::vector<char> compress;
213

214
    MiscBuffers()
215
    {
1,376✔
216
        formatter.imbue(std::locale::classic());
1,376✔
217
        download_message.imbue(std::locale::classic());
1,376✔
218
    }
1,376✔
219
};
220

221

222
struct DownloadCache {
223
    std::unique_ptr<char[]> body;
224
    std::size_t uncompressed_body_size;
225
    std::size_t compressed_body_size;
226
    bool body_is_compressed;
227
    version_type end_version;
228
    DownloadCursor download_progress;
229
    std::uint_fast64_t downloadable_bytes;
230
    std::size_t num_changesets;
231
    std::size_t accum_original_size;
232
    std::size_t accum_compacted_size;
233
};
234

235

236
// An unblocked work unit is comprised of one Work object for each of the files
237
// that contribute work to the work unit, generally one reference file and a
238
// number of partial files.
239
class Work {
240
public:
241
    // In general, primary work is all forms of modifying work, including file
242
    // deletion.
243
    bool has_primary_work = false;
244

245
    // Only for reference files
246
    bool might_produce_new_sync_version = false;
247

248
    bool produced_new_realm_version = false;
249
    bool produced_new_sync_version = false;
250
    bool expired_reference_version = false;
251

252
    // True if, and only if changesets_from_downstream contains at least one
253
    // changeset.
254
    bool have_changesets_from_downstream = false;
255

256
    FileIdentAllocSlots file_ident_alloc_slots;
257
    std::vector<std::unique_ptr<char[]>> changeset_buffers;
258
    IntegratableChangesets changesets_from_downstream;
259

260
    VersionInfo version_info;
261

262
    // Result of integration of changesets from downstream clients
263
    IntegrationResult integration_result;
264

265
    void reset() noexcept
266
    {
38,700✔
267
        has_primary_work = false;
38,700✔
268

19,042✔
269
        might_produce_new_sync_version = false;
38,700✔
270

19,042✔
271
        produced_new_realm_version = false;
38,700✔
272
        produced_new_sync_version = false;
38,700✔
273
        expired_reference_version = false;
38,700✔
274
        have_changesets_from_downstream = false;
38,700✔
275

19,042✔
276
        file_ident_alloc_slots.clear();
38,700✔
277
        changeset_buffers.clear();
38,700✔
278
        changesets_from_downstream.clear();
38,700✔
279

19,042✔
280
        version_info = {};
38,700✔
281
        integration_result = {};
38,700✔
282
    }
38,700✔
283
};
284

285

286
class WorkerState {
287
public:
288
    FileIdentAllocSlots file_ident_alloc_slots;
289
    util::ScratchMemory scratch_memory;
290
    bool use_file_cache = true;
291
    std::unique_ptr<ServerHistory> reference_hist;
292
    DBRef reference_sg;
293
};
294

295

296
// ============================ SessionQueue ============================
297

298
class SessionQueue {
299
public:
300
    void push_back(Session*) noexcept;
301
    Session* pop_front() noexcept;
302
    void clear() noexcept;
303

304
private:
305
    Session* m_back = nullptr;
306
};
307

308

309
// ============================ FileIdentReceiver ============================
310

311
class FileIdentReceiver {
312
public:
313
    virtual void receive_file_ident(SaltedFileIdent) = 0;
314

315
protected:
316
    ~FileIdentReceiver() {}
6,976✔
317
};
318

319

320
// ============================ WorkerBox =============================
321

322
class WorkerBox {
323
public:
324
    using JobType = util::UniqueFunction<void(WorkerState&)>;
325
    void add_work(WorkerState& state, JobType job)
326
    {
×
327
        std::unique_lock<std::mutex> lock(m_mutex);
×
328
        if (m_jobs.size() >= m_queue_limit) {
×
329
            // Once we have many queued jobs, it is better to use this thread to run a new job
×
330
            // than to queue it.
×
331
            run_a_job(lock, state, job);
×
332
        }
×
333
        else {
×
334
            // Create worker threads on demand (if all existing threads are active):
×
335
            if (m_threads.size() < m_max_num_threads && m_active >= m_threads.size()) {
×
336
                m_threads.emplace_back([this]() {
×
337
                    WorkerState state;
×
338
                    state.use_file_cache = false;
×
339
                    JobType the_job;
×
340
                    std::unique_lock<std::mutex> lock(m_mutex);
×
341
                    for (;;) {
×
342
                        while (m_jobs.empty() && !m_finish_up)
×
343
                            m_changes.wait(lock);
×
344
                        if (m_finish_up)
×
345
                            break; // terminate thread
×
346
                        the_job = std::move(m_jobs.back());
×
347
                        m_jobs.pop_back();
×
348
                        run_a_job(lock, state, the_job);
×
349
                        m_changes.notify_all();
×
350
                    }
×
351
                });
×
352
            }
×
353

×
354
            // Submit the job for execution:
×
355
            m_jobs.emplace_back(std::move(job));
×
356
            m_changes.notify_all();
×
357
        }
×
358
    }
×
359

360
    // You should call wait_completion() before trying to destroy a WorkerBox to get proper
361
    // propagation of exceptions.
362
    void wait_completion(WorkerState& state)
363
    {
×
364
        std::unique_lock<std::mutex> lock(m_mutex);
×
365
        while (!m_jobs.empty() || m_active > 0) {
×
366
            if (!m_jobs.empty()) { // if possible, make this thread participate in running m_jobs
×
367
                JobType the_job = std::move(m_jobs.back());
×
368
                m_jobs.pop_back();
×
369
                run_a_job(lock, state, the_job);
×
370
            }
×
371
            else {
×
372
                m_changes.wait(lock);
×
373
            }
×
374
        }
×
375
        if (m_epr) {
×
376
            std::rethrow_exception(m_epr);
×
377
        }
×
378
    }
×
379

380
    WorkerBox(unsigned int num_threads)
381
    {
×
382
        m_queue_limit = num_threads * 10; // fudge factor for job size variation
×
383
        m_max_num_threads = num_threads;
×
384
    }
×
385

386
    ~WorkerBox()
387
    {
×
388
        {
×
389
            std::unique_lock<std::mutex> lock(m_mutex);
×
390
            m_finish_up = true;
×
391
            m_changes.notify_all();
×
392
        }
×
393
        for (auto& e : m_threads)
×
394
            e.join();
×
395
    }
×
396

397
private:
398
    std::mutex m_mutex;
399
    std::condition_variable m_changes;
400
    std::vector<std::thread> m_threads;
401
    std::vector<JobType> m_jobs;
402
    unsigned int m_active = 0;
403
    bool m_finish_up = false;
404
    unsigned int m_queue_limit = 0;
405
    unsigned int m_max_num_threads = 0;
406
    std::exception_ptr m_epr;
407

408
    void run_a_job(std::unique_lock<std::mutex>& lock, WorkerState& state, JobType& job)
409
    {
×
410
        ++m_active;
×
411
        lock.unlock();
×
412
        try {
×
413
            job(state);
×
414
            lock.lock();
×
415
        }
×
416
        catch (...) {
×
417
            lock.lock();
×
418
            if (!m_epr)
×
419
                m_epr = std::current_exception();
×
420
        }
×
421
        --m_active;
×
422
    }
×
423
};
424

425

426
// ============================ ServerFile ============================
427

428
class ServerFile : public util::RefCountBase {
429
public:
430
    util::PrefixLogger logger;
431

432
    // Logger to be used by the worker thread
433
    util::PrefixLogger wlogger;
434

435
    ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path, std::string real_path,
436
               bool disable_sync_to_disk);
437
    ~ServerFile() noexcept;
438

439
    void initialize();
440
    void activate();
441

442
    ServerImpl& get_server() noexcept
443
    {
41,022✔
444
        return m_server;
41,022✔
445
    }
41,022✔
446

447
    const std::string& get_real_path() const noexcept
448
    {
×
449
        return m_file.realm_path;
×
450
    }
×
451

452
    const std::string& get_virt_path() const noexcept
453
    {
×
454
        return m_file.virt_path;
×
455
    }
×
456

457
    ServerFileAccessCache::File& access()
458
    {
52,562✔
459
        return m_file.access(); // Throws
52,562✔
460
    }
52,562✔
461

462
    ServerFileAccessCache::File& worker_access()
463
    {
38,672✔
464
        return m_worker_file.access(); // Throws
38,672✔
465
    }
38,672✔
466

467
    version_type get_realm_version() const noexcept
468
    {
×
469
        return m_version_info.realm_version;
×
470
    }
×
471

472
    version_type get_sync_version() const noexcept
473
    {
×
474
        return m_version_info.sync_version.version;
×
475
    }
×
476

477
    SaltedVersion get_salted_sync_version() const noexcept
478
    {
106,442✔
479
        return m_version_info.sync_version;
106,442✔
480
    }
106,442✔
481

482
    DownloadCache& get_download_cache() noexcept;
483

484
    void register_client_access(file_ident_type client_file_ident);
485

486
    using file_ident_request_type = std::int_fast64_t;
487

488
    // Initiate a request for a new client file identifier.
489
    //
490
    // Unless the request is cancelled, the identifier will be delivered to the
491
    // receiver by way of an invocation of
492
    // FileIdentReceiver::receive_file_ident().
493
    //
494
    // FileIdentReceiver::receive_file_ident() is guaranteed to not be called
495
    // until after request_file_ident() has returned (no callback reentrance).
496
    //
497
    // New client file identifiers will be delivered to receivers in the order
498
    // that they were requested.
499
    //
500
    // The returned value is a nonzero integer that can be used to cancel the
501
    // request before the file identifier is delivered using
502
    // cancel_file_ident_request().
503
    auto request_file_ident(FileIdentReceiver&, file_ident_type proxy_file, ClientType) -> file_ident_request_type;
504

505
    // Cancel the specified file identifier request.
506
    //
507
    // It is an error to call this function after the identifier has been
508
    // delivered.
509
    void cancel_file_ident_request(file_ident_request_type) noexcept;
510

511
    void add_unidentified_session(Session*);
512
    void identify_session(Session*, file_ident_type client_file_ident);
513

514
    void remove_unidentified_session(Session*) noexcept;
515
    void remove_identified_session(file_ident_type client_file_ident) noexcept;
516

517
    Session* get_identified_session(file_ident_type client_file_ident) noexcept;
518

519
    bool can_add_changesets_from_downstream() const noexcept;
520
    void add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
521
                                        version_type locked_server_version, const UploadChangeset*,
522
                                        std::size_t num_changesets);
523

524
    // bootstrap_client_session calls the function of same name in server_history
525
    // but corrects the upload_progress with information from pending
526
    // integratable changesets. A situation can occur where a client terminates
527
    // a session and starts a new session and re-uploads changesets that are known
528
    // by the ServerFile object but not by the ServerHistory.
529
    BootstrapError bootstrap_client_session(SaltedFileIdent client_file_ident, DownloadCursor download_progress,
530
                                            SaltedVersion server_version, ClientType client_type,
531
                                            UploadCursor& upload_progress, version_type& locked_server_version,
532
                                            Logger&);
533

534
    // NOTE: This function is executed by the worker thread
535
    void worker_process_work_unit(WorkerState&);
536

537
    void recognize_external_change();
538

539
private:
540
    ServerImpl& m_server;
541
    ServerFileAccessCache::Slot m_file;
542

543
    // In general, `m_version_info` refers to the last snapshot of the Realm
544
    // file that is supposed to be visible to remote peers engaging in regular
545
    // Realm file synchronization.
546
    VersionInfo m_version_info;
547

548
    file_ident_request_type m_last_file_ident_request = 0;
549

550
    // The set of sessions whose client file identifier is not yet known, i.e.,
551
    // those for which an IDENT message has not yet been received,
552
    std::set<Session*> m_unidentified_sessions;
553

554
    // A map of the sessions whose client file identifier is known, i.e, those
555
    // for which an IDENT message has been received.
556
    std::map<file_ident_type, Session*> m_identified_sessions;
557

558
    // Used when a file used as partial view wants to allocate a client file
559
    // identifier from the reference Realm.
560
    file_ident_request_type m_file_ident_request = 0;
561

562
    struct FileIdentRequestInfo {
563
        FileIdentReceiver* receiver;
564
        file_ident_type proxy_file;
565
        ClientType client_type;
566
    };
567

568
    // When nonempty, it counts towards outstanding blocked work (see
569
    // `m_has_blocked_work`).
570
    std::map<file_ident_request_type, FileIdentRequestInfo> m_file_ident_requests;
571

572
    // Changesets received from the downstream clients, and waiting to be
573
    // integrated, as well as information about the clients progress in terms of
574
    // integrating changesets received from the server. When nonempty, it counts
575
    // towards outstanding blocked work (see `m_has_blocked_work`).
576
    //
577
    // At any given time, the set of changesets from a particular client-side
578
    // file may be comprised of changesets received via distinct sessions.
579
    //
580
    // See also `m_num_changesets_from_downstream`.
581
    IntegratableChangesets m_changesets_from_downstream;
582

583
    // Keeps track of the number of changesets in `m_changesets_from_downstream`.
584
    //
585
    // Its purpose is also to initialize
586
    // `Work::have_changesets_from_downstream`.
587
    std::size_t m_num_changesets_from_downstream = 0;
588

589
    // The total size, in bytes, of the changesets that were received from
590
    // clients, are targeting this file, and are currently part of the blocked
591
    // work unit.
592
    //
593
    // Together with `m_unblocked_changesets_from_downstream_byte_size`, its
594
    // purpose is to allow the server to keep track of the accumulated size of
595
    // changesets being processed, or waiting to be processed (metric
596
    // `upload.pending.bytes`) (see
597
    // ServerImpl::inc_byte_size_for_pending_downstream_changesets()).
598
    //
599
    // Its purpose is also to enable the "very poor man's" backpressure solution
600
    // (see can_add_changesets_from_downstream()).
601
    std::size_t m_blocked_changesets_from_downstream_byte_size = 0;
602

603
    // Same as `m_blocked_changesets_from_downstream_byte_size` but for the
604
    // currently unblocked work unit.
605
    std::size_t m_unblocked_changesets_from_downstream_byte_size = 0;
606

607
    // When nonempty, it counts towards outstanding blocked work (see
608
    // `m_has_blocked_work`).
609
    std::vector<std::string> m_permission_changes;
610

611
    // True iff this file, or any of its associated partial files (when
612
    // applicable), has a nonzero amount of outstanding work that is currently
613
    // held back from being passed to the worker thread because a previously
614
    // accumulated chunk of work related to this file is currently in progress.
615
    bool m_has_blocked_work = false;
616

617
    // A file, that is not a partial file, is considered *exposed to the worker
618
    // thread* from the point in time where it is submitted to the worker
619
    // (Worker::enqueue()) and up until the point in time where
620
    // group_postprocess_stage_1() starts to execute. A partial file is
621
    // considered *exposed to the worker thread* precisely when the associated
622
    // reference file is exposed to the worker thread, but only if it was in
623
    // `m_reference_file->m_work.partial_files` at the point in time where the
624
    // reference file was passed to the worker.
625
    //
626
    // While this file is exposed to the worker thread, all members of `m_work`
627
    // other than `changesets_from_downstream` may be accessed and modified by
628
    // the worker thread only.
629
    //
630
    // While this file is exposed to the worker thread,
631
    // `m_work.changesets_from_downstream` may be accessed by all threads, but
632
    // must not be modified by any thread. This special status of
633
    // `m_work.changesets_from_downstream` is required to allow
634
    // ServerFile::bootstrap_client_session() to read from it at any time.
635
    Work m_work;
636

637
    // For reference files, set to true when work is unblocked, and reset back
638
    // to false when the work finalization process completes
639
    // (group_postprocess_stage_3()). Always zero for partial files.
640
    bool m_has_work_in_progress = 0;
641

642
    // This one must only be accessed by the worker thread.
643
    //
644
    // More specifically, `m_worker_file.access()` must only be called by the
645
    // worker thread, and if it was ever called, it must be closed by the worker
646
    // thread before the ServerFile object is destroyed, if destruction happens
647
    // before the destruction of the server object itself.
648
    ServerFileAccessCache::Slot m_worker_file;
649

650
    std::vector<std::int_fast64_t> m_deleting_connections;
651

652
    DownloadCache m_download_cache;
653

654
    void on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes);
655
    void on_work_added();
656
    void group_unblock_work();
657
    void unblock_work();
658

659
    /// Resume history scanning in all sessions bound to this file. To be called
660
    /// after a successfull integration of a changeset.
661
    void resume_download() noexcept;
662

663
    // NOTE: These functions are executed by the worker thread
664
    void worker_allocate_file_identifiers();
665
    bool worker_integrate_changes_from_downstream(WorkerState&);
666
    ServerHistory& get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
667
                                           DBRef& sg_ptr);
668
    ServerHistory& get_reference_file_history(WorkerState& state);
669
    void group_postprocess_stage_1();
670
    void group_postprocess_stage_2();
671
    void group_postprocess_stage_3();
672
    void group_finalize_work_stage_1();
673
    void group_finalize_work_stage_2();
674
    void finalize_work_stage_1();
675
    void finalize_work_stage_2();
676
};
677

678

679
// Gives mutable access to the download cache owned by this file.
inline DownloadCache& ServerFile::get_download_cache() noexcept
{
    return this->m_download_cache;
}
40,594✔
683

684
inline void ServerFile::group_finalize_work_stage_1()
685
{
38,322✔
686
    finalize_work_stage_1(); // Throws
38,322✔
687
}
38,322✔
688

689
inline void ServerFile::group_finalize_work_stage_2()
690
{
38,322✔
691
    finalize_work_stage_2(); // Throws
38,322✔
692
}
38,322✔
693

694

695
// ============================ Worker ============================
696

697
// All write transaction on server-side Realm files performed on behalf of the
698
// server, must be performed by the worker thread, not the network event loop
699
// thread. This is to ensure that the network event loop thread never gets
700
// blocked waiting for the worker thread to end a long running write
701
// transaction.
702
//
703
// FIXME: Currently, the event loop thread does perform a number of write
704
// transactions, but only on subtier nodes of a star topology server cluster.
705
class Worker : public ServerHistory::Context {
706
public:
707
    std::shared_ptr<util::Logger> logger_ptr;
708
    util::Logger& logger;
709

710
    explicit Worker(ServerImpl&);
711

712
    ServerFileAccessCache& get_file_access_cache() noexcept;
713

714
    void enqueue(ServerFile*);
715

716
    // Overriding members of ServerHistory::Context
717
    std::mt19937_64& server_history_get_random() noexcept override final;
718

719
private:
720
    ServerImpl& m_server;
721
    std::mt19937_64 m_random;
722
    ServerFileAccessCache m_file_access_cache;
723

724
    util::Mutex m_mutex;
725
    util::CondVar m_cond; // Protected by `m_mutex`
726

727
    bool m_stop = false; // Protected by `m_mutex`
728

729
    util::CircularBuffer<ServerFile*> m_queue; // Protected by `m_mutex`
730

731
    WorkerState m_state;
732

733
    void run();
734
    void stop() noexcept;
735

736
    friend class util::ThreadExecGuardWithParent<Worker, ServerImpl>;
737
};
738

739

740
// Gives access to the file access cache owned by the worker.
inline ServerFileAccessCache& Worker::get_file_access_cache() noexcept
{
    return this->m_file_access_cache;
}
1,054✔
744

745

746
// ============================ ServerImpl ============================
747

748
class ServerImpl : public ServerImplBase, public ServerHistory::Context {
749
public:
750
    std::uint_fast64_t errors_seen = 0;
751

752
    std::atomic<milliseconds_type> m_par_time;
753
    std::atomic<milliseconds_type> m_seq_time;
754

755
    util::Mutex last_client_accesses_mutex;
756

757
    const std::shared_ptr<util::Logger> logger_ptr;
758
    util::Logger& logger;
759

760
    network::Service& get_service() noexcept
761
    {
53,094✔
762
        return m_service;
53,094✔
763
    }
53,094✔
764

765
    const network::Service& get_service() const noexcept
766
    {
×
767
        return m_service;
×
768
    }
×
769

770
    std::mt19937_64& get_random() noexcept
771
    {
69,424✔
772
        return m_random;
69,424✔
773
    }
69,424✔
774

775
    const Server::Config& get_config() const noexcept
776
    {
115,412✔
777
        return m_config;
115,412✔
778
    }
115,412✔
779

780
    std::size_t get_max_upload_backlog() const noexcept
781
    {
46,850✔
782
        return m_max_upload_backlog;
46,850✔
783
    }
46,850✔
784

785
    const std::string& get_root_dir() const noexcept
786
    {
6,976✔
787
        return m_root_dir;
6,976✔
788
    }
6,976✔
789

790
    network::ssl::Context& get_ssl_context() noexcept
791
    {
48✔
792
        return *m_ssl_context;
48✔
793
    }
48✔
794

795
    const AccessControl& get_access_control() const noexcept
796
    {
×
797
        return m_access_control;
×
798
    }
×
799

800
    ProtocolVersionRange get_protocol_version_range() const noexcept
801
    {
1,970✔
802
        return m_protocol_version_range;
1,970✔
803
    }
1,970✔
804

805
    ServerProtocol& get_server_protocol() noexcept
806
    {
146,828✔
807
        return m_server_protocol;
146,828✔
808
    }
146,828✔
809

810
    compression::CompressMemoryArena& get_compress_memory_arena() noexcept
811
    {
4,290✔
812
        return m_compress_memory_arena;
4,290✔
813
    }
4,290✔
814

815
    MiscBuffers& get_misc_buffers() noexcept
816
    {
46,852✔
817
        return m_misc_buffers;
46,852✔
818
    }
46,852✔
819

820
    int_fast64_t get_current_server_session_ident() const noexcept
821
    {
×
822
        return m_current_server_session_ident;
×
823
    }
×
824

825
    util::ScratchMemory& get_scratch_memory() noexcept
826
    {
×
827
        return m_scratch_memory;
×
828
    }
×
829

830
    Worker& get_worker() noexcept
831
    {
40,802✔
832
        return m_worker;
40,802✔
833
    }
40,802✔
834

835
    void get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
836
    {
×
837
        parallel_section = m_par_time;
×
838
        sequential_section = m_seq_time;
×
839
    }
×
840

841
    ServerImpl(const std::string& root_dir, util::Optional<sync::PKey>, Server::Config);
842
    ~ServerImpl() noexcept;
843

844
    void start();
845

846
    void start(std::string listen_address, std::string listen_port, bool reuse_address)
847
    {
668✔
848
        m_config.listen_address = listen_address;
668✔
849
        m_config.listen_port = listen_port;
668✔
850
        m_config.reuse_address = reuse_address;
668✔
851

336✔
852
        start(); // Throws
668✔
853
    }
668✔
854

855
    network::Endpoint listen_endpoint() const
856
    {
1,382✔
857
        return m_acceptor.local_endpoint();
1,382✔
858
    }
1,382✔
859

860
    void run();
861
    void stop() noexcept;
862

863
    void remove_http_connection(std::int_fast64_t conn_id) noexcept;
864

865
    void add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn);
866
    void remove_sync_connection(int_fast64_t connection_id);
867

868
    size_t get_number_of_http_connections()
869
    {
×
870
        return m_http_connections.size();
×
871
    }
×
872

873
    size_t get_number_of_sync_connections()
874
    {
×
875
        return m_sync_connections.size();
×
876
    }
×
877

878
    bool is_sync_stopped()
879
    {
40,672✔
880
        return m_sync_stopped;
40,672✔
881
    }
40,672✔
882

883
    const std::set<std::string>& get_realm_names() const noexcept
884
    {
×
885
        return m_realm_names;
×
886
    }
×
887

888
    // virt_path must be valid when get_or_create_file() is called.
889
    util::bind_ptr<ServerFile> get_or_create_file(const std::string& virt_path)
890
    {
6,948✔
891
        util::bind_ptr<ServerFile> file = get_file(virt_path);
6,948✔
892
        if (REALM_LIKELY(file))
6,948✔
893
            return file;
6,314✔
894

422✔
895
        _impl::VirtualPathComponents virt_path_components =
1,056✔
896
            _impl::parse_virtual_path(m_root_dir, virt_path); // Throws
1,056✔
897
        REALM_ASSERT(virt_path_components.is_valid);
1,056✔
898

422✔
899
        _impl::make_dirs(m_root_dir, virt_path); // Throws
1,056✔
900
        m_realm_names.insert(virt_path);         // Throws
1,056✔
901
        {
1,056✔
902
            bool disable_sync_to_disk = m_config.disable_sync_to_disk;
1,056✔
903
            file.reset(new ServerFile(*this, m_file_access_cache, virt_path, virt_path_components.real_realm_path,
1,056✔
904
                                      disable_sync_to_disk)); // Throws
1,056✔
905
        }
1,056✔
906

422✔
907
        file->initialize();
1,056✔
908
        m_files[virt_path] = file; // Throws
1,056✔
909
        file->activate();          // Throws
1,056✔
910
        return file;
1,056✔
911
    }
1,056✔
912

913
    std::unique_ptr<ServerHistory> make_history_for_path()
914
    {
×
915
        return std::make_unique<ServerHistory>(*this);
×
916
    }
×
917

918
    util::bind_ptr<ServerFile> get_file(const std::string& virt_path) noexcept
919
    {
6,946✔
920
        auto i = m_files.find(virt_path);
6,946✔
921
        if (REALM_LIKELY(i != m_files.end()))
6,946✔
922
            return i->second;
6,312✔
923
        return {};
1,054✔
924
    }
1,054✔
925

926
    // Returns the number of seconds since the Epoch of
927
    // std::chrono::system_clock.
928
    std::chrono::system_clock::time_point token_expiration_clock_now() const noexcept
929
    {
×
930
        if (REALM_UNLIKELY(m_config.token_expiration_clock))
×
931
            return m_config.token_expiration_clock->now();
×
932
        return std::chrono::system_clock::now();
×
933
    }
×
934

935
    void set_connection_reaper_timeout(milliseconds_type);
936

937
    void close_connections();
938
    bool map_virtual_to_real_path(const std::string& virt_path, std::string& real_path);
939

940
    void recognize_external_change(const std::string& virt_path);
941

942
    void stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
943
                                                  milliseconds_type timeout);
944

945
    // Server global outputbuffers that can be reused.
946
    // The server is single threaded, so there are no
947
    // synchronization issues.
948
    // output_buffers_count is equal to the
949
    // maximum number of buffers needed at any point.
950
    static constexpr int output_buffers_count = 1;
951
    OutputBuffer output_buffers[output_buffers_count];
952

953
    bool is_load_balancing_allowed() const
954
    {
×
955
        return m_allow_load_balancing;
×
956
    }
×
957

958
    // inc_byte_size_for_pending_downstream_changesets() must be called by
959
    // ServerFile objects when changesets from downstream clients have been
960
    // received.
961
    //
962
    // dec_byte_size_for_pending_downstream_changesets() must be called by
963
    // ServerFile objects when changesets from downstream clients have been
964
    // processed or discarded.
965
    //
966
    // ServerImpl uses this information to keep a running tally (metric
967
    // `upload.pending.bytes`) of the total byte size of pending changesets from
968
    // downstream clients.
969
    //
970
    // These functions must be called on the network thread.
971
    void inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
972
    void dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
973

974
    // Overriding member functions in _impl::ServerHistory::Context
975
    std::mt19937_64& server_history_get_random() noexcept override final;
976

977
private:
978
    Server::Config m_config;
979
    network::Service m_service;
980
    std::mt19937_64 m_random;
981
    const std::size_t m_max_upload_backlog;
982
    const std::string m_root_dir;
983
    const AccessControl m_access_control;
984
    const ProtocolVersionRange m_protocol_version_range;
985

986
    // The reserved files will be closed in situations where the server
987
    // runs out of file descriptors.
988
    std::unique_ptr<File> m_reserved_files[5];
989

990
    // The set of all Realm files known to this server, represented by their
991
    // virtual path.
992
    //
993
    // INVARIANT: If a Realm file is in the servers directory (i.e., it would be
994
    // reported by an invocation of _impl::get_realm_names()), then the
995
    // corresponding virtual path is in `m_realm_names`, assuming no external
996
    // file-system level intervention.
997
    std::set<std::string> m_realm_names;
998

999
    std::unique_ptr<network::ssl::Context> m_ssl_context;
1000
    ServerFileAccessCache m_file_access_cache;
1001
    Worker m_worker;
1002
    std::map<std::string, util::bind_ptr<ServerFile>> m_files; // Key is virtual path
1003
    network::Acceptor m_acceptor;
1004
    std::int_fast64_t m_next_conn_id = 0;
1005
    std::unique_ptr<HTTPConnection> m_next_http_conn;
1006
    network::Endpoint m_next_http_conn_endpoint;
1007
    std::map<std::int_fast64_t, std::unique_ptr<HTTPConnection>> m_http_connections;
1008
    std::map<std::int_fast64_t, std::unique_ptr<SyncConnection>> m_sync_connections;
1009
    ServerProtocol m_server_protocol;
1010
    compression::CompressMemoryArena m_compress_memory_arena;
1011
    MiscBuffers m_misc_buffers;
1012
    int_fast64_t m_current_server_session_ident;
1013
    Optional<network::DeadlineTimer> m_connection_reaper_timer;
1014
    bool m_allow_load_balancing = false;
1015

1016
    util::Mutex m_mutex;
1017

1018
    bool m_stopped = false; // Protected by `m_mutex`
1019

1020
    // m_sync_stopped is used by stop_sync_and_wait_for_backup_completion().
1021
    // When m_sync_stopped is true, the server does not perform any sync.
1022
    bool m_sync_stopped = false;
1023

1024
    std::atomic<bool> m_running{false}; // Debugging facility
1025

1026
    std::size_t m_pending_changesets_from_downstream_byte_size = 0;
1027

1028
    util::CondVar m_wait_or_service_stopped_cond; // Protected by `m_mutex`
1029

1030
    util::ScratchMemory m_scratch_memory;
1031

1032
    void listen();
1033
    void initiate_accept();
1034
    void handle_accept(std::error_code);
1035

1036
    void reap_connections();
1037
    void initiate_connection_reaper_timer(milliseconds_type timeout);
1038
    void do_close_connections();
1039

1040
    static std::size_t determine_max_upload_backlog(Server::Config& config) noexcept
1041
    {
1,376✔
1042
        if (config.max_upload_backlog == 0)
1,376✔
1043
            return 4294967295; // 4GiB - 1 (largest allowable number on a 32-bit platform)
1,376✔
1044
        return config.max_upload_backlog;
×
1045
    }
×
1046

1047
    static ProtocolVersionRange determine_protocol_version_range(Server::Config& config)
1048
    {
1,376✔
1049
        const int actual_min = ServerImplBase::get_oldest_supported_protocol_version();
1,376✔
1050
        const int actual_max = get_current_protocol_version();
1,376✔
1051
        static_assert(actual_min <= actual_max, "");
1,376✔
1052
        int min = actual_min;
1,376✔
1053
        int max = actual_max;
1,376✔
1054
        if (config.max_protocol_version != 0 && config.max_protocol_version < max) {
1,376!
1055
            if (config.max_protocol_version < min)
×
1056
                throw Server::NoSupportedProtocolVersions();
×
1057
            max = config.max_protocol_version;
×
1058
        }
×
1059
        return {min, max};
1,376✔
1060
    }
1,376✔
1061

1062
    void do_recognize_external_change(const std::string& virt_path);
1063

1064
    void do_stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_complete)> completion_handler,
1065
                                                     milliseconds_type timeout);
1066
};
1067

1068
// ============================ SyncConnection ============================
1069

1070
class SyncConnection : public websocket::Config {
1071
public:
1072
    const std::shared_ptr<util::Logger> logger_ptr;
1073
    util::Logger& logger;
1074

1075
    // Clients with sync protocol version 8 or greater support pbs->flx migration
1076
    static constexpr int PBS_FLX_MIGRATION_PROTOCOL_VERSION = 8;
1077
    // Clients with sync protocol version less than 10 do not support log messages
1078
    static constexpr int SERVER_LOG_PROTOCOL_VERSION = 10;
1079

1080
    SyncConnection(ServerImpl& serv, std::int_fast64_t id, std::unique_ptr<network::Socket>&& socket,
1081
                   std::unique_ptr<network::ssl::Stream>&& ssl_stream,
1082
                   std::unique_ptr<network::ReadAheadBuffer>&& read_ahead_buffer, int client_protocol_version,
1083
                   std::string client_user_agent, std::string remote_endpoint, std::string appservices_request_id)
1084
        : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
1085
                                                          serv.logger_ptr)} // Throws
1086
        , logger{*logger_ptr}
1087
        , m_server{serv}
1088
        , m_id{id}
1089
        , m_socket{std::move(socket)}
1090
        , m_ssl_stream{std::move(ssl_stream)}
1091
        , m_read_ahead_buffer{std::move(read_ahead_buffer)}
1092
        , m_websocket{*this}
1093
        , m_client_protocol_version{client_protocol_version}
1094
        , m_client_user_agent{std::move(client_user_agent)}
1095
        , m_remote_endpoint{std::move(remote_endpoint)}
1096
        , m_appservices_request_id{std::move(appservices_request_id)}
1097
    {
1,970✔
1098
        // Make the output buffer stream throw std::bad_alloc if it fails to
1,008✔
1099
        // expand the buffer
1,008✔
1100
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
1,970✔
1101

1,008✔
1102
        network::Service& service = m_server.get_service();
1,970✔
1103
        auto handler = [this](Status status) {
100,630✔
1104
            if (!status.is_ok())
100,630✔
1105
                return;
×
1106
            if (!m_is_sending)
100,630✔
1107
                send_next_message(); // Throws
43,686✔
1108
        };
100,630✔
1109
        m_send_trigger = std::make_unique<Trigger<network::Service>>(&service, std::move(handler)); // Throws
1,970✔
1110
    }
1,970✔
1111

1112
    ~SyncConnection() noexcept;
1113

1114
    ServerImpl& get_server() noexcept
1115
    {
119,526✔
1116
        return m_server;
119,526✔
1117
    }
119,526✔
1118

1119
    ServerProtocol& get_server_protocol() noexcept
1120
    {
146,828✔
1121
        return m_server.get_server_protocol();
146,828✔
1122
    }
146,828✔
1123

1124
    int get_client_protocol_version()
1125
    {
109,936✔
1126
        return m_client_protocol_version;
109,936✔
1127
    }
109,936✔
1128

1129
    const std::string& get_client_user_agent() const noexcept
1130
    {
6,946✔
1131
        return m_client_user_agent;
6,946✔
1132
    }
6,946✔
1133

1134
    const std::string& get_remote_endpoint() const noexcept
1135
    {
6,946✔
1136
        return m_remote_endpoint;
6,946✔
1137
    }
6,946✔
1138

1139
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept final
1140
    {
1,970✔
1141
        return logger_ptr;
1,970✔
1142
    }
1,970✔
1143

1144
    std::mt19937_64& websocket_get_random() noexcept final override
1145
    {
68,372✔
1146
        return m_server.get_random();
68,372✔
1147
    }
68,372✔
1148

1149
    bool websocket_binary_message_received(const char* data, size_t size) final override
1150
    {
78,946✔
1151
        using sf = _impl::SimulatedFailure;
78,946✔
1152
        if (sf::check_trigger(sf::sync_server__read_head)) {
78,946✔
1153
            // Suicide
246✔
1154
            read_error(sf::sync_server__read_head);
462✔
1155
            return false;
462✔
1156
        }
462✔
1157
        // After a connection level error has occurred, all incoming messages
35,390✔
1158
        // will be ignored. By continuing to read until end of input, the server
35,390✔
1159
        // is able to know when the client closes the connection, which in
35,390✔
1160
        // general means that is has received the ERROR message.
35,390✔
1161
        if (REALM_LIKELY(!m_is_closing)) {
78,484✔
1162
            m_last_activity_at = steady_clock_now();
78,468✔
1163
            handle_message_received(data, size);
78,468✔
1164
        }
78,468✔
1165
        return true;
78,484✔
1166
    }
78,484✔
1167

1168
    bool websocket_ping_message_received(const char* data, size_t size) final override
1169
    {
×
1170
        if (REALM_LIKELY(!m_is_closing)) {
×
1171
            m_last_activity_at = steady_clock_now();
×
1172
            handle_ping_received(data, size);
×
1173
        }
×
1174
        return true;
×
1175
    }
×
1176

1177
    void async_write(const char* data, size_t size, websocket::WriteCompletionHandler handler) final override
1178
    {
68,368✔
1179
        if (m_ssl_stream) {
68,368✔
1180
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
70✔
1181
        }
70✔
1182
        else {
68,298✔
1183
            m_socket->async_write(data, size, std::move(handler)); // Throws
68,298✔
1184
        }
68,298✔
1185
    }
68,368✔
1186

1187
    void async_read(char* buffer, size_t size, websocket::ReadCompletionHandler handler) final override
1188
    {
238,330✔
1189
        if (m_ssl_stream) {
238,330✔
1190
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
164✔
1191
        }
164✔
1192
        else {
238,166✔
1193
            m_socket->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
238,166✔
1194
        }
238,166✔
1195
    }
238,330✔
1196

1197
    void async_read_until(char* buffer, size_t size, char delim,
1198
                          websocket::ReadCompletionHandler handler) final override
1199
    {
×
1200
        if (m_ssl_stream) {
×
1201
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1202
                                           std::move(handler)); // Throws
×
1203
        }
×
1204
        else {
×
1205
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1206
                                       std::move(handler)); // Throws
×
1207
        }
×
1208
    }
×
1209

1210
    void websocket_read_error_handler(std::error_code ec) final override
1211
    {
682✔
1212
        read_error(ec);
682✔
1213
    }
682✔
1214

1215
    void websocket_write_error_handler(std::error_code ec) final override
1216
    {
×
1217
        write_error(ec);
×
1218
    }
×
1219

1220
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*,
1221
                                           const std::string_view*) final override
1222
    {
×
1223
        // WebSocket class has already logged a message for this error
1224
        close_due_to_error(ec); // Throws
×
1225
    }
×
1226

1227
    void websocket_protocol_error_handler(std::error_code ec) final override
1228
    {
×
1229
        logger.error("WebSocket protocol error (%1): %2", ec, ec.message()); // Throws
×
1230
        close_due_to_error(ec);                                              // Throws
×
1231
    }
×
1232

1233
    void websocket_handshake_completion_handler(const HTTPHeaders&) final override
1234
    {
×
1235
        // This is not called since we handle HTTP request in handle_request_for_sync()
1236
        REALM_TERMINATE("websocket_handshake_completion_handler should not have been called");
1237
    }
×
1238

1239
    int_fast64_t get_id() const noexcept
1240
    {
×
1241
        return m_id;
×
1242
    }
×
1243

1244
    network::Socket& get_socket() noexcept
1245
    {
×
1246
        return *m_socket;
×
1247
    }
×
1248

1249
    void initiate();
1250

1251
    // Commits suicide
1252
    template <class... Params>
1253
    void terminate(Logger::Level, const char* log_message, Params... log_params);
1254

1255
    // Commits suicide
1256
    void terminate_if_dead(SteadyTimePoint now);
1257

1258
    void enlist_to_send(Session*) noexcept;
1259

1260
    // Sessions should get the output_buffer and insert a message, after which
1261
    // they call initiate_write_output_buffer().
1262
    OutputBuffer& get_output_buffer()
1263
    {
68,372✔
1264
        m_output_buffer.reset();
68,372✔
1265
        return m_output_buffer;
68,372✔
1266
    }
68,372✔
1267

1268
    // More advanced memory strategies can be implemented if needed.
1269
    void release_output_buffer() {}
68,124✔
1270

1271
    // When this function is called, the connection will initiate a write with
1272
    // its output_buffer. Sessions use this method.
1273
    void initiate_write_output_buffer();
1274

1275
    void initiate_pong_output_buffer();
1276

1277
    void handle_protocol_error(Status status);
1278

1279
    void receive_bind_message(session_ident_type, std::string path, std::string signed_user_token,
1280
                              bool need_client_file_ident, bool is_subserver);
1281

1282
    void receive_ident_message(session_ident_type, file_ident_type client_file_ident,
1283
                               salt_type client_file_ident_salt, version_type scan_server_version,
1284
                               version_type scan_client_version, version_type latest_server_version,
1285
                               salt_type latest_server_version_salt);
1286

1287
    void receive_upload_message(session_ident_type, version_type progress_client_version,
1288
                                version_type progress_server_version, version_type locked_server_version,
1289
                                const UploadChangesets&);
1290

1291
    void receive_mark_message(session_ident_type, request_ident_type);
1292

1293
    void receive_unbind_message(session_ident_type);
1294

1295
    void receive_ping(milliseconds_type timestamp, milliseconds_type rtt);
1296

1297
    void receive_error_message(session_ident_type, int error_code, std::string_view error_body);
1298

1299
    void protocol_error(ProtocolError, Session* = nullptr);
1300

1301
    void initiate_soft_close();
1302

1303
    void discard_session(session_ident_type) noexcept;
1304

1305
    void send_log_message(util::Logger::Level level, const std::string&& message, session_ident_type sess_ident = 0,
1306
                          std::optional<std::string> co_id = std::nullopt);
1307

1308
private:
1309
    ServerImpl& m_server;
1310
    const int_fast64_t m_id;
1311
    std::unique_ptr<network::Socket> m_socket;
1312
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1313
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1314

1315
    websocket::Socket m_websocket;
1316
    std::unique_ptr<char[]> m_input_body_buffer;
1317
    OutputBuffer m_output_buffer;
1318
    std::map<session_ident_type, std::unique_ptr<Session>> m_sessions;
1319

1320
    // The protocol version in use by the connected client.
1321
    const int m_client_protocol_version;
1322

1323
    // The user agent description passed by the client.
1324
    const std::string m_client_user_agent;
1325

1326
    const std::string m_remote_endpoint;
1327

1328
    const std::string m_appservices_request_id;
1329

1330
    // A queue of sessions that have enlisted for an opportunity to send a
1331
    // message. Sessions will be served in the order that they enlist. A session
1332
    // can only occur once in this queue (linked list). If the queue is not
1333
    // empty, and no message is currently being written to the socket, the first
1334
    // session is taken out of the queue, and then granted an opportunity to
1335
    // send a message.
1336
    //
1337
    // Sessions will never be destroyed while in this queue. This is ensured
1338
    // because the connection owns the sessions that are associated with it, and
1339
    // the connection only removes a session from m_sessions at points in time
1340
    // where that session is guaranteed to not be in m_sessions_enlisted_to_send
1341
    // (Connection::send_next_message() and Connection::~Connection()).
1342
    SessionQueue m_sessions_enlisted_to_send;
1343

1344
    Session* m_receiving_session = nullptr;
1345

1346
    bool m_is_sending = false;
1347
    bool m_is_closing = false;
1348

1349
    bool m_send_pong = false;
1350
    bool m_sending_pong = false;
1351

1352
    std::unique_ptr<Trigger<network::Service>> m_send_trigger;
1353

1354
    milliseconds_type m_last_ping_timestamp = 0;
1355

1356
    // If `m_is_closing` is true, this is the time at which `m_is_closing` was
1357
    // set to true (initiation of soft close). Otherwise, if no messages have
1358
    // been received from the client, this is the time at which the connection
1359
    // object was initiated (completion of WebSocket handshake). Otherwise this
1360
    // is the time at which the last message was received from the client.
1361
    SteadyTimePoint m_last_activity_at;
1362

1363
    // These are initialized by do_initiate_soft_close().
1364
    //
1365
    // With recent versions of the protocol (when the version is greater than,
1366
    // or equal to 23), `m_error_session_ident` is always zero.
1367
    ProtocolError m_error_code = {};
1368
    session_ident_type m_error_session_ident = 0;
1369

1370
    struct LogMessage {
1371
        session_ident_type sess_ident;
1372
        util::Logger::Level level;
1373
        std::string message;
1374
        std::optional<std::string> co_id;
1375
    };
1376

1377
    std::mutex m_log_mutex;
1378
    std::queue<LogMessage> m_log_messages;
1379

1380
    static std::string make_logger_prefix(int_fast64_t id)
1381
    {
1,970✔
1382
        std::ostringstream out;
1,970✔
1383
        out.imbue(std::locale::classic());
1,970✔
1384
        out << "Sync Connection[" << id << "]: "; // Throws
1,970✔
1385
        return out.str();                         // Throws
1,970✔
1386
    }
1,970✔
1387

1388
    // The return value of handle_message_received() designates whether
1389
    // message processing should continue. If the connection object is
1390
    // destroyed during execution of handle_message_received(), the return
1391
    // value must be false.
1392
    void handle_message_received(const char* data, size_t size);
1393

1394
    void handle_ping_received(const char* data, size_t size);
1395

1396
    void send_next_message();
1397
    void send_pong(milliseconds_type timestamp);
1398
    void send_log_message(const LogMessage& log_msg);
1399

1400
    void handle_write_output_buffer();
1401
    void handle_pong_output_buffer();
1402

1403
    void initiate_write_error(ProtocolError, session_ident_type);
1404
    void handle_write_error(std::error_code ec);
1405

1406
    void do_initiate_soft_close(ProtocolError, session_ident_type);
1407
    void read_error(std::error_code);
1408
    void write_error(std::error_code);
1409

1410
    void close_due_to_close_by_client(std::error_code);
1411
    void close_due_to_error(std::error_code);
1412

1413
    void terminate_sessions();
1414

1415
    void bad_session_ident(const char* message_type, session_ident_type);
1416
    void message_after_unbind(const char* message_type, session_ident_type);
1417
    void message_before_ident(const char* message_type, session_ident_type);
1418
};
1419

1420

1421
inline void SyncConnection::read_error(std::error_code ec)
1422
{
1,144✔
1423
    REALM_ASSERT(ec != util::error::operation_aborted);
1,144✔
1424
    if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
1,144✔
1425
        // Suicide
404✔
1426
        close_due_to_close_by_client(ec); // Throws
682✔
1427
        return;
682✔
1428
    }
682✔
1429
    if (ec == util::MiscExtErrors::delim_not_found) {
462✔
1430
        logger.error("Input message head delimited not found"); // Throws
×
1431
        protocol_error(ProtocolError::limits_exceeded);         // Throws
×
1432
        return;
×
1433
    }
×
1434

246✔
1435
    logger.error("Reading failed: %1", ec.message()); // Throws
462✔
1436

246✔
1437
    // Suicide
246✔
1438
    close_due_to_error(ec); // Throws
462✔
1439
}
462✔
1440

1441
inline void SyncConnection::write_error(std::error_code ec)
1442
{
×
1443
    REALM_ASSERT(ec != util::error::operation_aborted);
×
1444
    if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1445
        // Suicide
1446
        close_due_to_close_by_client(ec); // Throws
×
1447
        return;
×
1448
    }
×
1449
    logger.error("Writing failed: %1", ec.message()); // Throws
×
1450

1451
    // Suicide
1452
    close_due_to_error(ec); // Throws
×
1453
}
×
1454

1455

1456
// ============================ HTTPConnection ============================
1457

1458
// Holds the string "User-Agent". NOTE(review): presumably the HTTP header
// name used when looking up the client's user agent - confirm at the use
// sites. The variable is non-const; consider making it const if it is never
// modified.
std::string g_user_agent = "User-Agent";
1459

1460
class HTTPConnection {
1461
public:
1462
    const std::shared_ptr<Logger> logger_ptr;
1463
    util::Logger& logger;
1464

1465
    HTTPConnection(ServerImpl& serv, int_fast64_t id, bool is_ssl)
1466
        : logger_ptr{std::make_shared<PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
1467
                                                    serv.logger_ptr)} // Throws
1468
        , logger{*logger_ptr}
1469
        , m_server{serv}
1470
        , m_id{id}
1471
        , m_socket{new network::Socket{serv.get_service()}} // Throws
1472
        , m_read_ahead_buffer{new network::ReadAheadBuffer} // Throws
1473
        , m_http_server{*this, logger_ptr}
1474
    {
3,386✔
1475
        // Make the output buffer stream throw std::bad_alloc if it fails to
1,664✔
1476
        // expand the buffer
1,664✔
1477
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
3,386✔
1478

1,664✔
1479
        if (is_ssl) {
3,386✔
1480
            using namespace network::ssl;
48✔
1481
            Context& ssl_context = serv.get_ssl_context();
48✔
1482
            m_ssl_stream = std::make_unique<Stream>(*m_socket, ssl_context,
48✔
1483
                                                    Stream::server); // Throws
48✔
1484
        }
48✔
1485
    }
3,386✔
1486

1487
    // Returns the server object that this connection was created for.
    ServerImpl& get_server() noexcept
    {
        ServerImpl& server = m_server;
        return server;
    }
×
1491

1492
    int_fast64_t get_id() const noexcept
1493
    {
2,010✔
1494
        return m_id;
2,010✔
1495
    }
2,010✔
1496

1497
    // Returns the underlying socket. The socket pointer is dereferenced
    // unconditionally, so this must not be called once the socket has been
    // released.
    network::Socket& get_socket() noexcept
    {
        network::Socket& socket = *m_socket;
        return socket;
    }
5,086✔
1501

1502
    template <class H>
1503
    void async_write(const char* data, size_t size, H handler)
1504
    {
1,990✔
1505
        if (m_ssl_stream) {
1,990✔
1506
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
14✔
1507
        }
14✔
1508
        else {
1,976✔
1509
            m_socket->async_write(data, size, std::move(handler)); // Throws
1,976✔
1510
        }
1,976✔
1511
    }
1,990✔
1512

1513
    template <class H>
1514
    void async_read(char* buffer, size_t size, H handler)
1515
    {
8✔
1516
        if (m_ssl_stream) {
8!
1517
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer,
×
1518
                                     std::move(handler)); // Throws
×
1519
        }
×
1520
        else {
8✔
1521
            m_socket->async_read(buffer, size, *m_read_ahead_buffer,
8✔
1522
                                 std::move(handler)); // Throws
8✔
1523
        }
8✔
1524
    }
8✔
1525

1526
    template <class H>
1527
    void async_read_until(char* buffer, size_t size, char delim, H handler)
1528
    {
17,800✔
1529
        if (m_ssl_stream) {
17,800!
1530
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
126✔
1531
                                           std::move(handler)); // Throws
126✔
1532
        }
126✔
1533
        else {
17,674✔
1534
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
17,674✔
1535
                                       std::move(handler)); // Throws
17,674✔
1536
        }
17,674✔
1537
    }
17,800✔
1538

1539
    void initiate(std::string remote_endpoint)
1540
    {
2,010✔
1541
        m_last_activity_at = steady_clock_now();
2,010✔
1542
        m_remote_endpoint = std::move(remote_endpoint);
2,010✔
1543

1,028✔
1544
        logger.detail("Connection from %1", m_remote_endpoint); // Throws
2,010✔
1545

1,028✔
1546
        if (m_ssl_stream) {
2,010✔
1547
            initiate_ssl_handshake(); // Throws
24✔
1548
        }
24✔
1549
        else {
1,986✔
1550
            initiate_http(); // Throws
1,986✔
1551
        }
1,986✔
1552
    }
2,010✔
1553

1554
    void respond_200_ok()
1555
    {
×
1556
        handle_text_response(HTTPStatus::Ok, "OK"); // Throws
×
1557
    }
×
1558

1559
    void respond_404_not_found()
1560
    {
×
1561
        handle_text_response(HTTPStatus::NotFound, "Not found"); // Throws
×
1562
    }
×
1563

1564
    void respond_503_service_unavailable()
1565
    {
×
1566
        handle_text_response(HTTPStatus::ServiceUnavailable, "Service unavailable"); // Throws
×
1567
    }
×
1568

1569
    // Commits suicide
1570
    template <class... Params>
1571
    void terminate(Logger::Level log_level, const char* log_message, Params... log_params)
1572
    {
38✔
1573
        logger.log(log_level, log_message, log_params...); // Throws
38✔
1574
        m_ssl_stream.reset();
38✔
1575
        m_socket.reset();
38✔
1576
        m_server.remove_http_connection(m_id); // Suicide
38✔
1577
    }
38✔
1578

1579
    // Commits suicide
1580
    void terminate_if_dead(SteadyTimePoint now)
1581
    {
2✔
1582
        milliseconds_type time = steady_duration(m_last_activity_at, now);
2✔
1583
        const Server::Config& config = m_server.get_config();
2✔
1584
        if (m_is_sending) {
2✔
1585
            if (time >= config.http_response_timeout) {
×
1586
                // Suicide
1587
                terminate(Logger::Level::detail,
×
1588
                          "HTTP connection closed (request timeout)"); // Throws
×
1589
            }
×
1590
        }
×
1591
        else {
2✔
1592
            if (time >= config.http_request_timeout) {
2✔
1593
                // Suicide
1594
                terminate(Logger::Level::detail,
×
1595
                          "HTTP connection closed (response timeout)"); // Throws
×
1596
            }
×
1597
        }
2✔
1598
    }
2✔
1599

1600
    std::string get_appservices_request_id() const
1601
    {
1,990✔
1602
        return m_appservices_request_id.to_string();
1,990✔
1603
    }
1,990✔
1604

1605
private:
1606
    ServerImpl& m_server;
1607
    const int_fast64_t m_id;
1608
    const ObjectId m_appservices_request_id = ObjectId::gen();
1609
    std::unique_ptr<network::Socket> m_socket;
1610
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1611
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1612
    HTTPServer<HTTPConnection> m_http_server;
1613
    OutputBuffer m_output_buffer;
1614
    bool m_is_sending = false;
1615
    SteadyTimePoint m_last_activity_at;
1616
    std::string m_remote_endpoint;
1617
    int m_negotiated_protocol_version = 0;
1618

1619
    void initiate_ssl_handshake()
1620
    {
24✔
1621
        auto handler = [this](std::error_code ec) {
22✔
1622
            if (ec != util::error::operation_aborted)
22✔
1623
                handle_ssl_handshake(ec); // Throws
22✔
1624
        };
22✔
1625
        m_ssl_stream->async_handshake(std::move(handler)); // Throws
24✔
1626
    }
24✔
1627

1628
    void handle_ssl_handshake(std::error_code ec)
1629
    {
22✔
1630
        if (ec) {
22✔
1631
            logger.error("SSL handshake error (%1): %2", ec, ec.message()); // Throws
8✔
1632
            close_due_to_error(ec);                                         // Throws
8✔
1633
            return;
8✔
1634
        }
8✔
1635
        initiate_http(); // Throws
14✔
1636
    }
14✔
1637

1638
    void initiate_http()
1639
    {
2,000✔
1640
        logger.debug("Connection initiates HTTP receipt");
2,000✔
1641

1,022✔
1642
        auto handler = [this](HTTPRequest request, std::error_code ec) {
2,000✔
1643
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
2,000✔
1644
                return;
1,022✔
1645
            if (REALM_UNLIKELY(ec == HTTPParserError::MalformedRequest)) {
2,000✔
1646
                logger.error("Malformed HTTP request");
×
1647
                close_due_to_error(ec); // Throws
×
1648
                return;
×
1649
            }
×
1650
            if (REALM_UNLIKELY(ec == HTTPParserError::BadRequest)) {
2,000✔
1651
                logger.error("Bad HTTP request");
8✔
1652
                const char* body = "The HTTP request was corrupted";
8✔
1653
                handle_400_bad_request(body); // Throws
8✔
1654
                return;
8✔
1655
            }
8✔
1656
            if (REALM_UNLIKELY(ec)) {
1,992✔
1657
                read_error(ec); // Throws
10✔
1658
                return;
10✔
1659
            }
10✔
1660
            handle_http_request(std::move(request)); // Throws
1,982✔
1661
        };
1,982✔
1662
        m_http_server.async_receive_request(std::move(handler)); // Throws
2,000✔
1663
    }
2,000✔
1664

1665
    void handle_http_request(const HTTPRequest& request)
1666
    {
1,982✔
1667
        StringData path = request.path;
1,982✔
1668

1,014✔
1669
        logger.debug("HTTP request received, request = %1", request);
1,982✔
1670

1,014✔
1671
        m_is_sending = true;
1,982✔
1672
        m_last_activity_at = steady_clock_now();
1,982✔
1673

1,014✔
1674
        // FIXME: When thinking of this function as a switching device, it seem
1,014✔
1675
        // wrong that it requires a `%2F` after `/realm-sync/`. If `%2F` is
1,014✔
1676
        // supposed to be mandatory, then that check ought to be delegated to
1,014✔
1677
        // handle_request_for_sync(), as that will yield a sharper separation of
1,014✔
1678
        // concerns.
1,014✔
1679
        if (path == "/realm-sync" || path.begins_with("/realm-sync?") || path.begins_with("/realm-sync/%2F")) {
1,982✔
1680
            handle_request_for_sync(request); // Throws
1,970✔
1681
        }
1,970✔
1682
        else {
12✔
1683
            handle_404_not_found(request); // Throws
12✔
1684
        }
12✔
1685
    }
1,982✔
1686

1687
    void handle_request_for_sync(const HTTPRequest& request)
1688
    {
1,970✔
1689
        if (m_server.is_sync_stopped()) {
1,970✔
1690
            logger.debug("Attempt to create a sync connection to a server that has been "
×
1691
                         "stopped"); // Throws
×
1692
            handle_503_service_unavailable(request, "The server does not accept sync "
×
1693
                                                    "connections"); // Throws
×
1694
            return;
×
1695
        }
×
1696

1,008✔
1697
        util::Optional<std::string> sec_websocket_protocol = websocket::read_sec_websocket_protocol(request);
1,970✔
1698

1,008✔
1699
        // Figure out whether there are any protocol versions supported by both
1,008✔
1700
        // the client and the server, and if so, choose the newest one of them.
1,008✔
1701
        MiscBuffers& misc_buffers = m_server.get_misc_buffers();
1,970✔
1702
        using ProtocolVersionRanges = MiscBuffers::ProtocolVersionRanges;
1,970✔
1703
        ProtocolVersionRanges& protocol_version_ranges = misc_buffers.protocol_version_ranges;
1,970✔
1704
        {
1,970✔
1705
            protocol_version_ranges.clear();
1,970✔
1706
            util::MemoryInputStream in;
1,970✔
1707
            in.imbue(std::locale::classic());
1,970✔
1708
            in.unsetf(std::ios_base::skipws);
1,970✔
1709
            std::string_view value;
1,970✔
1710
            if (sec_websocket_protocol)
1,970✔
1711
                value = *sec_websocket_protocol;
1,970✔
1712
            HttpListHeaderValueParser parser{value};
1,970✔
1713
            std::string_view elem;
1,970✔
1714
            while (parser.next(elem)) {
21,668✔
1715
                // FIXME: Use std::string_view::begins_with() in C++20.
10,076✔
1716
                const StringData protocol{elem};
19,696✔
1717
                std::string_view prefix;
19,696✔
1718
                if (protocol.begins_with(get_pbs_websocket_protocol_prefix()))
19,696✔
1719
                    prefix = get_pbs_websocket_protocol_prefix();
19,698✔
1720
                else if (protocol.begins_with(get_old_pbs_websocket_protocol_prefix()))
2,147,483,647!
1721
                    prefix = get_old_pbs_websocket_protocol_prefix();
×
1722
                if (!prefix.empty()) {
19,698✔
1723
                    auto parse_version = [&](std::string_view str) {
19,698✔
1724
                        in.set_buffer(str.data(), str.data() + str.size());
19,698✔
1725
                        int version = 0;
19,698✔
1726
                        in >> version;
19,698✔
1727
                        if (REALM_LIKELY(in && in.eof() && version >= 0))
19,698✔
1728
                            return version;
19,700✔
1729
                        return -1;
2,147,483,647✔
1730
                    };
2,147,483,647✔
1731
                    int min, max;
19,698✔
1732
                    std::string_view range = elem.substr(prefix.size());
19,698✔
1733
                    auto i = range.find('-');
19,698✔
1734
                    if (i != std::string_view::npos) {
19,698✔
1735
                        min = parse_version(range.substr(0, i));
×
1736
                        max = parse_version(range.substr(i + 1));
×
1737
                    }
×
1738
                    else {
19,698✔
1739
                        min = parse_version(range);
19,698✔
1740
                        max = min;
19,698✔
1741
                    }
19,698✔
1742
                    if (REALM_LIKELY(min >= 0 && max >= 0 && min <= max)) {
19,700✔
1743
                        protocol_version_ranges.emplace_back(min, max); // Throws
19,700✔
1744
                        continue;
19,700✔
1745
                    }
19,700✔
1746
                    logger.error("Protocol version negotiation failed: Client sent malformed "
2,147,483,647✔
1747
                                 "specification of supported protocol versions: '%1'",
2,147,483,647✔
1748
                                 elem); // Throws
2,147,483,647✔
1749
                    handle_400_bad_request("Protocol version negotiation failed: Malformed "
2,147,483,647✔
1750
                                           "specification of supported protocol "
2,147,483,647✔
1751
                                           "versions\n"); // Throws
2,147,483,647✔
1752
                    return;
2,147,483,647✔
1753
                }
2,147,483,647✔
1754
                logger.warn("Unrecognized protocol token in HTTP response header "
2,147,483,647✔
1755
                            "Sec-WebSocket-Protocol: '%1'",
2,147,483,647✔
1756
                            elem); // Throws
2,147,483,647✔
1757
            }
2,147,483,647✔
1758
            if (protocol_version_ranges.empty()) {
1,972✔
1759
                logger.error("Protocol version negotiation failed: Client did not send a "
×
1760
                             "specification of supported protocol versions"); // Throws
×
1761
                handle_400_bad_request("Protocol version negotiation failed: Missing specification "
×
1762
                                       "of supported protocol versions\n"); // Throws
×
1763
                return;
×
1764
            }
×
1765
        }
1,972✔
1766
        {
1,972✔
1767
            ProtocolVersionRange server_range = m_server.get_protocol_version_range();
1,972✔
1768
            int server_min = server_range.first;
1,972✔
1769
            int server_max = server_range.second;
1,972✔
1770
            int best_match = 0;
1,972✔
1771
            int overall_client_min = std::numeric_limits<int>::max();
1,972✔
1772
            int overall_client_max = std::numeric_limits<int>::min();
1,972✔
1773
            for (const auto& range : protocol_version_ranges) {
19,700✔
1774
                int client_min = range.first;
19,700✔
1775
                int client_max = range.second;
19,700✔
1776
                if (client_max >= server_min && client_min <= server_max) {
19,700✔
1777
                    // Overlap
10,080✔
1778
                    int version = std::min(client_max, server_max);
19,700✔
1779
                    if (version > best_match) {
19,700✔
1780
                        best_match = version;
1,970✔
1781
                    }
1,970✔
1782
                }
19,700✔
1783
                if (client_min < overall_client_min)
19,700✔
1784
                    overall_client_min = client_min;
19,700✔
1785
                if (client_max > overall_client_max)
19,700✔
1786
                    overall_client_max = client_max;
1,970✔
1787
            }
19,700✔
1788
            Formatter& formatter = misc_buffers.formatter;
1,972✔
1789
            if (REALM_UNLIKELY(best_match == 0)) {
1,972✔
1790
                const char* elaboration = "No version supported by both client and server";
×
1791
                const char* identifier_hint = nullptr;
×
1792
                if (overall_client_max < server_min) {
×
1793
                    // Client is too old
1794
                    elaboration = "Client is too old for server";
×
1795
                    identifier_hint = "CLIENT_TOO_OLD";
×
1796
                }
×
1797
                else if (overall_client_min > server_max) {
×
1798
                    // Client is too new
1799
                    elaboration = "Client is too new for server";
×
1800
                    identifier_hint = "CLIENT_TOO_NEW";
×
1801
                }
×
1802
                auto format_ranges = [&](const auto& list) {
×
1803
                    bool nonfirst = false;
×
1804
                    for (auto range : list) {
×
1805
                        if (nonfirst)
×
1806
                            formatter << ", "; // Throws
×
1807
                        int min = range.first, max = range.second;
×
1808
                        REALM_ASSERT(min <= max);
×
1809
                        formatter << min;
×
1810
                        if (max != min)
×
1811
                            formatter << "-" << max;
×
1812
                        nonfirst = true;
×
1813
                    }
×
1814
                };
×
1815
                using Range = ProtocolVersionRange;
×
1816
                formatter.reset();
×
1817
                format_ranges(protocol_version_ranges); // Throws
×
1818
                logger.error("Protocol version negotiation failed: %1 "
×
1819
                             "(client supports: %2)",
×
1820
                             elaboration, std::string_view(formatter.data(), formatter.size())); // Throws
×
1821
                formatter.reset();
×
1822
                formatter << "Protocol version negotiation failed: "
×
1823
                             ""
×
1824
                          << elaboration << ".\n\n";                                   // Throws
×
1825
                formatter << "Server supports: ";                                      // Throws
×
1826
                format_ranges(std::initializer_list<Range>{{server_min, server_max}}); // Throws
×
1827
                formatter << "\n";                                                     // Throws
×
1828
                formatter << "Client supports: ";                                      // Throws
×
1829
                format_ranges(protocol_version_ranges);                                // Throws
×
1830
                formatter << "\n\n";                                                   // Throws
×
1831
                formatter << "REALM_SYNC_PROTOCOL_MISMATCH";                           // Throws
×
1832
                if (identifier_hint)
×
1833
                    formatter << ":" << identifier_hint;                      // Throws
×
1834
                formatter << "\n";                                            // Throws
×
1835
                handle_400_bad_request({formatter.data(), formatter.size()}); // Throws
×
1836
                return;
×
1837
            }
×
1838
            m_negotiated_protocol_version = best_match;
1,972✔
1839
            logger.debug("Received: Sync HTTP request (negotiated_protocol_version=%1)",
1,972✔
1840
                         m_negotiated_protocol_version); // Throws
1,972✔
1841
            formatter.reset();
1,972✔
1842
        }
1,972✔
1843

1,010✔
1844
        std::string sec_websocket_protocol_2;
1,972✔
1845
        {
1,972✔
1846
            std::string_view prefix =
1,972✔
1847
                m_negotiated_protocol_version < SyncConnection::PBS_FLX_MIGRATION_PROTOCOL_VERSION
1,972✔
1848
                    ? get_old_pbs_websocket_protocol_prefix()
1,010✔
1849
                    : get_pbs_websocket_protocol_prefix();
1,972✔
1850
            std::ostringstream out;
1,972✔
1851
            out.imbue(std::locale::classic());
1,972✔
1852
            out << prefix << m_negotiated_protocol_version; // Throws
1,972✔
1853
            sec_websocket_protocol_2 = std::move(out).str();
1,972✔
1854
        }
1,972✔
1855

1,010✔
1856
        std::error_code ec;
1,972✔
1857
        util::Optional<HTTPResponse> response =
1,972✔
1858
            websocket::make_http_response(request, sec_websocket_protocol_2, ec); // Throws
1,972✔
1859

1,010✔
1860
        if (ec) {
1,972✔
1861
            if (ec == websocket::HttpError::bad_request_header_upgrade) {
×
1862
                logger.error("There must be a header of the form 'Upgrade: websocket'");
×
1863
            }
×
1864
            else if (ec == websocket::HttpError::bad_request_header_connection) {
×
1865
                logger.error("There must be a header of the form 'Connection: Upgrade'");
×
1866
            }
×
1867
            else if (ec == websocket::HttpError::bad_request_header_websocket_version) {
×
1868
                logger.error("There must be a header of the form 'Sec-WebSocket-Version: 13'");
×
1869
            }
×
1870
            else if (ec == websocket::HttpError::bad_request_header_websocket_key) {
×
1871
                logger.error("The header Sec-WebSocket-Key is missing");
×
1872
            }
×
1873

1874
            logger.error("The HTTP request with the error is:\n%1", request);
×
1875
            logger.error("Check the proxy configuration and make sure that the "
×
1876
                         "HTTP request is a valid Websocket request.");
×
1877
            close_due_to_error(ec);
×
1878
            return;
×
1879
        }
×
1880
        REALM_ASSERT(response);
1,972✔
1881
        add_common_http_response_headers(*response);
1,972✔
1882

1,010✔
1883
        std::string user_agent;
1,972✔
1884
        {
1,972✔
1885
            auto i = request.headers.find(g_user_agent);
1,972✔
1886
            if (i != request.headers.end())
1,972✔
1887
                user_agent = i->second; // Throws (copy)
1,970✔
1888
        }
1,972✔
1889

1,010✔
1890
        auto handler = [protocol_version = m_negotiated_protocol_version, user_agent = std::move(user_agent),
1,972✔
1891
                        this](std::error_code ec) {
1,972✔
1892
            // If the operation is aborted, the socket object may have been destroyed.
1,008✔
1893
            if (ec != util::error::operation_aborted) {
1,970✔
1894
                if (ec) {
1,970✔
1895
                    write_error(ec);
×
1896
                    return;
×
1897
                }
×
1898

1,008✔
1899
                std::unique_ptr<SyncConnection> sync_conn = std::make_unique<SyncConnection>(
1,970✔
1900
                    m_server, m_id, std::move(m_socket), std::move(m_ssl_stream), std::move(m_read_ahead_buffer),
1,970✔
1901
                    protocol_version, std::move(user_agent), std::move(m_remote_endpoint),
1,970✔
1902
                    get_appservices_request_id()); // Throws
1,970✔
1903
                SyncConnection& sync_conn_ref = *sync_conn;
1,970✔
1904
                m_server.add_sync_connection(m_id, std::move(sync_conn));
1,970✔
1905
                m_server.remove_http_connection(m_id);
1,970✔
1906
                sync_conn_ref.initiate();
1,970✔
1907
            }
1,970✔
1908
        };
1,970✔
1909
        m_http_server.async_send_response(*response, std::move(handler));
1,972✔
1910
    }
1,972✔
1911

1912
    // Sends a response with status `http_status`, the common response headers
    // and `Connection: close`. A non-empty `body` is attached together with a
    // matching `Content-Length` header. When the response has been written the
    // connection is terminated; a write failure is passed to write_error().
    void handle_text_response(HTTPStatus http_status, std::string_view body)
    {
        HTTPResponse reply;
        reply.status = http_status;
        add_common_http_response_headers(reply);
        reply.headers["Connection"] = "close";

        if (!body.empty()) {
            std::string payload{body}; // Throws
            reply.headers["Content-Length"] = util::to_string(payload.size());
            reply.body = std::move(payload);
        }

        auto on_sent = [this](std::error_code ec) {
            if (REALM_UNLIKELY(ec)) {
                // An aborted operation is silently ignored
                if (ec != util::error::operation_aborted)
                    write_error(ec);
                return;
            }
            terminate(Logger::Level::detail, "HTTP connection closed"); // Throws
        };
        m_http_server.async_send_response(reply, std::move(on_sent));
    }
20✔
1937

1938
    void handle_400_bad_request(std::string_view body)
1939
    {
8✔
1940
        logger.detail("400 Bad Request");
8✔
1941
        handle_text_response(HTTPStatus::BadRequest, body); // Throws
8✔
1942
    }
8✔
1943

1944
    void handle_404_not_found(const HTTPRequest&)
1945
    {
12✔
1946
        logger.detail("404 Not Found"); // Throws
12✔
1947
        handle_text_response(HTTPStatus::NotFound,
12✔
1948
                             "Realm sync server\n\nPage not found\n"); // Throws
12✔
1949
    }
12✔
1950

1951
    void handle_503_service_unavailable(const HTTPRequest&, std::string_view message)
1952
    {
×
1953
        logger.debug("503 Service Unavailable");                       // Throws
×
1954
        handle_text_response(HTTPStatus::ServiceUnavailable, message); // Throws
×
1955
    }
×
1956

1957
    // Adds the headers shared by all responses: `Server` always, and
    // `X-Appservices-Request-Id` only while the negotiated protocol version is
    // below SyncConnection::SERVER_LOG_PROTOCOL_VERSION.
    void add_common_http_response_headers(HTTPResponse& response)
    {
        response.headers["Server"] = "RealmSync/" REALM_VERSION_STRING; // Throws

        if (m_negotiated_protocol_version >= SyncConnection::SERVER_LOG_PROTOCOL_VERSION)
            return;

        // This isn't a real X-Appservices-Request-Id, but it should be enough to test with
        response.headers["X-Appservices-Request-Id"] = get_appservices_request_id();
    }
1,990✔
1965

1966
    void read_error(std::error_code ec)
1967
    {
10✔
1968
        REALM_ASSERT(ec != util::error::operation_aborted);
10✔
1969
        if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
10!
1970
            // Suicide
4✔
1971
            close_due_to_close_by_client(ec); // Throws
10✔
1972
            return;
10✔
1973
        }
10✔
1974
        if (ec == util::MiscExtErrors::delim_not_found) {
×
1975
            logger.error("Input message head delimited not found"); // Throws
×
1976
            close_due_to_error(ec);                                 // Throws
×
1977
            return;
×
1978
        }
×
1979

1980
        logger.error("Reading failed: %1", ec.message()); // Throws
×
1981

1982
        // Suicide
1983
        close_due_to_error(ec); // Throws
×
1984
    }
×
1985

1986
    void write_error(std::error_code ec)
1987
    {
×
1988
        REALM_ASSERT(ec != util::error::operation_aborted);
×
1989
        if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1990
            // Suicide
1991
            close_due_to_close_by_client(ec); // Throws
×
1992
            return;
×
1993
        }
×
1994
        logger.error("Writing failed: %1", ec.message()); // Throws
×
1995

1996
        // Suicide
1997
        close_due_to_error(ec); // Throws
×
1998
    }
×
1999

2000
    void close_due_to_close_by_client(std::error_code ec)
2001
    {
10✔
2002
        auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
10✔
2003
        // Suicide
4✔
2004
        terminate(log_level, "HTTP connection closed by client: %1", ec.message()); // Throws
10✔
2005
    }
10✔
2006

2007
    void close_due_to_error(std::error_code ec)
2008
    {
8✔
2009
        // Suicide
6✔
2010
        terminate(Logger::Level::error, "HTTP connection closed due to error: %1",
8✔
2011
                  ec.message()); // Throws
8✔
2012
    }
8✔
2013

2014
    // Builds the log prefix for connection `id`, of the form
    // "HTTP Connection[<id>]: ". The number is formatted using the classic
    // ("C") locale.
    static std::string make_logger_prefix(int_fast64_t id)
    {
        std::ostringstream prefix;
        prefix.imbue(std::locale::classic());
        prefix << "HTTP Connection[";
        prefix << id;
        prefix << "]: ";     // Throws
        return prefix.str(); // Throws
    }
3,384✔
2021
};
2022

2023

2024
class DownloadHistoryEntryHandler : public ServerHistory::HistoryEntryHandler {
2025
public:
2026
    std::size_t num_changesets = 0;
2027
    std::size_t accum_original_size = 0;
2028
    std::size_t accum_compacted_size = 0;
2029

2030
    DownloadHistoryEntryHandler(ServerProtocol& protocol, OutputBuffer& buffer, util::Logger& logger) noexcept
2031
        : m_protocol{protocol}
2032
        , m_buffer{buffer}
2033
        , m_logger{logger}
2034
    {
40,588✔
2035
    }
40,588✔
2036

2037
    void handle(version_type server_version, const HistoryEntry& entry, size_t original_size) override
2038
    {
40,724✔
2039
        version_type client_version = entry.remote_version;
40,724✔
2040
        ServerProtocol::ChangesetInfo info{server_version, client_version, entry, original_size};
40,724✔
2041
        m_protocol.insert_single_changeset_download_message(m_buffer, info, m_logger); // Throws
40,724✔
2042
        ++num_changesets;
40,724✔
2043
        accum_original_size += original_size;
40,724✔
2044
        accum_compacted_size += entry.changeset.size();
40,724✔
2045
    }
40,724✔
2046

2047
private:
2048
    ServerProtocol& m_protocol;
2049
    OutputBuffer& m_buffer;
2050
    util::Logger& m_logger;
2051
};
2052

2053

2054
// ============================ Session ============================
2055

2056
//                        Need cli-   Send     IDENT     UNBIND              ERROR
2057
//   Protocol             ent file    IDENT    message   message   Error     message
2058
//   state                identifier  message  received  received  occurred  sent
2059
// ---------------------------------------------------------------------------------
2060
//   AllocatingIdent      yes         yes      no        no        no        no
2061
//   SendIdent            no          yes      no        no        no        no
2062
//   WaitForIdent         no          no       no        no        no        no
2063
//   WaitForUnbind        maybe       no       yes       no        no        no
2064
//   SendError            maybe       maybe    maybe     no        yes       no
2065
//   WaitForUnbindErr     maybe       maybe    maybe     no        yes       yes
2066
//   SendUnbound          maybe       maybe    maybe     yes       maybe     no
2067
//
2068
//
2069
//   Condition                      Expression
2070
// ----------------------------------------------------------
2071
//   Need client file identifier    need_client_file_ident()
2072
//   Send IDENT message             must_send_ident_message()
2073
//   IDENT message received         ident_message_received()
2074
//   UNBIND message received        unbind_message_received()
2075
//   Error occurred                 error_occurred()
2076
//   ERROR message sent             m_error_message_sent
2077
//
2078
//
2079
//   Protocol
2080
//   state                Will send              Can receive
2081
// -----------------------------------------------------------------------
2082
//   AllocatingIdent      none                   UNBIND
2083
//   SendIdent            IDENT                  UNBIND
2084
//   WaitForIdent         none                   IDENT, UNBIND
2085
//   WaitForUnbind        DOWNLOAD, TRANSACT,    UPLOAD, TRANSACT, MARK,
2086
//                        MARK, ALLOC            ALLOC, UNBIND
2087
//   SendError            ERROR                  any
2088
//   WaitForUnbindErr     none                   any
2089
//   SendUnbound          UNBOUND                none
2090
//
2091
class Session final : private FileIdentReceiver {
2092
public:
2093
    util::PrefixLogger logger;
2094

2095
    Session(SyncConnection& conn, session_ident_type session_ident)
2096
        : logger{util::LogCategory::server, make_logger_prefix(session_ident), conn.logger_ptr} // Throws
2097
        , m_connection{conn}
2098
        , m_session_ident{session_ident}
2099
    {
6,976✔
2100
    }
6,976✔
2101

2102
    // Destroys the session. The session must no longer be enlisted to send
    // at this point. Any remaining attachment to the server file is released
    // through detach_from_server_file().
    ~Session() noexcept
    {
        // A session that is still enlisted to send must not be destroyed.
        REALM_ASSERT(!is_enlisted_to_send());

        detach_from_server_file();
    }
6,976✔
2107

2108
    // Returns the connection that this session was constructed with.
    SyncConnection& get_connection() noexcept
    {
        SyncConnection& conn = m_connection;
        return conn;
    }
40,610✔
2112

2113
    // Returns the `encryption_key` entry of the configuration of the server
    // that the connection of this session belongs to.
    const Optional<std::array<char, 64>>& get_encryption_key()
    {
        const Server::Config& config = m_connection.get_server().get_config();
        return config.encryption_key;
    }
×
2117

2118
    // Returns the session identifier that was passed to the constructor.
    session_ident_type get_session_ident() const noexcept
    {
        const session_ident_type ident = m_session_ident;
        return ident;
    }
160✔
2122

2123
    // Returns the protocol object of the connection that this session
    // belongs to (forwards to SyncConnection::get_server_protocol()).
    ServerProtocol& get_server_protocol() noexcept
    {
        ServerProtocol& protocol = m_connection.get_server_protocol();
        return protocol;
    }
60,162✔
2127

2128
    bool need_client_file_ident() const noexcept
2129
    {
8,656✔
2130
        return (m_file_ident_request != 0);
8,656✔
2131
    }
8,656✔
2132

2133
    bool must_send_ident_message() const noexcept
2134
    {
6,140✔
2135
        return m_send_ident_message;
6,140✔
2136
    }
6,140✔
2137

2138
    bool ident_message_received() const noexcept
2139
    {
357,668✔
2140
        return m_client_file_ident != 0;
357,668✔
2141
    }
357,668✔
2142

2143
    bool unbind_message_received() const noexcept
2144
    {
363,636✔
2145
        return m_unbind_message_received;
363,636✔
2146
    }
363,636✔
2147

2148
    bool error_occurred() const noexcept
2149
    {
351,040✔
2150
        return int(m_error_code) != 0;
351,040✔
2151
    }
351,040✔
2152

2153
    bool relayed_alloc_request_in_progress() const noexcept
2154
    {
×
2155
        return (need_client_file_ident() || m_allocated_file_ident.ident != 0);
×
2156
    }
×
2157

2158
    // Returns the file identifier (always a nonzero value) of the client side
2159
    // file if ident_message_received() returns true. Otherwise it returns zero.
2160
    file_ident_type get_client_file_ident() const noexcept
    {
        // Zero until an identifier has been stored in `m_client_file_ident`.
        const file_ident_type ident = m_client_file_ident;
        return ident;
    }
×
2164

2165
    void initiate()
2166
    {
6,976✔
2167
        logger.detail("Session initiated", m_session_ident); // Throws
6,976✔
2168
    }
6,976✔
2169

2170
    void terminate()
2171
    {
6,030✔
2172
        logger.detail("Session terminated", m_session_ident); // Throws
6,030✔
2173
    }
6,030✔
2174

2175
    // Initiate the deactivation process, if it has not been initiated already
2176
    // by the client.
2177
    //
2178
    // IMPORTANT: This function must not be called with protocol versions
2179
    // earlier than 23.
2180
    //
2181
    // The deactivation process will eventually lead to termination of the
2182
    // session.
2183
    //
2184
    // The session will detach itself from the server file when the deactivation
2185
    // process is initiated, regardless of whether it is initiated by the
2186
    // client, or by calling this function.
2187
    void initiate_deactivation(ProtocolError error_code)
    {
        REALM_ASSERT(is_session_level_error(error_code));
        REALM_ASSERT(!error_occurred()); // Must only be called once

        // An UNBIND message means that the client has already initiated the
        // deactivation process. In that case the protocol state was
        // SendUnbound, and it remains unchanged.
        if (REALM_UNLIKELY(unbind_message_received()))
            return;

        detach_from_server_file();
        m_error_code = error_code;

        // Protocol state is now SendError
        ensure_enlisted_to_send();
    }
80✔
2203

2204
    bool is_enlisted_to_send() const noexcept
2205
    {
279,726✔
2206
        return m_next != nullptr;
279,726✔
2207
    }
279,726✔
2208

2209
    void ensure_enlisted_to_send() noexcept
2210
    {
54,158✔
2211
        if (!is_enlisted_to_send())
54,158✔
2212
            enlist_to_send();
51,426✔
2213
    }
54,158✔
2214

2215
    void enlist_to_send() noexcept
2216
    {
112,170✔
2217
        m_connection.enlist_to_send(this);
112,170✔
2218
    }
112,170✔
2219

2220
    // Overriding member function in FileIdentReceiver
2221
    void receive_file_ident(SaltedFileIdent file_ident) override final
    {
        // Protocol state must be AllocatingIdent or WaitForUnbind
        if (ident_message_received()) {
            // WaitForUnbind: no IDENT message may be pending
            REALM_ASSERT(!m_send_ident_message);
        }
        else {
            // AllocatingIdent: a request must be outstanding, and an IDENT
            // message must be scheduled
            REALM_ASSERT(need_client_file_ident());
            REALM_ASSERT(m_send_ident_message);
        }
        REALM_ASSERT(!unbind_message_received());
        REALM_ASSERT(!error_occurred());
        REALM_ASSERT(!m_error_message_sent);

        // The request is now fulfilled. Remember the identifier so that it
        // can be sent to the client.
        m_file_ident_request = 0;
        m_allocated_file_ident = file_ident;

        // A session that was in state AllocatingIdent is now in state
        // SendIdent. Otherwise the state is still WaitForUnbind.

        logger.debug("Acquired outbound salted file identifier (%1, %2)", file_ident.ident,
                     file_ident.salt); // Throws

        ensure_enlisted_to_send();
    }
1,258✔
2246

2247
    // Called by the associated connection object when this session is granted
2248
    // an opportunity to initiate the sending of a message.
2249
    //
2250
    // This function may lead to the destruction of the session object
2251
    // (suicide).
2252
    void send_message()
2253
    {
111,974✔
2254
        if (REALM_LIKELY(!unbind_message_received())) {
111,974✔
2255
            if (REALM_LIKELY(!error_occurred())) {
107,782✔
2256
                if (REALM_LIKELY(ident_message_received())) {
107,704✔
2257
                    // State is WaitForUnbind.
52,540✔
2258
                    bool relayed_alloc = (m_allocated_file_ident.ident != 0);
106,446✔
2259
                    if (REALM_LIKELY(!relayed_alloc)) {
106,446✔
2260
                        // Send DOWNLOAD or MARK.
52,540✔
2261
                        continue_history_scan(); // Throws
106,446✔
2262
                        // Session object may have been
52,540✔
2263
                        // destroyed at this point (suicide)
52,540✔
2264
                        return;
106,446✔
2265
                    }
106,446✔
UNCOV
2266
                    send_alloc_message(); // Throws
×
UNCOV
2267
                    return;
×
UNCOV
2268
                }
×
2269
                // State is SendIdent
584✔
2270
                send_ident_message(); // Throws
1,258✔
2271
                return;
1,258✔
2272
            }
1,258✔
2273
            // State is SendError
38✔
2274
            send_error_message(); // Throws
78✔
2275
            return;
78✔
2276
        }
78✔
2277
        // State is SendUnbound
1,330✔
2278
        send_unbound_message(); // Throws
4,192✔
2279
        terminate();            // Throws
4,192✔
2280
        m_connection.discard_session(m_session_ident);
4,192✔
2281
        // This session is now destroyed!
1,330✔
2282
    }
4,192✔
2283

2284
    bool receive_bind_message(std::string path, std::string signed_user_token, bool need_client_file_ident,
2285
                              bool is_subserver, ProtocolError& error)
2286
    {
6,976✔
2287
        if (logger.would_log(util::Logger::Level::info)) {
6,976✔
2288
            logger.detail("Received: BIND(server_path=%1, signed_user_token='%2', "
×
2289
                          "need_client_file_ident=%3, is_subserver=%4)",
×
2290
                          path, short_token_fmt(signed_user_token), int(need_client_file_ident),
×
2291
                          int(is_subserver)); // Throws
×
2292
        }
×
2293

2,828✔
2294
        ServerImpl& server = m_connection.get_server();
6,976✔
2295
        _impl::VirtualPathComponents virt_path_components =
6,976✔
2296
            _impl::parse_virtual_path(server.get_root_dir(), path); // Throws
6,976✔
2297

2,828✔
2298
        if (!virt_path_components.is_valid) {
6,976✔
2299
            logger.error("Bad virtual path (message_type='bind', path='%1', "
28✔
2300
                         "signed_user_token='%2')",
28✔
2301
                         path,
28✔
2302
                         short_token_fmt(signed_user_token)); // Throws
28✔
2303
            error = ProtocolError::illegal_realm_path;
28✔
2304
            return false;
28✔
2305
        }
28✔
2306

2,814✔
2307
        // The user has proper permissions at this stage.
2,814✔
2308

2,814✔
2309
        m_server_file = server.get_or_create_file(path); // Throws
6,948✔
2310

2,814✔
2311
        m_server_file->add_unidentified_session(this); // Throws
6,948✔
2312

2,814✔
2313
        logger.info("Client info: (path='%1', from=%2, protocol=%3) %4", path, m_connection.get_remote_endpoint(),
6,948✔
2314
                    m_connection.get_client_protocol_version(),
6,948✔
2315
                    m_connection.get_client_user_agent()); // Throws
6,948✔
2316

2,814✔
2317
        m_is_subserver = is_subserver;
6,948✔
2318
        if (REALM_LIKELY(!need_client_file_ident)) {
6,948✔
2319
            // Protocol state is now WaitForUnbind
1,684✔
2320
            return true;
5,038✔
2321
        }
5,038✔
2322

1,130✔
2323
        // FIXME: We must make a choice about client file ident for read only
1,130✔
2324
        // sessions. They should have a special read-only client file ident.
1,130✔
2325
        file_ident_type proxy_file = 0; // No proxy
1,910✔
2326
        ClientType client_type = (is_subserver ? ClientType::subserver : ClientType::regular);
1,910✔
2327
        m_file_ident_request = m_server_file->request_file_ident(*this, proxy_file, client_type); // Throws
1,910✔
2328
        m_send_ident_message = true;
1,910✔
2329
        // Protocol state is now AllocatingIdent
1,130✔
2330

1,130✔
2331
        return true;
1,910✔
2332
    }
1,910✔
2333

2334
    bool receive_ident_message(file_ident_type client_file_ident, salt_type client_file_ident_salt,
2335
                               version_type scan_server_version, version_type scan_client_version,
2336
                               version_type latest_server_version, salt_type latest_server_version_salt,
2337
                               ProtocolError& error)
2338
    {
6,140✔
2339
        // Protocol state must be WaitForIdent
2,148✔
2340
        REALM_ASSERT(!need_client_file_ident());
6,140✔
2341
        REALM_ASSERT(!m_send_ident_message);
6,140✔
2342
        REALM_ASSERT(!ident_message_received());
6,140✔
2343
        REALM_ASSERT(!unbind_message_received());
6,140✔
2344
        REALM_ASSERT(!error_occurred());
6,140✔
2345
        REALM_ASSERT(!m_error_message_sent);
6,140✔
2346

2,148✔
2347
        logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
6,140✔
2348
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
6,140✔
2349
                     "latest_server_version_salt=%6)",
6,140✔
2350
                     client_file_ident, client_file_ident_salt, scan_server_version, scan_client_version,
6,140✔
2351
                     latest_server_version, latest_server_version_salt); // Throws
6,140✔
2352

2,148✔
2353
        SaltedFileIdent client_file_ident_2 = {client_file_ident, client_file_ident_salt};
6,140✔
2354
        DownloadCursor download_progress = {scan_server_version, scan_client_version};
6,140✔
2355
        SaltedVersion server_version_2 = {latest_server_version, latest_server_version_salt};
6,140✔
2356
        ClientType client_type = (m_is_subserver ? ClientType::subserver : ClientType::regular);
6,140✔
2357
        UploadCursor upload_threshold = {0, 0};
6,140✔
2358
        version_type locked_server_version = 0;
6,140✔
2359
        BootstrapError error_2 =
6,140✔
2360
            m_server_file->bootstrap_client_session(client_file_ident_2, download_progress, server_version_2,
6,140✔
2361
                                                    client_type, upload_threshold, locked_server_version,
6,140✔
2362
                                                    logger); // Throws
6,140✔
2363
        switch (error_2) {
6,140✔
2364
            case BootstrapError::no_error:
6,108✔
2365
                break;
6,108✔
2366
            case BootstrapError::client_file_expired:
✔
2367
                logger.warn("Client (%1) expired", client_file_ident); // Throws
×
2368
                error = ProtocolError::client_file_expired;
×
2369
                return false;
×
2370
            case BootstrapError::bad_client_file_ident:
✔
2371
                logger.error("Bad client file ident (%1) in IDENT message",
×
2372
                             client_file_ident); // Throws
×
2373
                error = ProtocolError::bad_client_file_ident;
×
2374
                return false;
×
2375
            case BootstrapError::bad_client_file_ident_salt:
4✔
2376
                logger.error("Bad client file identifier salt (%1) in IDENT message",
4✔
2377
                             client_file_ident_salt); // Throws
4✔
2378
                error = ProtocolError::diverging_histories;
4✔
2379
                return false;
4✔
2380
            case BootstrapError::bad_download_server_version:
✔
2381
                logger.error("Bad download progress server version in IDENT message"); // Throws
×
2382
                error = ProtocolError::bad_server_version;
×
2383
                return false;
×
2384
            case BootstrapError::bad_download_client_version:
4✔
2385
                logger.error("Bad download progress client version in IDENT message"); // Throws
4✔
2386
                error = ProtocolError::bad_client_version;
4✔
2387
                return false;
4✔
2388
            case BootstrapError::bad_server_version:
20✔
2389
                logger.error("Bad server version (message_type='ident')"); // Throws
20✔
2390
                error = ProtocolError::bad_server_version;
20✔
2391
                return false;
20✔
2392
            case BootstrapError::bad_server_version_salt:
4✔
2393
                logger.error("Bad server version salt in IDENT message"); // Throws
4✔
2394
                error = ProtocolError::diverging_histories;
4✔
2395
                return false;
4✔
2396
            case BootstrapError::bad_client_type:
✔
2397
                logger.error("Bad client type (%1) in IDENT message", int(client_type)); // Throws
×
2398
                error = ProtocolError::bad_client_file_ident; // FIXME: Introduce new protocol-level error
×
2399
                                                              // `bad_client_type`.
2400
                return false;
×
2401
        }
6,108✔
2402

2,132✔
2403
        // Make sure there is no other session currently associcated with the
2,132✔
2404
        // same client-side file
2,132✔
2405
        if (Session* other_sess = m_server_file->get_identified_session(client_file_ident)) {
6,108✔
2406
            SyncConnection& other_conn = other_sess->get_connection();
×
2407
            // It is a protocol violation if the other session is associated
2408
            // with the same connection
2409
            if (&other_conn == &m_connection) {
×
2410
                logger.error("Client file already bound in other session associated with "
×
2411
                             "the same connection"); // Throws
×
2412
                error = ProtocolError::bound_in_other_session;
×
2413
                return false;
×
2414
            }
×
2415
            // When the other session is associated with a different connection
2416
            // (`other_conn`), the clash may be due to the server not yet having
2417
            // realized that the other connection has been closed by the
2418
            // client. If so, the other connention is a "zombie". In the
2419
            // interest of getting rid of zombie connections as fast as
2420
            // possible, we shall assume that a clash with a session in another
2421
            // connection is always due to that other connection being a
2422
            // zombie. And when such a situation is detected, we want to close
2423
            // the zombie connection immediately.
2424
            auto log_level = Logger::Level::detail;
×
2425
            other_conn.terminate(log_level,
×
2426
                                 "Sync connection closed (superseded session)"); // Throws
×
2427
        }
×
2428

2,132✔
2429
        logger.info("Bound to client file (client_file_ident=%1)", client_file_ident); // Throws
6,108✔
2430

2,132✔
2431
        send_log_message(util::Logger::Level::debug, util::format("Session %1 bound to client file ident %2",
6,108✔
2432
                                                                  m_session_ident, client_file_ident));
6,108✔
2433

2,132✔
2434
        m_server_file->identify_session(this, client_file_ident); // Throws
6,108✔
2435

2,132✔
2436
        m_client_file_ident = client_file_ident;
6,108✔
2437
        m_download_progress = download_progress;
6,108✔
2438
        m_upload_threshold = upload_threshold;
6,108✔
2439
        m_locked_server_version = locked_server_version;
6,108✔
2440

2,132✔
2441
        ServerImpl& server = m_connection.get_server();
6,108✔
2442
        const Server::Config& config = server.get_config();
6,108✔
2443
        m_disable_download = (config.disable_download_for.count(client_file_ident) != 0);
6,108✔
2444

2,132✔
2445
        if (REALM_UNLIKELY(config.session_bootstrap_callback)) {
6,108✔
2446
            config.session_bootstrap_callback(m_server_file->get_virt_path(),
×
2447
                                              client_file_ident); // Throws
×
2448
        }
×
2449

2,132✔
2450
        // Protocol  state is now WaitForUnbind
2,132✔
2451
        enlist_to_send();
6,108✔
2452
        return true;
6,108✔
2453
    }
6,108✔
2454

2455
    bool receive_upload_message(version_type progress_client_version, version_type progress_server_version,
2456
                                version_type locked_server_version, const UploadChangesets& upload_changesets,
2457
                                ProtocolError& error)
2458
    {
46,854✔
2459
        // Protocol state must be WaitForUnbind
22,984✔
2460
        REALM_ASSERT(!m_send_ident_message);
46,854✔
2461
        REALM_ASSERT(ident_message_received());
46,854✔
2462
        REALM_ASSERT(!unbind_message_received());
46,854✔
2463
        REALM_ASSERT(!error_occurred());
46,854✔
2464
        REALM_ASSERT(!m_error_message_sent);
46,854✔
2465

22,984✔
2466
        logger.detail("Received: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
46,854✔
2467
                      "locked_server_version=%3, num_changesets=%4)",
46,854✔
2468
                      progress_client_version, progress_server_version, locked_server_version,
46,854✔
2469
                      upload_changesets.size()); // Throws
46,854✔
2470

22,984✔
2471
        // We are unable to reproduce the cursor object for the upload progress
22,984✔
2472
        // when the protocol version is less than 29, because the client does
22,984✔
2473
        // not provide the required information. When the protocol version is
22,984✔
2474
        // less than 25, we can always get a consistent cursor by taking it from
22,984✔
2475
        // the changeset that was uploaded last, but in protocol versions 25,
22,984✔
2476
        // 26, 27, and 28, things are more complicated. Here, we receive new
22,984✔
2477
        // values for `last_integrated_server_version` which we cannot afford to
22,984✔
2478
        // ignore, but we do not know what client versions they correspond
22,984✔
2479
        // to. Fortunately, we can produce a cursor that works, and is mutually
22,984✔
2480
        // consistent with previous cursors, by simply bumping
22,984✔
2481
        // `upload_progress.client_version` when
22,984✔
2482
        // `upload_progress.last_integrated_server_version` grows.
22,984✔
2483
        //
22,984✔
2484
        // To see that this scheme works, consider the last changeset, A, that
22,984✔
2485
        // will have already been uploaded and integrated at the beginning of
22,984✔
2486
        // the next session, and the first changeset, B, that follows A in the
22,984✔
2487
        // client side history, and is not upload skippable (of local origin and
22,984✔
2488
        // nonempty). We then need to show that A will be skipped, if uploaded
22,984✔
2489
        // in the next session, but B will not.
22,984✔
2490
        //
22,984✔
2491
        // Let V be the client version produced by A, and let T be the value of
22,984✔
2492
        // `upload_progress.client_version` as determined in this session, which
22,984✔
2493
        // is used as threshold in the next session. Then we know that A is
22,984✔
2494
        // skipped during the next session if V is less than, or equal to T. If
22,984✔
2495
        // the protocol version is at least 29, the protocol requires that T is
22,984✔
2496
        // greater than, or equal to V. If the protocol version is less than 25,
22,984✔
2497
        // T will be equal to V. Finally, if the protocol version is 25, 26, 27,
22,984✔
2498
        // or 28, we construct T such that it is always greater than, or equal
22,984✔
2499
        // to V, so in all cases, A will be skipped during the next session.
22,984✔
2500
        //
22,984✔
2501
        // Let W be the client version on which B is based. We then know that B
22,984✔
2502
        // will be retained if, and only if W is greater than, or equal to T. If
22,984✔
2503
        // the protocol version is at least 29, we know that T is less than, or
22,984✔
2504
        // equal to W, since B is not integrated until the next session. If the
22,984✔
2505
        // protocol version is less than 25, we know that T is V. Since V must
22,984✔
2506
        // be less than, or equal to W, we again know that T is less than, or
22,984✔
2507
        // equal to W. Finally, if the protocol version is 25, 26, 27, or 28, we
22,984✔
2508
        // construct T such that it is equal to V + N, where N is the number of
22,984✔
2509
        // observed increments in `last_integrated_server_version` since the
22,984✔
2510
        // client version produced by A. For each of these observed increments,
22,984✔
2511
        // there must have been a distinct new client version, but all these
22,984✔
2512
        // client versions must be less than, or equal to W, since B is not
22,984✔
2513
        // integrated until the next session. Therefore, we know that T = V + N
22,984✔
2514
        // is less than, or equal to W. So, in all cases, B will not be skipped
22,984✔
2515
        // during the next session.
22,984✔
2516
        int protocol_version = m_connection.get_client_protocol_version();
46,854✔
2517
        static_cast<void>(protocol_version); // No protocol diversion (yet)
46,854✔
2518

22,984✔
2519
        UploadCursor upload_progress;
46,854✔
2520
        upload_progress = {progress_client_version, progress_server_version};
46,854✔
2521

22,984✔
2522
        // `upload_progress.client_version` must be nondecreasing across the
22,984✔
2523
        // session.
22,984✔
2524
        bool good_1 = (upload_progress.client_version >= m_upload_progress.client_version);
46,854✔
2525
        if (REALM_UNLIKELY(!good_1)) {
46,854✔
2526
            logger.error("Decreasing client version in upload progress (%1 < %2)", upload_progress.client_version,
×
2527
                         m_upload_progress.client_version); // Throws
×
2528
            error = ProtocolError::bad_client_version;
×
2529
            return false;
×
2530
        }
×
2531
        // `upload_progress.last_integrated_server_version` must be a version
22,984✔
2532
        // that the client can have heard about.
22,984✔
2533
        bool good_2 = (upload_progress.last_integrated_server_version <= m_download_progress.server_version);
46,854✔
2534
        if (REALM_UNLIKELY(!good_2)) {
46,854✔
2535
            logger.error("Bad last integrated server version in upload progress (%1 > %2)",
×
2536
                         upload_progress.last_integrated_server_version,
×
2537
                         m_download_progress.server_version); // Throws
×
2538
            error = ProtocolError::bad_server_version;
×
2539
            return false;
×
2540
        }
×
2541

22,984✔
2542
        // `upload_progress` must be consistent.
22,984✔
2543
        if (REALM_UNLIKELY(!is_consistent(upload_progress))) {
46,854✔
2544
            logger.error("Upload progress is inconsistent (%1, %2)", upload_progress.client_version,
×
2545
                         upload_progress.last_integrated_server_version); // Throws
×
2546
            error = ProtocolError::bad_server_version;
×
2547
            return false;
×
2548
        }
×
2549
        // `upload_progress` and `m_upload_threshold` must be mutually
22,984✔
2550
        // consistent.
22,984✔
2551
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_threshold))) {
46,854✔
2552
            logger.error("Upload progress (%1, %2) is mutually inconsistent with "
×
2553
                         "threshold (%3, %4)",
×
2554
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2555
                         m_upload_threshold.client_version,
×
2556
                         m_upload_threshold.last_integrated_server_version); // Throws
×
2557
            error = ProtocolError::bad_server_version;
×
2558
            return false;
×
2559
        }
×
2560
        // `upload_progress` and `m_upload_progress` must be mutually
22,984✔
2561
        // consistent.
22,984✔
2562
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_progress))) {
46,854✔
2563
            logger.error("Upload progress (%1, %2) is mutually inconsistent with previous "
×
2564
                         "upload progress (%3, %4)",
×
2565
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2566
                         m_upload_progress.client_version,
×
2567
                         m_upload_progress.last_integrated_server_version); // Throws
×
2568
            error = ProtocolError::bad_server_version;
×
2569
            return false;
×
2570
        }
×
2571

22,984✔
2572
        version_type locked_server_version_2 = locked_server_version;
46,854✔
2573

22,984✔
2574
        // `locked_server_version_2` must be nondecreasing over the lifetime of
22,984✔
2575
        // the client-side file.
22,984✔
2576
        if (REALM_UNLIKELY(locked_server_version_2 < m_locked_server_version)) {
46,854✔
2577
            logger.error("Decreasing locked server version (%1 < %2)", locked_server_version_2,
×
2578
                         m_locked_server_version); // Throws
×
2579
            error = ProtocolError::bad_server_version;
×
2580
            return false;
×
2581
        }
×
2582
        // `locked_server_version_2` must be a version that the client can have
22,984✔
2583
        // heard about.
22,984✔
2584
        if (REALM_UNLIKELY(locked_server_version_2 > m_download_progress.server_version)) {
46,854✔
2585
            logger.error("Bad locked server version (%1 > %2)", locked_server_version_2,
×
2586
                         m_download_progress.server_version); // Throws
×
2587
            error = ProtocolError::bad_server_version;
×
2588
            return false;
×
2589
        }
×
2590

22,984✔
2591
        std::size_t num_previously_integrated_changesets = 0;
46,854✔
2592
        if (!upload_changesets.empty()) {
46,854✔
2593
            UploadCursor up = m_upload_progress;
24,014✔
2594
            for (const ServerProtocol::UploadChangeset& uc : upload_changesets) {
37,102✔
2595
                // `uc.upload_cursor.client_version` must be increasing across
18,062✔
2596
                // all the changesets in this UPLOAD message, and all must be
18,062✔
2597
                // greater than upload_progress.client_version of previous
18,062✔
2598
                // UPLOAD message.
18,062✔
2599
                if (REALM_UNLIKELY(uc.upload_cursor.client_version <= up.client_version)) {
37,102✔
2600
                    logger.error("Nonincreasing client version in upload cursor of uploaded "
×
2601
                                 "changeset (%1 <= %2)",
×
2602
                                 uc.upload_cursor.client_version,
×
2603
                                 up.client_version); // Throws
×
2604
                    error = ProtocolError::bad_client_version;
×
2605
                    return false;
×
2606
                }
×
2607
                // `uc.upload_progress` must be consistent.
18,062✔
2608
                if (REALM_UNLIKELY(!is_consistent(uc.upload_cursor))) {
37,102✔
2609
                    logger.error("Upload cursor of uploaded changeset is inconsistent (%1, %2)",
×
2610
                                 uc.upload_cursor.client_version,
×
2611
                                 uc.upload_cursor.last_integrated_server_version); // Throws
×
2612
                    error = ProtocolError::bad_server_version;
×
2613
                    return false;
×
2614
                }
×
2615
                // `uc.upload_progress` must be mutually consistent with
18,062✔
2616
                // previous upload cursor.
18,062✔
2617
                if (REALM_UNLIKELY(!are_mutually_consistent(uc.upload_cursor, up))) {
37,102✔
2618
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2619
                                 "inconsistent with previous upload cursor (%3, %4)",
×
2620
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2621
                                 up.client_version, up.last_integrated_server_version); // Throws
×
2622
                    error = ProtocolError::bad_server_version;
×
2623
                    return false;
×
2624
                }
×
2625
                // `uc.upload_progress` must be mutually consistent with
18,062✔
2626
                // threshold, that is, for changesets that have not previously
18,062✔
2627
                // been integrated, it is important that the specified value of
18,062✔
2628
                // `last_integrated_server_version` is greater than, or equal to
18,062✔
2629
                // the reciprocal history base version.
18,062✔
2630
                bool consistent_with_threshold = are_mutually_consistent(uc.upload_cursor, m_upload_threshold);
37,102✔
2631
                if (REALM_UNLIKELY(!consistent_with_threshold)) {
37,102✔
2632
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2633
                                 "inconsistent with threshold (%3, %4)",
×
2634
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2635
                                 m_upload_threshold.client_version,
×
2636
                                 m_upload_threshold.last_integrated_server_version); // Throws
×
2637
                    error = ProtocolError::bad_server_version;
×
2638
                    return false;
×
2639
                }
×
2640
                bool previously_integrated = (uc.upload_cursor.client_version <= m_upload_threshold.client_version);
37,102✔
2641
                if (previously_integrated)
37,102✔
2642
                    ++num_previously_integrated_changesets;
2,692✔
2643
                up = uc.upload_cursor;
37,102✔
2644
            }
37,102✔
2645
            // `upload_progress.client_version` must be greater than, or equal
12,374✔
2646
            // to client versions produced by each of the changesets in this
12,374✔
2647
            // UPLOAD message.
12,374✔
2648
            if (REALM_UNLIKELY(up.client_version > upload_progress.client_version)) {
24,014✔
2649
                logger.error("Upload progress less than client version produced by uploaded "
×
2650
                             "changeset (%1 > %2)",
×
2651
                             up.client_version,
×
2652
                             upload_progress.client_version); // Throws
×
2653
                error = ProtocolError::bad_client_version;
×
2654
                return false;
×
2655
            }
×
2656
            // The upload cursor of last uploaded changeset must be mutually
12,374✔
2657
            // consistent with the reported upload progress.
12,374✔
2658
            if (REALM_UNLIKELY(!are_mutually_consistent(up, upload_progress))) {
24,014✔
2659
                logger.error("Upload cursor (%1, %2) of last uploaded changeset is mutually "
×
2660
                             "inconsistent with upload progress (%3, %4)",
×
2661
                             up.client_version, up.last_integrated_server_version, upload_progress.client_version,
×
2662
                             upload_progress.last_integrated_server_version); // Throws
×
2663
                error = ProtocolError::bad_server_version;
×
2664
                return false;
×
2665
            }
×
2666
        }
46,854✔
2667

22,984✔
2668
        // FIXME: Part of a very poor man's substitute for a proper backpressure
22,984✔
2669
        // scheme.
22,984✔
2670
        if (REALM_UNLIKELY(!m_server_file->can_add_changesets_from_downstream())) {
46,854✔
2671
            logger.debug("Terminating uploading session because buffer is full"); // Throws
×
2672
            // Using this exact error code, because it causes `try_again` flag
2673
            // to be set to true, which causes the client to wait for about 5
2674
            // minutes before trying to connect again.
2675
            error = ProtocolError::connection_closed;
×
2676
            return false;
×
2677
        }
×
2678

22,984✔
2679
        m_upload_progress = upload_progress;
46,854✔
2680

22,984✔
2681
        bool have_real_upload_progress = (upload_progress.client_version > m_upload_threshold.client_version);
46,854✔
2682
        bool bump_locked_server_version = (locked_server_version_2 > m_locked_server_version);
46,854✔
2683

22,984✔
2684
        std::size_t num_changesets_to_integrate = upload_changesets.size() - num_previously_integrated_changesets;
46,854✔
2685
        REALM_ASSERT(have_real_upload_progress || num_changesets_to_integrate == 0);
46,854✔
2686

22,984✔
2687
        bool have_anything_to_do = (have_real_upload_progress || bump_locked_server_version);
46,854✔
2688
        if (!have_anything_to_do)
46,854✔
2689
            return true;
232✔
2690

22,836✔
2691
        if (!have_real_upload_progress)
46,622✔
2692
            upload_progress = m_upload_threshold;
×
2693

22,836✔
2694
        if (num_previously_integrated_changesets > 0) {
46,622✔
2695
            logger.detail("Ignoring %1 previously integrated changesets",
850✔
2696
                          num_previously_integrated_changesets); // Throws
850✔
2697
        }
850✔
2698
        if (num_changesets_to_integrate > 0) {
46,622✔
2699
            logger.detail("Initiate integration of %1 remote changesets",
23,632✔
2700
                          num_changesets_to_integrate); // Throws
23,632✔
2701
        }
23,632✔
2702

22,836✔
2703
        REALM_ASSERT(m_server_file);
46,622✔
2704
        ServerFile& file = *m_server_file;
46,622✔
2705
        std::size_t offset = num_previously_integrated_changesets;
46,622✔
2706
        file.add_changesets_from_downstream(m_client_file_ident, upload_progress, locked_server_version_2,
46,622✔
2707
                                            upload_changesets.data() + offset, num_changesets_to_integrate); // Throws
46,622✔
2708

22,836✔
2709
        m_locked_server_version = locked_server_version_2;
46,622✔
2710
        return true;
46,622✔
2711
    }
46,622✔
2712

2713
    // Handles a MARK message from the client.
    //
    // Records `request_ident` as the pending download completion request and
    // makes sure the session is enlisted to send. The `ProtocolError`
    // out-parameter is never written; the function always returns true.
    bool receive_mark_message(request_ident_type request_ident, ProtocolError&)
    {
        // Precondition: protocol state must be WaitForUnbind
        REALM_ASSERT(!m_send_ident_message);
        REALM_ASSERT(ident_message_received());
        REALM_ASSERT(!unbind_message_received());
        REALM_ASSERT(!error_occurred());
        REALM_ASSERT(!m_error_message_sent);

        logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

        // Remember the request; continue_history_scan() answers it with a MARK
        // message when there is nothing more to download.
        m_download_completion_request = request_ident;
        ensure_enlisted_to_send();

        return true;
    }
14,046✔
2729

2730
    // Returns true if the deactivation process has been completed, at which
2731
    // point the caller (SyncConnection::receive_unbind_message()) should
2732
    // terminate the session.
2733
    //
2734
    // CAUTION: This function may commit suicide!
2735
    void receive_unbind_message()
2736
    {
4,224✔
2737
        // Protocol state may be anything but SendUnbound
1,354✔
2738
        REALM_ASSERT(!m_unbind_message_received);
4,224✔
2739

1,354✔
2740
        logger.detail("Received: UNBIND"); // Throws
4,224✔
2741

1,354✔
2742
        detach_from_server_file();
4,224✔
2743
        m_unbind_message_received = true;
4,224✔
2744

1,354✔
2745
        // Detect completion of the deactivation process
1,354✔
2746
        if (m_error_message_sent) {
4,224✔
2747
            // Deactivation process completed
14✔
2748
            terminate(); // Throws
24✔
2749
            m_connection.discard_session(m_session_ident);
24✔
2750
            // This session is now destroyed!
14✔
2751
            return;
24✔
2752
        }
24✔
2753

1,340✔
2754
        // Protocol state is now SendUnbound
1,340✔
2755
        ensure_enlisted_to_send();
4,200✔
2756
    }
4,200✔
2757

2758
    // Handles an ERROR message from the client. All three arguments are
    // ignored; the event is only logged.
    void receive_error_message(session_ident_type, int, std::string_view)
    {
        // Must not arrive after UNBIND has been received
        REALM_ASSERT(!m_unbind_message_received);

        logger.detail("Received: ERROR"); // Throws
    }
×
2764

2765
private:
2766
    SyncConnection& m_connection;
2767

2768
    const session_ident_type m_session_ident;
2769

2770
    // Not null if, and only if this session is in
2771
    // m_connection.m_sessions_enlisted_to_send.
2772
    Session* m_next = nullptr;
2773

2774
    // Becomes nonnull when the BIND message is received, if no error occurs. Is
2775
    // reset to null when the deactivation process is initiated, either when the
2776
    // UNBIND message is received, or when initiate_deactivation() is called.
2777
    util::bind_ptr<ServerFile> m_server_file;
2778

2779
    bool m_disable_download = false;
2780
    bool m_is_subserver = false;
2781

2782
    using file_ident_request_type = ServerFile::file_ident_request_type;
2783

2784
    // When nonzero, this session has an outstanding request for a client file
2785
    // identifier.
2786
    file_ident_request_type m_file_ident_request = 0;
2787

2788
    // Payload for next outgoing ALLOC message.
2789
    SaltedFileIdent m_allocated_file_ident = {0, 0};
2790

2791
    // Zero until the session receives an IDENT message from the client.
2792
    file_ident_type m_client_file_ident = 0;
2793

2794
    // Zero until initiate_deactivation() is called.
2795
    ProtocolError m_error_code = {};
2796

2797
    // The current point of progression of the download process. Set to (<server
2798
    // version>, <client version>) of the IDENT message when the IDENT message
2799
    // is received. At the time of return from continue_history_scan(), it
2800
    // points to the latest server version such that all preceding changesets in
2801
    // the server-side history have been downloaded, are currently being
2802
    // downloaded, or are *download excluded*.
2803
    DownloadCursor m_download_progress = {0, 0};
2804

2805
    request_ident_type m_download_completion_request = 0;
2806

2807
    // Records the progress of the upload process. Used to check that the client
2808
    // uploads changesets in order. Also, when m_upload_progress >
2809
    // m_upload_threshold, m_upload_progress works as a cache of the persisted
2810
    // version of the upload progress.
2811
    UploadCursor m_upload_progress = {0, 0};
2812

2813
    // Initialized on reception of the IDENT message. Specifies the actual
2814
    // upload progress (as recorded on the server-side) at the beginning of the
2815
    // session, and it remains fixed throughout the session.
2816
    //
2817
    // m_upload_threshold includes the progress resulting from the received
2818
    // changesets that have not yet been integrated (only relevant for
2819
    // synchronous backup).
2820
    UploadCursor m_upload_threshold = {0, 0};
2821

2822
    // Works partially as a cache of the persisted value, and partially as a way
2823
    // of checking that the client respects that it can never decrease.
2824
    version_type m_locked_server_version = 0;
2825

2826
    bool m_send_ident_message = false;
2827
    bool m_unbind_message_received = false;
2828
    bool m_error_message_sent = false;
2829

2830
    /// m_one_download_message_sent denotes whether at least one DOWNLOAD message
2831
    /// has been sent in the current session. The variable is used to ensure
2832
    /// that a DOWNLOAD message is always sent in a session. The received
2833
    /// DOWNLOAD message is needed by the client to ensure that its current
2834
    /// download progress is up to date.
2835
    bool m_one_download_message_sent = false;
2836

2837
    static std::string make_logger_prefix(session_ident_type session_ident)
2838
    {
6,976✔
2839
        std::ostringstream out;
6,976✔
2840
        out.imbue(std::locale::classic());
6,976✔
2841
        out << "Session[" << session_ident << "]: "; // Throws
6,976✔
2842
        return out.str();                            // Throws
6,976✔
2843
    }
6,976✔
2844

2845
    // Scan the history for changesets to be downloaded.
2846
    // If the history is longer than the end point of the previous scan,
2847
    // a DOWNLOAD message will be sent.
2848
    // A MARK message is sent if no DOWNLOAD message is sent, and the client has
2849
    // requested to be notified about download completion.
2850
    // In case neither a DOWNLOAD nor a MARK is sent, no message is sent.
2851
    //
2852
    // This function may lead to the destruction of the session object
2853
    // (suicide).
2854
    void continue_history_scan()
2855
    {
106,444✔
2856
        // Protocol state must be WaitForUnbind
52,536✔
2857
        REALM_ASSERT(!m_send_ident_message);
106,444✔
2858
        REALM_ASSERT(ident_message_received());
106,444✔
2859
        REALM_ASSERT(!unbind_message_received());
106,444✔
2860
        REALM_ASSERT(!error_occurred());
106,444✔
2861
        REALM_ASSERT(!m_error_message_sent);
106,444✔
2862
        REALM_ASSERT(!is_enlisted_to_send());
106,444✔
2863

52,536✔
2864
        SaltedVersion last_server_version = m_server_file->get_salted_sync_version();
106,444✔
2865
        REALM_ASSERT(last_server_version.version >= m_download_progress.server_version);
106,444✔
2866

52,536✔
2867
        ServerImpl& server = m_connection.get_server();
106,444✔
2868
        const Server::Config& config = server.get_config();
106,444✔
2869
        if (REALM_UNLIKELY(m_disable_download))
106,444✔
2870
            return;
52,536✔
2871

52,536✔
2872
        bool have_more_to_scan =
106,444✔
2873
            (last_server_version.version > m_download_progress.server_version || !m_one_download_message_sent);
106,444✔
2874
        if (have_more_to_scan) {
106,444✔
2875
            m_server_file->register_client_access(m_client_file_ident);     // Throws
40,594✔
2876
            const ServerHistory& history = m_server_file->access().history; // Throws
40,594✔
2877
            const char* body;
40,594✔
2878
            std::size_t uncompressed_body_size;
40,594✔
2879
            std::size_t compressed_body_size = 0;
40,594✔
2880
            bool body_is_compressed = false;
40,594✔
2881
            version_type end_version = last_server_version.version;
40,594✔
2882
            DownloadCursor download_progress;
40,594✔
2883
            UploadCursor upload_progress = {0, 0};
40,594✔
2884
            std::uint_fast64_t downloadable_bytes = 0;
40,594✔
2885
            std::size_t num_changesets;
40,594✔
2886
            std::size_t accum_original_size;
40,594✔
2887
            std::size_t accum_compacted_size;
40,594✔
2888
            ServerProtocol& protocol = get_server_protocol();
40,594✔
2889
            bool disable_download_compaction = config.disable_download_compaction;
40,594✔
2890
            bool enable_cache = (config.enable_download_bootstrap_cache && m_download_progress.server_version == 0 &&
40,594!
2891
                                 m_upload_progress.client_version == 0 && m_upload_threshold.client_version == 0);
20,662!
2892
            DownloadCache& cache = m_server_file->get_download_cache();
40,594✔
2893
            bool fetch_from_cache = (enable_cache && cache.body && end_version == cache.end_version);
40,594!
2894
            if (fetch_from_cache) {
40,594✔
2895
                body = cache.body.get();
×
2896
                uncompressed_body_size = cache.uncompressed_body_size;
×
2897
                compressed_body_size = cache.compressed_body_size;
×
2898
                body_is_compressed = cache.body_is_compressed;
×
2899
                download_progress = cache.download_progress;
×
2900
                downloadable_bytes = cache.downloadable_bytes;
×
2901
                num_changesets = cache.num_changesets;
×
2902
                accum_original_size = cache.accum_original_size;
×
2903
                accum_compacted_size = cache.accum_compacted_size;
×
2904
            }
×
2905
            else {
40,594✔
2906
                // Discard the old cached DOWNLOAD body before generating a new
20,662✔
2907
                // one to be cached. This can make a big difference because the
20,662✔
2908
                // size of that body can be very large (10GiB has been seen in a
20,662✔
2909
                // real-world case).
20,662✔
2910
                if (enable_cache)
40,594✔
2911
                    cache.body = {};
×
2912

20,662✔
2913
                OutputBuffer& out = server.get_misc_buffers().download_message;
40,594✔
2914
                out.reset();
40,594✔
2915
                download_progress = m_download_progress;
40,594✔
2916
                auto fetch_and_compress = [&](std::size_t max_download_size) {
40,592✔
2917
                    DownloadHistoryEntryHandler handler{protocol, out, logger};
40,590✔
2918
                    std::uint_fast64_t cumulative_byte_size_current;
40,590✔
2919
                    std::uint_fast64_t cumulative_byte_size_total;
40,590✔
2920
                    bool not_expired = history.fetch_download_info(
40,590✔
2921
                        m_client_file_ident, download_progress, end_version, upload_progress, handler,
40,590✔
2922
                        cumulative_byte_size_current, cumulative_byte_size_total, disable_download_compaction,
40,590✔
2923
                        max_download_size); // Throws
40,590✔
2924
                    REALM_ASSERT(upload_progress.client_version >= download_progress.last_integrated_client_version);
40,590✔
2925
                    SyncConnection& conn = get_connection();
40,590✔
2926
                    if (REALM_UNLIKELY(!not_expired)) {
40,590✔
2927
                        logger.debug("History scanning failed: Client file entry "
×
2928
                                     "expired during session"); // Throws
×
2929
                        conn.protocol_error(ProtocolError::client_file_expired, this);
×
2930
                        // Session object may have been destroyed at this point
2931
                        // (suicide).
2932
                        return false;
×
2933
                    }
×
2934

20,660✔
2935
                    downloadable_bytes = cumulative_byte_size_total - cumulative_byte_size_current;
40,590✔
2936
                    uncompressed_body_size = out.size();
40,590✔
2937
                    BinaryData uncompressed = {out.data(), uncompressed_body_size};
40,590✔
2938
                    body = uncompressed.data();
40,590✔
2939
                    std::size_t max_uncompressed = 1024;
40,590✔
2940
                    if (uncompressed.size() > max_uncompressed) {
40,590✔
2941
                        compression::CompressMemoryArena& arena = server.get_compress_memory_arena();
4,290✔
2942
                        std::vector<char>& buffer = server.get_misc_buffers().compress;
4,290✔
2943
                        compression::allocate_and_compress(arena, uncompressed, buffer); // Throws
4,290✔
2944
                        if (buffer.size() < uncompressed.size()) {
4,290✔
2945
                            body = buffer.data();
4,290✔
2946
                            compressed_body_size = buffer.size();
4,290✔
2947
                            body_is_compressed = true;
4,290✔
2948
                        }
4,290✔
2949
                    }
4,290✔
2950
                    num_changesets = handler.num_changesets;
40,590✔
2951
                    accum_original_size = handler.accum_original_size;
40,590✔
2952
                    accum_compacted_size = handler.accum_compacted_size;
40,590✔
2953
                    return true;
40,590✔
2954
                };
40,590✔
2955
                if (enable_cache) {
40,594✔
2956
                    std::size_t max_download_size = std::numeric_limits<size_t>::max();
×
2957
                    if (!fetch_and_compress(max_download_size)) { // Throws
×
2958
                        // Session object may have been destroyed at this point
2959
                        // (suicide).
2960
                        return;
×
2961
                    }
×
2962
                    REALM_ASSERT(upload_progress.client_version == 0);
×
2963
                    std::size_t body_size = (body_is_compressed ? compressed_body_size : uncompressed_body_size);
×
2964
                    cache.body = std::make_unique<char[]>(body_size); // Throws
×
2965
                    std::copy(body, body + body_size, cache.body.get());
×
2966
                    cache.uncompressed_body_size = uncompressed_body_size;
×
2967
                    cache.compressed_body_size = compressed_body_size;
×
2968
                    cache.body_is_compressed = body_is_compressed;
×
2969
                    cache.end_version = end_version;
×
2970
                    cache.download_progress = download_progress;
×
2971
                    cache.downloadable_bytes = downloadable_bytes;
×
2972
                    cache.num_changesets = num_changesets;
×
2973
                    cache.accum_original_size = accum_original_size;
×
2974
                    cache.accum_compacted_size = accum_compacted_size;
×
2975
                }
×
2976
                else {
40,594✔
2977
                    std::size_t max_download_size = config.max_download_size;
40,594✔
2978
                    if (!fetch_and_compress(max_download_size)) { // Throws
40,594✔
2979
                        // Session object may have been destroyed at this point
2980
                        // (suicide).
2981
                        return;
×
2982
                    }
×
2983
                }
40,594✔
2984
            }
40,594✔
2985

20,662✔
2986
            OutputBuffer& out = m_connection.get_output_buffer();
40,594✔
2987
            protocol.make_download_message(
40,594✔
2988
                m_connection.get_client_protocol_version(), out, m_session_ident, download_progress.server_version,
40,594✔
2989
                download_progress.last_integrated_client_version, last_server_version.version,
40,594✔
2990
                last_server_version.salt, upload_progress.client_version,
40,594✔
2991
                upload_progress.last_integrated_server_version, downloadable_bytes, num_changesets, body,
40,594✔
2992
                uncompressed_body_size, compressed_body_size, body_is_compressed, logger); // Throws
40,594✔
2993

20,662✔
2994
            if (!disable_download_compaction) {
40,596✔
2995
                std::size_t saved = accum_original_size - accum_compacted_size;
40,596✔
2996
                double saved_2 = (accum_original_size == 0 ? 0 : std::round(saved * 100.0 / accum_original_size));
31,158✔
2997
                logger.detail("Download compaction: Saved %1 bytes (%2%%)", saved, saved_2); // Throws
40,596✔
2998
            }
40,596✔
2999

20,662✔
3000
            m_download_progress = download_progress;
40,594✔
3001
            logger.debug("Setting of m_download_progress.server_version = %1",
40,594✔
3002
                         m_download_progress.server_version); // Throws
40,594✔
3003
            send_download_message();
40,594✔
3004
            m_one_download_message_sent = true;
40,594✔
3005

20,662✔
3006
            enlist_to_send();
40,594✔
3007
        }
40,594✔
3008
        else if (m_download_completion_request) {
65,850✔
3009
            // Send a MARK message
5,966✔
3010
            request_ident_type request_ident = m_download_completion_request;
14,040✔
3011
            send_mark_message(request_ident);  // Throws
14,040✔
3012
            m_download_completion_request = 0; // Request handled
14,040✔
3013
            enlist_to_send();
14,040✔
3014
        }
14,040✔
3015
    }
106,444✔
3016

3017
    void send_ident_message()
3018
    {
1,258✔
3019
        // Protocol state must be SendIdent
584✔
3020
        REALM_ASSERT(!need_client_file_ident());
1,258✔
3021
        REALM_ASSERT(m_send_ident_message);
1,258✔
3022
        REALM_ASSERT(!ident_message_received());
1,258✔
3023
        REALM_ASSERT(!unbind_message_received());
1,258✔
3024
        REALM_ASSERT(!error_occurred());
1,258✔
3025
        REALM_ASSERT(!m_error_message_sent);
1,258✔
3026

584✔
3027
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
1,258✔
3028

584✔
3029
        file_ident_type client_file_ident = m_allocated_file_ident.ident;
1,258✔
3030
        salt_type client_file_ident_salt = m_allocated_file_ident.salt;
1,258✔
3031

584✔
3032
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident,
1,258✔
3033
                     client_file_ident_salt); // Throws
1,258✔
3034

584✔
3035
        ServerProtocol& protocol = get_server_protocol();
1,258✔
3036
        OutputBuffer& out = m_connection.get_output_buffer();
1,258✔
3037
        int protocol_version = m_connection.get_client_protocol_version();
1,258✔
3038
        protocol.make_ident_message(protocol_version, out, m_session_ident, client_file_ident,
1,258✔
3039
                                    client_file_ident_salt); // Throws
1,258✔
3040
        m_connection.initiate_write_output_buffer();         // Throws
1,258✔
3041

584✔
3042
        m_allocated_file_ident.ident = 0; // Consumed
1,258✔
3043
        m_send_ident_message = false;
1,258✔
3044
        // Protocol state is now WaitForStateRequest or WaitForIdent
584✔
3045
    }
1,258✔
3046

3047
    void send_download_message()
3048
    {
40,598✔
3049
        m_connection.initiate_write_output_buffer(); // Throws
40,598✔
3050
    }
40,598✔
3051

3052
    // Builds a MARK message for `request_ident` in the connection's output
    // buffer and initiates writing of that buffer.
    void send_mark_message(request_ident_type request_ident)
    {
        logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws

        OutputBuffer& out = m_connection.get_output_buffer();
        get_server_protocol().make_mark_message(out, m_session_ident, request_ident); // Throws
        m_connection.initiate_write_output_buffer();                                  // Throws
    }
14,040✔
3061

3062
    void send_alloc_message()
3063
    {
×
3064
        // Protocol state must be WaitForUnbind
3065
        REALM_ASSERT(!m_send_ident_message);
×
3066
        REALM_ASSERT(ident_message_received());
×
3067
        REALM_ASSERT(!unbind_message_received());
×
3068
        REALM_ASSERT(!error_occurred());
×
3069
        REALM_ASSERT(!m_error_message_sent);
×
3070

3071
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
×
3072

3073
        // Relayed allocations are only allowed from protocol version 23 (old protocol).
3074
        REALM_ASSERT(false);
×
3075

3076
        file_ident_type file_ident = m_allocated_file_ident.ident;
×
3077

3078
        logger.debug("Sending: ALLOC(file_ident=%1)", file_ident); // Throws
×
3079

3080
        ServerProtocol& protocol = get_server_protocol();
×
3081
        OutputBuffer& out = m_connection.get_output_buffer();
×
3082
        protocol.make_alloc_message(out, m_session_ident, file_ident); // Throws
×
3083
        m_connection.initiate_write_output_buffer();                   // Throws
×
3084

3085
        m_allocated_file_ident.ident = 0; // Consumed
×
3086

3087
        // Other messages may be waiting to be sent.
3088
        enlist_to_send();
×
3089
    }
×
3090

3091
    void send_unbound_message()
3092
    {
4,190✔
3093
        // Protocol state must be SendUnbound
1,332✔
3094
        REALM_ASSERT(unbind_message_received());
4,190✔
3095
        REALM_ASSERT(!m_error_message_sent);
4,190✔
3096

1,332✔
3097
        logger.debug("Sending: UNBOUND"); // Throws
4,190✔
3098

1,332✔
3099
        ServerProtocol& protocol = get_server_protocol();
4,190✔
3100
        OutputBuffer& out = m_connection.get_output_buffer();
4,190✔
3101
        protocol.make_unbound_message(out, m_session_ident); // Throws
4,190✔
3102
        m_connection.initiate_write_output_buffer();         // Throws
4,190✔
3103
    }
4,190✔
3104

3105
    void send_error_message()
3106
    {
80✔
3107
        // Protocol state must be SendError
38✔
3108
        REALM_ASSERT(!unbind_message_received());
80✔
3109
        REALM_ASSERT(error_occurred());
80✔
3110
        REALM_ASSERT(!m_error_message_sent);
80✔
3111

38✔
3112
        REALM_ASSERT(is_session_level_error(m_error_code));
80✔
3113

38✔
3114
        ProtocolError error_code = m_error_code;
80✔
3115
        const char* message = get_protocol_error_message(int(error_code));
80✔
3116
        std::size_t message_size = std::strlen(message);
80✔
3117
        bool try_again = determine_try_again(error_code);
80✔
3118

38✔
3119
        logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3)", int(error_code), message_size,
80✔
3120
                      try_again); // Throws
80✔
3121

38✔
3122
        ServerProtocol& protocol = get_server_protocol();
80✔
3123
        OutputBuffer& out = m_connection.get_output_buffer();
80✔
3124
        int protocol_version = m_connection.get_client_protocol_version();
80✔
3125
        protocol.make_error_message(protocol_version, out, error_code, message, message_size, try_again,
80✔
3126
                                    m_session_ident); // Throws
80✔
3127
        m_connection.initiate_write_output_buffer();  // Throws
80✔
3128

38✔
3129
        m_error_message_sent = true;
80✔
3130
        // Protocol state is now WaitForUnbindErr
38✔
3131
    }
80✔
3132

3133
    // Delivers a log message on behalf of this session.
    //
    // If the client's protocol version is older than
    // SyncConnection::SERVER_LOG_PROTOCOL_VERSION, the message is written to the
    // session's own logger at `level`. Otherwise it is handed to the connection,
    // tagged with this session's identifier.
    void send_log_message(util::Logger::Level level, const std::string&& message)
    {
        if (m_connection.get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
            // The first string argument of the logger is a format string with
            // `%N` placeholders. `message` is arbitrary text, so it is passed as
            // a parameter of a constant format rather than as the format itself;
            // otherwise any '%' in the message would be parsed as a placeholder.
            logger.log(level, "%1", message.c_str()); // Throws
            return;
        }

        m_connection.send_log_message(level, std::move(message), m_session_ident);
    }
6,108✔
3141

3142
    // Idempotent
3143
    void detach_from_server_file() noexcept
3144
    {
11,280✔
3145
        if (!m_server_file)
11,280✔
3146
            return;
4,332✔
3147
        ServerFile& file = *m_server_file;
6,948✔
3148
        if (ident_message_received()) {
6,948✔
3149
            file.remove_identified_session(m_client_file_ident);
6,108✔
3150
        }
6,108✔
3151
        else {
840✔
3152
            file.remove_unidentified_session(this);
840✔
3153
        }
840✔
3154
        if (m_file_ident_request != 0)
6,948✔
3155
            file.cancel_file_ident_request(m_file_ident_request);
652✔
3156
        m_server_file.reset();
6,948✔
3157
    }
6,948✔
3158

3159
    friend class SessionQueue;
3160
};
3161

3162

3163
// ============================ SessionQueue implementation ============================
3164

3165
// Appends `sess` to the back of the queue. The session must not already be
// linked into a queue (its `m_next` must be null).
//
// The queue is a circular singly linked list: `m_back` points to the last
// element, and `m_back->m_next` is the first element.
void SessionQueue::push_back(Session* sess) noexcept
{
    REALM_ASSERT(!sess->m_next);

    if (!m_back) {
        // Sole element: links to itself
        sess->m_next = sess;
    }
    else {
        // Splice in between the current back and the front
        sess->m_next = m_back->m_next;
        m_back->m_next = sess;
    }

    m_back = sess;
}
112,174✔
3177

3178

3179
// Removes and returns the session at the front of the queue, or returns null
// if the queue is empty. The `m_next` link of the returned session is reset to
// null.
Session* SessionQueue::pop_front() noexcept
{
    if (!m_back)
        return nullptr;

    // In the circular list, the front is the successor of the back
    Session* front = m_back->m_next;
    if (front == m_back) {
        // That was the only element
        m_back = nullptr;
    }
    else {
        m_back->m_next = front->m_next;
    }

    front->m_next = nullptr;
    return front;
}
163,618✔
3194

3195

3196
void SessionQueue::clear() noexcept
3197
{
3,158✔
3198
    if (m_back) {
3,158✔
3199
        Session* sess = m_back;
128✔
3200
        for (;;) {
198✔
3201
            Session* next = sess->m_next;
198✔
3202
            sess->m_next = nullptr;
198✔
3203
            if (next == m_back)
198✔
3204
                break;
128✔
3205
            sess = next;
70✔
3206
        }
70✔
3207
        m_back = nullptr;
128✔
3208
    }
128✔
3209
}
3,158✔
3210

3211

3212
// ============================ ServerFile implementation ============================
3213

3214
ServerFile::ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path,
3215
                       std::string real_path, bool disable_sync_to_disk)
3216
    : logger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.logger_ptr}               // Throws
3217
    , wlogger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.get_worker().logger_ptr} // Throws
3218
    , m_server{server}
3219
    , m_file{cache, real_path, virt_path, false, disable_sync_to_disk} // Throws
3220
    , m_worker_file{server.get_worker().get_file_access_cache(), real_path, virt_path, true, disable_sync_to_disk}
3221
{
1,054✔
3222
}
1,054✔
3223

3224

3225
// At the time of destruction, no sessions may be registered with the file, and
// `m_file_ident_request` must be zero.
ServerFile::~ServerFile() noexcept
{
    REALM_ASSERT(m_unidentified_sessions.empty());
    REALM_ASSERT(m_identified_sessions.empty());
    REALM_ASSERT(m_file_ident_request == 0);
}
1,054✔
3231

3232

3233
void ServerFile::initialize()
3234
{
1,054✔
3235
    const ServerHistory& history = access().history; // Throws
1,054✔
3236
    file_ident_type partial_file_ident = 0;
1,054✔
3237
    version_type partial_progress_reference_version = 0;
1,054✔
3238
    bool has_upstream_sync_status;
1,054✔
3239
    history.get_status(m_version_info, has_upstream_sync_status, partial_file_ident,
1,054✔
3240
                       partial_progress_reference_version); // Throws
1,054✔
3241
    REALM_ASSERT(!has_upstream_sync_status);
1,054✔
3242
    REALM_ASSERT(partial_file_ident == 0);
1,054✔
3243
}
1,054✔
3244

3245

3246
// Currently a no-op.
void ServerFile::activate() {}
3247

3248

3249
// This function must be called only after a completed invocation of
3250
// initialize(). Both functinos must only ever be called by the network event
3251
// loop thread.
3252
void ServerFile::register_client_access(file_ident_type) {}
93,310✔
3253

3254

3255
auto ServerFile::request_file_ident(FileIdentReceiver& receiver, file_ident_type proxy_file, ClientType client_type)
3256
    -> file_ident_request_type
3257
{
1,910✔
3258
    auto request = ++m_last_file_ident_request;
1,910✔
3259
    m_file_ident_requests[request] = {&receiver, proxy_file, client_type}; // Throws
1,910✔
3260

1,130✔
3261
    on_work_added(); // Throws
1,910✔
3262
    return request;
1,910✔
3263
}
1,910✔
3264

3265

3266
// Cancels a pending file identifier request by clearing its receiver. The
// request entry itself stays in `m_file_ident_requests`.
//
// The request must exist and must still have a receiver; both are asserted.
void ServerFile::cancel_file_ident_request(file_ident_request_type request) noexcept
{
    const auto entry = m_file_ident_requests.find(request);
    REALM_ASSERT(entry != m_file_ident_requests.end());

    FileIdentRequestInfo& pending = entry->second;
    REALM_ASSERT(pending.receiver);
    pending.receiver = nullptr;
}
3274

3275

3276
// Adds `sess` to the set of unidentified sessions. The session must not
// already be in that set (asserted).
void ServerFile::add_unidentified_session(Session* sess)
{
    REALM_ASSERT(m_unidentified_sessions.find(sess) == m_unidentified_sessions.end());
    m_unidentified_sessions.insert(sess); // Throws
}
3281

3282

3283
// Moves `sess` from the set of unidentified sessions to the map of identified
// sessions, keyed by `client_file_ident`.
//
// The session must currently be unidentified and no session may already be
// registered for `client_file_ident`; both are asserted.
void ServerFile::identify_session(Session* sess, file_ident_type client_file_ident)
{
    REALM_ASSERT(m_unidentified_sessions.find(sess) != m_unidentified_sessions.end());
    REALM_ASSERT(m_identified_sessions.find(client_file_ident) == m_identified_sessions.end());

    // Insert first (may throw), and only then drop the unidentified entry.
    m_identified_sessions[client_file_ident] = sess; // Throws
    m_unidentified_sessions.erase(sess);
}
3291

3292

3293
// Removes `sess` from the set of unidentified sessions. The session must be
// present in that set (asserted).
void ServerFile::remove_unidentified_session(Session* sess) noexcept
{
    REALM_ASSERT(m_unidentified_sessions.find(sess) != m_unidentified_sessions.end());
    m_unidentified_sessions.erase(sess);
}
3298

3299

3300
// Removes the session registered for `client_file_ident` from the map of
// identified sessions. Such a session must exist (asserted).
void ServerFile::remove_identified_session(file_ident_type client_file_ident) noexcept
{
    REALM_ASSERT(m_identified_sessions.find(client_file_ident) != m_identified_sessions.end());
    m_identified_sessions.erase(client_file_ident);
}
3305

3306

3307
// Returns the session registered for `client_file_ident`, or nullptr if there
// is none.
Session* ServerFile::get_identified_session(file_ident_type client_file_ident) noexcept
{
    const auto entry = m_identified_sessions.find(client_file_ident);
    return (entry != m_identified_sessions.end()) ? entry->second : nullptr;
}
3314

3315
bool ServerFile::can_add_changesets_from_downstream() const noexcept
3316
{
46,850✔
3317
    return (m_blocked_changesets_from_downstream_byte_size < m_server.get_max_upload_backlog());
46,850✔
3318
}
46,850✔
3319

3320

3321
void ServerFile::add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
3322
                                                version_type locked_server_version, const UploadChangeset* changesets,
3323
                                                std::size_t num_changesets)
3324
{
46,618✔
3325
    register_client_access(client_file_ident); // Throws
46,618✔
3326

22,832✔
3327
    bool dirty = false;
46,618✔
3328

22,832✔
3329
    IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident]; // Throws
46,618✔
3330
    std::size_t num_bytes = 0;
46,618✔
3331
    for (std::size_t i = 0; i < num_changesets; ++i) {
81,028✔
3332
        const UploadChangeset& uc = changesets[i];
34,410✔
3333
        auto& changesets = list.changesets;
34,410✔
3334
        changesets.emplace_back(client_file_ident, uc.origin_timestamp, uc.origin_file_ident, uc.upload_cursor,
34,410✔
3335
                                uc.changeset); // Throws
34,410✔
3336
        num_bytes += uc.changeset.size();
34,410✔
3337
        dirty = true;
34,410✔
3338
    }
34,410✔
3339

22,832✔
3340
    REALM_ASSERT(upload_progress.client_version >= list.upload_progress.client_version);
46,618✔
3341
    REALM_ASSERT(are_mutually_consistent(upload_progress, list.upload_progress));
46,618✔
3342
    if (upload_progress.client_version > list.upload_progress.client_version) {
46,624✔
3343
        list.upload_progress = upload_progress;
46,622✔
3344
        dirty = true;
46,622✔
3345
    }
46,622✔
3346

22,832✔
3347
    REALM_ASSERT(locked_server_version >= list.locked_server_version);
46,618✔
3348
    if (locked_server_version > list.locked_server_version) {
46,618✔
3349
        list.locked_server_version = locked_server_version;
38,660✔
3350
        dirty = true;
38,660✔
3351
    }
38,660✔
3352

22,832✔
3353
    if (REALM_LIKELY(dirty)) {
46,622✔
3354
        if (num_changesets > 0) {
46,620✔
3355
            on_changesets_from_downstream_added(num_changesets, num_bytes); // Throws
23,634✔
3356
        }
23,634✔
3357
        else {
22,986✔
3358
            on_work_added(); // Throws
22,986✔
3359
        }
22,986✔
3360
    }
46,620✔
3361
}
46,618✔
3362

3363

3364
// Bootstraps a client session against this file.
//
// Returns BootstrapError::bad_server_version if the client claims a server
// version beyond the currently exposed sync version. Otherwise the request is
// delegated to the history, which fills in `upload_progress` and
// `locked_server_version`. On success, those two values are then advanced to
// whatever has already been buffered for this client file, first in the work
// unit (`m_work`) and then in the blocked buffer
// (`m_changesets_from_downstream`).
BootstrapError ServerFile::bootstrap_client_session(SaltedFileIdent client_file_ident,
                                                    DownloadCursor download_progress, SaltedVersion server_version,
                                                    ClientType client_type, UploadCursor& upload_progress,
                                                    version_type& locked_server_version, Logger& logger)
{
    // The Realm file may contain a later snapshot than the one reflected by
    // `m_version_info.sync_version`, but if so, the client cannot "legally"
    // know about it.
    if (server_version.version > m_version_info.sync_version.version)
        return BootstrapError::bad_server_version;

    const ServerHistory& hist = access().history; // Throws
    const BootstrapError error =
        hist.bootstrap_client_session(client_file_ident, download_progress, server_version, client_type,
                                      upload_progress, locked_server_version, logger); // Throws

    // FIXME: Rather than taking previously buffered changesets from the same
    // client file into account when determining the upload progress, and then
    // allowing for an error during the integration of those changesets to be
    // reported to, and terminate the new session, consider to instead postpone
    // the bootstrapping of the new session until all previously buffered
    // changesets from same client file have been fully processed.

    if (error != BootstrapError::no_error)
        return error;

    register_client_access(client_file_ident.ident); // Throws

    // If upload, or releasing of server versions progressed further during
    // previous sessions than the persisted points, take that into account.
    const auto adopt_buffered_progress = [&](const auto& buffered) {
        const auto entry = buffered.find(client_file_ident.ident);
        if (entry == buffered.end())
            return;
        const IntegratableChangesetList& list = entry->second;
        REALM_ASSERT(list.upload_progress.client_version >= upload_progress.client_version);
        upload_progress = list.upload_progress;
        REALM_ASSERT(list.locked_server_version >= locked_server_version);
        locked_server_version = list.locked_server_version;
    };
    adopt_buffered_progress(m_work.changesets_from_downstream);
    adopt_buffered_progress(m_changesets_from_downstream);

    return error;
}
3411

3412
// NOTE: This function is executed by the worker thread
3413
void ServerFile::worker_process_work_unit(WorkerState& state)
3414
{
38,672✔
3415
    SteadyTimePoint start_time = steady_clock_now();
38,672✔
3416
    milliseconds_type parallel_time = 0;
38,672✔
3417

19,036✔
3418
    Work& work = m_work;
38,672✔
3419
    wlogger.debug("Work unit execution started"); // Throws
38,672✔
3420

19,036✔
3421
    if (work.has_primary_work) {
38,674✔
3422
        if (REALM_UNLIKELY(!m_work.file_ident_alloc_slots.empty()))
38,674✔
3423
            worker_allocate_file_identifiers(); // Throws
19,776✔
3424

19,036✔
3425
        if (!m_work.changesets_from_downstream.empty())
38,674✔
3426
            worker_integrate_changes_from_downstream(state); // Throws
37,322✔
3427
    }
38,674✔
3428

19,036✔
3429
    wlogger.debug("Work unit execution completed"); // Throws
38,672✔
3430

19,036✔
3431
    milliseconds_type time = steady_duration(start_time);
38,672✔
3432
    milliseconds_type seq_time = time - parallel_time;
38,672✔
3433
    m_server.m_seq_time.fetch_add(seq_time, std::memory_order_relaxed);
38,672✔
3434
    m_server.m_par_time.fetch_add(parallel_time, std::memory_order_relaxed);
38,672✔
3435

19,036✔
3436
    // Pass control back to the network event loop thread
19,036✔
3437
    network::Service& service = m_server.get_service();
38,672✔
3438
    service.post([this](Status) {
38,556✔
3439
        // FIXME: The safety of capturing `this` here, relies on the fact
18,802✔
3440
        // that ServerFile objects currently are not destroyed until the
18,802✔
3441
        // server object is destroyed.
18,802✔
3442
        group_postprocess_stage_1(); // Throws
38,322✔
3443
        // Suicide may have happened at this point
18,802✔
3444
    }); // Throws
38,322✔
3445
}
38,672✔
3446

3447

3448
void ServerFile::on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes)
3449
{
23,634✔
3450
    m_num_changesets_from_downstream += num_changesets;
23,634✔
3451

12,152✔
3452
    if (num_bytes > 0) {
23,634✔
3453
        m_blocked_changesets_from_downstream_byte_size += num_bytes;
23,634✔
3454
        get_server().inc_byte_size_for_pending_downstream_changesets(num_bytes); // Throws
23,634✔
3455
    }
23,634✔
3456

12,152✔
3457
    on_work_added(); // Throws
23,634✔
3458
}
23,634✔
3459

3460

3461
void ServerFile::on_work_added()
3462
{
48,528✔
3463
    if (m_has_blocked_work)
48,528✔
3464
        return;
9,780✔
3465
    m_has_blocked_work = true;
38,748✔
3466
    // Reference file
19,074✔
3467
    if (m_has_work_in_progress)
38,748✔
3468
        return;
10,624✔
3469
    group_unblock_work(); // Throws
28,124✔
3470
}
28,124✔
3471

3472

3473
void ServerFile::group_unblock_work()
3474
{
38,704✔
3475
    REALM_ASSERT(!m_has_work_in_progress);
38,704✔
3476
    if (REALM_LIKELY(!m_server.is_sync_stopped())) {
38,704✔
3477
        unblock_work(); // Throws
38,704✔
3478
        const Work& work = m_work;
38,704✔
3479
        if (REALM_LIKELY(work.has_primary_work)) {
38,704✔
3480
            logger.trace("Work unit unblocked"); // Throws
38,688✔
3481
            m_has_work_in_progress = true;
38,688✔
3482
            Worker& worker = m_server.get_worker();
38,688✔
3483
            worker.enqueue(this); // Throws
38,688✔
3484
        }
38,688✔
3485
    }
38,704✔
3486
}
38,704✔
3487

3488

3489
void ServerFile::unblock_work()
3490
{
38,704✔
3491
    REALM_ASSERT(m_has_blocked_work);
38,704✔
3492

19,046✔
3493
    m_work.reset();
38,704✔
3494

19,046✔
3495
    // Discard requests for file identifiers whose receiver is no longer
19,046✔
3496
    // waiting.
19,046✔
3497
    {
38,704✔
3498
        auto i = m_file_ident_requests.begin();
38,704✔
3499
        auto end = m_file_ident_requests.end();
38,704✔
3500
        while (i != end) {
40,612✔
3501
            auto j = i++;
1,908✔
3502
            const FileIdentRequestInfo& info = j->second;
1,908✔
3503
            if (!info.receiver)
1,908✔
3504
                m_file_ident_requests.erase(j);
500✔
3505
        }
1,908✔
3506
    }
38,704✔
3507
    std::size_t n = m_file_ident_requests.size();
38,704✔
3508
    if (n > 0) {
38,704✔
3509
        m_work.file_ident_alloc_slots.resize(n); // Throws
1,356✔
3510
        std::size_t i = 0;
1,356✔
3511
        for (const auto& pair : m_file_ident_requests) {
1,408✔
3512
            const FileIdentRequestInfo& info = pair.second;
1,408✔
3513
            FileIdentAllocSlot& slot = m_work.file_ident_alloc_slots[i];
1,408✔
3514
            slot.proxy_file = info.proxy_file;
1,408✔
3515
            slot.client_type = info.client_type;
1,408✔
3516
            ++i;
1,408✔
3517
        }
1,408✔
3518
        m_work.has_primary_work = true;
1,356✔
3519
    }
1,356✔
3520

19,046✔
3521
    // FIXME: `ServerFile::m_changesets_from_downstream` and
19,046✔
3522
    // `Work::changesets_from_downstream` should be renamed to something else,
19,046✔
3523
    // as it may contain kinds of data other than changesets.
19,046✔
3524

19,046✔
3525
    using std::swap;
38,704✔
3526
    swap(m_changesets_from_downstream, m_work.changesets_from_downstream);
38,704✔
3527
    m_work.have_changesets_from_downstream = (m_num_changesets_from_downstream > 0);
38,704✔
3528
    bool has_changesets = !m_work.changesets_from_downstream.empty();
38,704✔
3529
    if (has_changesets) {
38,704✔
3530
        m_work.has_primary_work = true;
37,332✔
3531
    }
37,332✔
3532

19,046✔
3533
    // Keep track of the size of pending changesets
19,046✔
3534
    REALM_ASSERT(m_unblocked_changesets_from_downstream_byte_size == 0);
38,704✔
3535
    m_unblocked_changesets_from_downstream_byte_size = m_blocked_changesets_from_downstream_byte_size;
38,704✔
3536
    m_blocked_changesets_from_downstream_byte_size = 0;
38,704✔
3537

19,046✔
3538
    m_num_changesets_from_downstream = 0;
38,704✔
3539
    m_has_blocked_work = false;
38,704✔
3540
}
38,704✔
3541

3542

3543
void ServerFile::resume_download() noexcept
3544
{
22,154✔
3545
    for (const auto& entry : m_identified_sessions) {
34,580✔
3546
        Session& sess = *entry.second;
34,580✔
3547
        sess.ensure_enlisted_to_send();
34,580✔
3548
    }
34,580✔
3549
}
22,154✔
3550

3551

3552
void ServerFile::recognize_external_change()
3553
{
4,798✔
3554
    VersionInfo prev_version_info = m_version_info;
4,798✔
3555
    const ServerHistory& history = access().history;       // Throws
4,798✔
3556
    bool has_upstream_status;                              // Dummy
4,798✔
3557
    sync::file_ident_type partial_file_ident;              // Dummy
4,798✔
3558
    sync::version_type partial_progress_reference_version; // Dummy
4,798✔
3559
    history.get_status(m_version_info, has_upstream_status, partial_file_ident,
4,798✔
3560
                       partial_progress_reference_version); // Throws
4,798✔
3561

2,400✔
3562
    REALM_ASSERT(m_version_info.realm_version >= prev_version_info.realm_version);
4,798✔
3563
    REALM_ASSERT(m_version_info.sync_version.version >= prev_version_info.sync_version.version);
4,798✔
3564
    if (m_version_info.sync_version.version > prev_version_info.sync_version.version) {
4,798✔
3565
        REALM_ASSERT(m_version_info.realm_version > prev_version_info.realm_version);
4,798✔
3566
        resume_download();
4,798✔
3567
    }
4,798✔
3568
}
4,798✔
3569

3570

3571
// NOTE: This function is executed by the worker thread
3572
void ServerFile::worker_allocate_file_identifiers()
3573
{
1,350✔
3574
    Work& work = m_work;
1,350✔
3575
    REALM_ASSERT(!work.file_ident_alloc_slots.empty());
1,350✔
3576
    ServerHistory& hist = worker_access().history;                                      // Throws
1,350✔
3577
    hist.allocate_file_identifiers(m_work.file_ident_alloc_slots, m_work.version_info); // Throws
1,350✔
3578
    m_work.produced_new_realm_version = true;
1,350✔
3579
}
1,350✔
3580

3581

3582
// Returns true when, and only when this function produces a new sync version
3583
// (adds a new entry to the sync history).
3584
//
3585
// NOTE: This function is executed by the worker thread
3586
bool ServerFile::worker_integrate_changes_from_downstream(WorkerState& state)
3587
{
37,322✔
3588
    REALM_ASSERT(!m_work.changesets_from_downstream.empty());
37,322✔
3589

18,426✔
3590
    std::unique_ptr<ServerHistory> hist_ptr;
37,322✔
3591
    DBRef sg_ptr;
37,322✔
3592
    ServerHistory& hist = get_client_file_history(state, hist_ptr, sg_ptr);
37,322✔
3593
    bool backup_whole_realm = false;
37,322✔
3594
    bool produced_new_realm_version = hist.integrate_client_changesets(
37,322✔
3595
        m_work.changesets_from_downstream, m_work.version_info, backup_whole_realm, m_work.integration_result,
37,322✔
3596
        wlogger); // Throws
37,322✔
3597
    bool produced_new_sync_version = !m_work.integration_result.integrated_changesets.empty();
37,322✔
3598
    REALM_ASSERT(!produced_new_sync_version || produced_new_realm_version);
37,322✔
3599
    if (produced_new_realm_version) {
37,322✔
3600
        m_work.produced_new_realm_version = true;
37,300✔
3601
        if (produced_new_sync_version) {
37,300✔
3602
            m_work.produced_new_sync_version = true;
17,380✔
3603
        }
17,380✔
3604
    }
37,300✔
3605
    return produced_new_sync_version;
37,322✔
3606
}
37,322✔
3607

3608
// Returns the history to be used by the worker for this file.
//
// When `state.use_file_cache` is set, the history of the worker's cached file
// access is returned and the output arguments are left untouched. Otherwise a
// fresh history and database are created for the worker file's Realm path and
// stored in `hist_ptr` and `sg_ptr`, which then own the objects behind the
// returned reference.
ServerHistory& ServerFile::get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
                                                   DBRef& sg_ptr)
{
    if (state.use_file_cache)
        return worker_access().history; // Throws

    const std::string& realm_path = m_worker_file.realm_path;
    hist_ptr = m_server.make_history_for_path();                   // Throws
    DBOptions db_options = m_worker_file.make_shared_group_options(); // Throws
    sg_ptr = DB::create(*hist_ptr, realm_path, db_options);        // Throws
    sg_ptr->claim_sync_agent();                                    // Throws
    return *hist_ptr;
}
3620

3621

3622
// When worker thread finishes work unit.
3623
void ServerFile::group_postprocess_stage_1()
3624
{
38,322✔
3625
    REALM_ASSERT(m_has_work_in_progress);
38,322✔
3626

18,802✔
3627
    group_finalize_work_stage_1(); // Throws
38,322✔
3628
    group_finalize_work_stage_2(); // Throws
38,322✔
3629
    group_postprocess_stage_2();   // Throws
38,322✔
3630
}
38,322✔
3631

3632

3633
void ServerFile::group_postprocess_stage_2()
3634
{
38,318✔
3635
    REALM_ASSERT(m_has_work_in_progress);
38,318✔
3636
    group_postprocess_stage_3(); // Throws
38,318✔
3637
    // Suicide may have happened at this point
18,802✔
3638
}
38,318✔
3639

3640

3641
// When all files, including the reference file, have been backed up.
3642
void ServerFile::group_postprocess_stage_3()
3643
{
38,316✔
3644
    REALM_ASSERT(m_has_work_in_progress);
38,316✔
3645
    m_has_work_in_progress = false;
38,316✔
3646

18,802✔
3647
    logger.trace("Work unit postprocessing complete"); // Throws
38,316✔
3648
    if (m_has_blocked_work)
38,316✔
3649
        group_unblock_work(); // Throws
10,574✔
3650
}
38,316✔
3651

3652

3653
void ServerFile::finalize_work_stage_1()
3654
{
38,322✔
3655
    if (m_unblocked_changesets_from_downstream_byte_size > 0) {
38,322✔
3656
        // Report the byte size of completed downstream changesets.
9,118✔
3657
        std::size_t byte_size = m_unblocked_changesets_from_downstream_byte_size;
17,388✔
3658
        get_server().dec_byte_size_for_pending_downstream_changesets(byte_size); // Throws
17,388✔
3659
        m_unblocked_changesets_from_downstream_byte_size = 0;
17,388✔
3660
    }
17,388✔
3661

18,804✔
3662
    // Deal with errors (bad changesets) pertaining to downstream clients
18,804✔
3663
    std::size_t num_changesets_removed = 0;
38,322✔
3664
    std::size_t num_bytes_removed = 0;
38,322✔
3665
    for (const auto& entry : m_work.integration_result.excluded_client_files) {
18,816✔
3666
        file_ident_type client_file_ident = entry.first;
20✔
3667
        ExtendedIntegrationError error = entry.second;
20✔
3668
        ProtocolError error_2 = ProtocolError::other_session_error;
20✔
3669
        switch (error) {
20✔
3670
            case ExtendedIntegrationError::client_file_expired:
✔
3671
                logger.debug("Changeset integration failed: Client file entry "
×
3672
                             "expired during session"); // Throws
×
3673
                error_2 = ProtocolError::client_file_expired;
×
3674
                break;
×
3675
            case ExtendedIntegrationError::bad_origin_file_ident:
✔
3676
                error_2 = ProtocolError::bad_origin_file_ident;
×
3677
                break;
×
3678
            case ExtendedIntegrationError::bad_changeset:
20✔
3679
                error_2 = ProtocolError::bad_changeset;
20✔
3680
                break;
20✔
3681
        }
20✔
3682
        auto i = m_identified_sessions.find(client_file_ident);
20✔
3683
        if (i != m_identified_sessions.end()) {
20✔
3684
            Session& sess = *i->second;
20✔
3685
            SyncConnection& conn = sess.get_connection();
20✔
3686
            conn.protocol_error(error_2, &sess); // Throws
20✔
3687
        }
20✔
3688
        const IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident];
20✔
3689
        std::size_t num_changesets = list.changesets.size();
20✔
3690
        std::size_t num_bytes = 0;
20✔
3691
        for (const IntegratableChangeset& ic : list.changesets)
20✔
3692
            num_bytes += ic.changeset.size();
×
3693
        logger.info("Excluded %1 changesets of combined byte size %2 for client file %3", num_changesets, num_bytes,
20✔
3694
                    client_file_ident); // Throws
20✔
3695
        num_changesets_removed += num_changesets;
20✔
3696
        num_bytes_removed += num_bytes;
20✔
3697
        m_changesets_from_downstream.erase(client_file_ident);
20✔
3698
    }
20✔
3699

18,804✔
3700
    REALM_ASSERT(num_changesets_removed <= m_num_changesets_from_downstream);
38,322✔
3701
    REALM_ASSERT(num_bytes_removed <= m_blocked_changesets_from_downstream_byte_size);
38,322✔
3702

18,804✔
3703
    if (num_changesets_removed == 0)
38,322✔
3704
        return;
38,322✔
3705

3706
    m_num_changesets_from_downstream -= num_changesets_removed;
×
3707

3708
    // The byte size of the blocked changesets must be decremented.
3709
    if (num_bytes_removed > 0) {
×
3710
        m_blocked_changesets_from_downstream_byte_size -= num_bytes_removed;
×
3711
        get_server().dec_byte_size_for_pending_downstream_changesets(num_bytes_removed); // Throws
×
3712
    }
×
3713
}
×
3714

3715

3716
void ServerFile::finalize_work_stage_2()
3717
{
38,320✔
3718
    // Expose new snapshot to remote peers
18,804✔
3719
    REALM_ASSERT(m_work.produced_new_realm_version || m_work.version_info.realm_version == 0);
38,320✔
3720
    if (m_work.version_info.realm_version > m_version_info.realm_version) {
38,320✔
3721
        REALM_ASSERT(m_work.version_info.sync_version.version >= m_version_info.sync_version.version);
38,296✔
3722
        m_version_info = m_work.version_info;
38,296✔
3723
    }
38,296✔
3724

18,804✔
3725
    bool resume_download_and_upload = m_work.produced_new_sync_version;
38,320✔
3726

18,804✔
3727
    // Deliver allocated file identifiers to requesters
18,804✔
3728
    REALM_ASSERT(m_file_ident_requests.size() >= m_work.file_ident_alloc_slots.size());
38,320✔
3729
    auto begin = m_file_ident_requests.begin();
38,320✔
3730
    auto i = begin;
38,320✔
3731
    for (const FileIdentAllocSlot& slot : m_work.file_ident_alloc_slots) {
19,558✔
3732
        FileIdentRequestInfo& info = i->second;
1,388✔
3733
        REALM_ASSERT(info.proxy_file == slot.proxy_file);
1,388✔
3734
        REALM_ASSERT(info.client_type == slot.client_type);
1,388✔
3735
        if (FileIdentReceiver* receiver = info.receiver) {
1,388✔
3736
            info.receiver = nullptr;
1,258✔
3737
            receiver->receive_file_ident(slot.file_ident); // Throws
1,258✔
3738
        }
1,258✔
3739
        ++i;
1,388✔
3740
    }
1,388✔
3741
    m_file_ident_requests.erase(begin, i);
38,320✔
3742

18,804✔
3743
    // Resume download to downstream clients
18,804✔
3744
    if (resume_download_and_upload) {
38,320✔
3745
        resume_download();
17,356✔
3746
    }
17,356✔
3747
}
38,320✔
3748

3749
// ============================ Worker implementation ============================
3750

3751
Worker::Worker(ServerImpl& server)
3752
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, "Worker: ", server.logger_ptr)}
3753
    // Throws
3754
    , logger(*logger_ptr)
3755
    , m_server{server}
3756
    , m_file_access_cache{server.get_config().max_open_files, logger, *this, server.get_config().encryption_key}
3757
{
1,376✔
3758
    util::seed_prng_nondeterministically(m_random); // Throws
1,376✔
3759
}
1,376✔
3760

3761

3762
// Appends `file` to the worker's queue, under the worker's mutex, and notifies
// all waiters on the worker's condition variable.
void Worker::enqueue(ServerFile* file)
{
    util::LockGuard guard{m_mutex};
    m_queue.push_back(file); // Throws
    m_cond.notify_all();
}
3768

3769

3770
std::mt19937_64& Worker::server_history_get_random() noexcept
3771
{
2,392✔
3772
    return m_random;
2,392✔
3773
}
2,392✔
3774

3775

3776
void Worker::run()
3777
{
1,320✔
3778
    for (;;) {
39,990✔
3779
        ServerFile* file = nullptr;
39,990✔
3780
        {
39,990✔
3781
            util::LockGuard lock{m_mutex};
39,990✔
3782
            for (;;) {
79,360✔
3783
                if (REALM_UNLIKELY(m_stop))
79,360✔
3784
                    return;
39,622✔
3785
                if (!m_queue.empty()) {
78,040✔
3786
                    file = m_queue.front();
38,670✔
3787
                    m_queue.pop_front();
38,670✔
3788
                    break;
38,670✔
3789
                }
38,670✔
3790
                m_cond.wait(lock);
39,370✔
3791
            }
39,370✔
3792
        }
39,990✔
3793
        file->worker_process_work_unit(m_state); // Throws
39,278✔
3794
    }
38,670✔
3795
}
1,320✔
3796

3797

3798
void Worker::stop() noexcept
3799
{
1,320✔
3800
    util::LockGuard lock{m_mutex};
1,320✔
3801
    m_stop = true;
1,320✔
3802
    m_cond.notify_all();
1,320✔
3803
}
1,320✔
3804

3805

3806
// ============================ ServerImpl implementation ============================
3807

3808
ServerImpl::ServerImpl(const std::string& root_dir, util::Optional<sync::PKey> pkey, Server::Config config)
3809
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::server, std::move(config.logger))}
3810
    , logger{*logger_ptr}
3811
    , m_config{std::move(config)}
3812
    , m_max_upload_backlog{determine_max_upload_backlog(config)}
3813
    , m_root_dir{root_dir} // Throws
3814
    , m_access_control{std::move(pkey)}
3815
    , m_protocol_version_range{determine_protocol_version_range(config)}                 // Throws
3816
    , m_file_access_cache{m_config.max_open_files, logger, *this, config.encryption_key} // Throws
3817
    , m_worker{*this}                                                                    // Throws
3818
    , m_acceptor{get_service()}
3819
    , m_server_protocol{}       // Throws
3820
    , m_compress_memory_arena{} // Throws
3821
{
1,376✔
3822
    if (m_config.ssl) {
1,376✔
3823
        m_ssl_context = std::make_unique<network::ssl::Context>();                // Throws
24✔
3824
        m_ssl_context->use_certificate_chain_file(m_config.ssl_certificate_path); // Throws
24✔
3825
        m_ssl_context->use_private_key_file(m_config.ssl_certificate_key_path);   // Throws
24✔
3826
    }
24✔
3827
}
1,376✔
3828

3829

3830
// Destroys the server. The server must not be running at this point; this is
// enforced with a release-mode assertion.
ServerImpl::~ServerImpl() noexcept
{
    // The variable name is kept descriptive because it is what shows up in the
    // assertion failure message.
    const bool server_destroyed_while_still_running = m_running;
    REALM_ASSERT_RELEASE(!server_destroyed_while_still_running);
}
3835

3836

3837
void ServerImpl::start()
3838
{
1,376✔
3839
    logger.info("Realm sync server started (%1)", REALM_VER_CHUNK); // Throws
1,376✔
3840
    logger.info("Supported protocol versions: %1-%2 (%3-%4 configured)",
1,376✔
3841
                ServerImplBase::get_oldest_supported_protocol_version(), get_current_protocol_version(),
1,376✔
3842
                m_protocol_version_range.first,
1,376✔
3843
                m_protocol_version_range.second); // Throws
1,376✔
3844
    logger.info("Platform: %1", util::get_platform_info());
1,376✔
3845
    bool is_debug_build = false;
1,376✔
3846
#if REALM_DEBUG
1,376✔
3847
    is_debug_build = true;
1,376✔
3848
#endif
1,376✔
3849
    {
1,376✔
3850
        const char* lead_text = "Build mode";
1,376✔
3851
        if (is_debug_build) {
1,376✔
3852
            logger.info("%1: Debug", lead_text); // Throws
1,376✔
3853
        }
1,376✔
3854
        else {
×
3855
            logger.info("%1: Release", lead_text); // Throws
×
3856
        }
×
3857
    }
1,376✔
3858
    if (is_debug_build) {
1,376✔
3859
        logger.warn("Build mode is Debug! CAN SEVERELY IMPACT PERFORMANCE - "
1,376✔
3860
                    "NOT RECOMMENDED FOR PRODUCTION"); // Throws
1,376✔
3861
    }
1,376✔
3862
    logger.info("Directory holding persistent state: %1", m_root_dir);        // Throws
1,376✔
3863
    logger.info("Maximum number of open files: %1", m_config.max_open_files); // Throws
1,376✔
3864
    {
1,376✔
3865
        const char* lead_text = "Encryption";
1,376✔
3866
        if (m_config.encryption_key) {
1,376✔
3867
            logger.info("%1: Yes", lead_text); // Throws
4✔
3868
        }
4✔
3869
        else {
1,372✔
3870
            logger.info("%1: No", lead_text); // Throws
1,372✔
3871
        }
1,372✔
3872
    }
1,376✔
3873
    logger.info("Log level: %1", logger.get_level_threshold()); // Throws
1,376✔
3874
    {
1,376✔
3875
        const char* lead_text = "Disable sync to disk";
1,376✔
3876
        if (m_config.disable_sync_to_disk) {
1,376✔
3877
            logger.info("%1: All files", lead_text); // Throws
688✔
3878
        }
688✔
3879
        else {
688✔
3880
            logger.info("%1: No", lead_text); // Throws
688✔
3881
        }
688✔
3882
    }
1,376✔
3883
    if (m_config.disable_sync_to_disk) {
1,376✔
3884
        logger.warn("Testing/debugging feature 'disable sync to disk' enabled - "
688✔
3885
                    "never do this in production!"); // Throws
688✔
3886
    }
688✔
3887
    logger.info("Download compaction: %1",
1,376✔
3888
                (m_config.disable_download_compaction ? "No" : "Yes")); // Throws
1,376✔
3889
    logger.info("Download bootstrap caching: %1",
1,376✔
3890
                (m_config.enable_download_bootstrap_cache ? "Yes" : "No"));                // Throws
1,376✔
3891
    logger.info("Max download size: %1 bytes", m_config.max_download_size);                // Throws
1,376✔
3892
    logger.info("Max upload backlog: %1 bytes", m_max_upload_backlog);                     // Throws
1,376✔
3893
    logger.info("HTTP request timeout: %1 ms", m_config.http_request_timeout);             // Throws
1,376✔
3894
    logger.info("HTTP response timeout: %1 ms", m_config.http_response_timeout);           // Throws
1,376✔
3895
    logger.info("Connection reaper timeout: %1 ms", m_config.connection_reaper_timeout);   // Throws
1,376✔
3896
    logger.info("Connection reaper interval: %1 ms", m_config.connection_reaper_interval); // Throws
1,376✔
3897
    logger.info("Connection soft close timeout: %1 ms", m_config.soft_close_timeout);      // Throws
1,376✔
3898
    logger.debug("Authorization header name: %1", m_config.authorization_header_name);     // Throws
1,376✔
3899

636✔
3900
    m_realm_names = _impl::find_realm_files(m_root_dir); // Throws
1,376✔
3901

636✔
3902
    initiate_connection_reaper_timer(m_config.connection_reaper_interval); // Throws
1,376✔
3903

636✔
3904
    listen(); // Throws
1,376✔
3905
}
1,376✔
3906

3907

3908
void ServerImpl::run()
3909
{
1,320✔
3910
    auto ta = util::make_temp_assign(m_running, true);
1,320✔
3911

608✔
3912
    {
1,320✔
3913
        auto worker_thread = util::make_thread_exec_guard(m_worker, *this); // Throws
1,320✔
3914
        std::string name;
1,320✔
3915
        if (util::Thread::get_name(name)) {
1,320✔
3916
            name += "-worker";
712✔
3917
            worker_thread.start_with_signals_blocked(name); // Throws
712✔
3918
        }
712✔
3919
        else {
608✔
3920
            worker_thread.start_with_signals_blocked(); // Throws
608✔
3921
        }
608✔
3922

608✔
3923
        m_service.run(); // Throws
1,320✔
3924

608✔
3925
        worker_thread.stop_and_rethrow(); // Throws
1,320✔
3926
    }
1,320✔
3927

608✔
3928
    logger.info("Realm sync server stopped");
1,320✔
3929
}
1,320✔
3930

3931

3932
void ServerImpl::stop() noexcept
3933
{
2,214✔
3934
    util::LockGuard lock{m_mutex};
2,214✔
3935
    if (m_stopped)
2,214✔
3936
        return;
838✔
3937
    m_stopped = true;
1,376✔
3938
    m_wait_or_service_stopped_cond.notify_all();
1,376✔
3939
    m_service.stop();
1,376✔
3940
}
1,376✔
3941

3942

3943
void ServerImpl::inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3944
{
23,634✔
3945
    m_pending_changesets_from_downstream_byte_size += byte_size;
23,634✔
3946
    logger.debug("Byte size for pending downstream changesets incremented by "
23,634✔
3947
                 "%1 to reach a total of %2",
23,634✔
3948
                 byte_size,
23,634✔
3949
                 m_pending_changesets_from_downstream_byte_size); // Throws
23,634✔
3950
}
23,634✔
3951

3952

3953
void ServerImpl::dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3954
{
17,388✔
3955
    REALM_ASSERT(byte_size <= m_pending_changesets_from_downstream_byte_size);
17,388✔
3956
    m_pending_changesets_from_downstream_byte_size -= byte_size;
17,388✔
3957
    logger.debug("Byte size for pending downstream changesets decremented by "
17,388✔
3958
                 "%1 to reach a total of %2",
17,388✔
3959
                 byte_size,
17,388✔
3960
                 m_pending_changesets_from_downstream_byte_size); // Throws
17,388✔
3961
}
17,388✔
3962

3963

3964
std::mt19937_64& ServerImpl::server_history_get_random() noexcept
3965
{
1,054✔
3966
    return get_random();
1,054✔
3967
}
1,054✔
3968

3969

3970
void ServerImpl::listen()
3971
{
1,376✔
3972
    network::Resolver resolver{get_service()};
1,376✔
3973
    network::Resolver::Query query(m_config.listen_address, m_config.listen_port,
1,376✔
3974
                                   network::Resolver::Query::passive | network::Resolver::Query::address_configured);
1,376✔
3975
    network::Endpoint::List endpoints = resolver.resolve(query); // Throws
1,376✔
3976

636✔
3977
    auto i = endpoints.begin();
1,376✔
3978
    auto end = endpoints.end();
1,376✔
3979
    for (;;) {
1,376✔
3980
        std::error_code ec;
1,376✔
3981
        m_acceptor.open(i->protocol(), ec);
1,376✔
3982
        if (!ec) {
1,376✔
3983
            using SocketBase = network::SocketBase;
1,374✔
3984
            m_acceptor.set_option(SocketBase::reuse_address(m_config.reuse_address), ec);
1,374✔
3985
            if (!ec) {
1,374✔
3986
                m_acceptor.bind(*i, ec);
1,372✔
3987
                if (!ec)
1,372✔
3988
                    break;
1,374✔
3989
            }
×
3990
            m_acceptor.close();
×
3991
        }
×
3992
        if (i + 1 == end) {
636!
3993
            for (auto i2 = endpoints.begin(); i2 != i; ++i2) {
×
3994
                // FIXME: We don't have the error code for previous attempts, so
3995
                // can't print a nice message.
3996
                logger.error("Failed to bind to %1:%2", i2->address(),
×
3997
                             i2->port()); // Throws
×
3998
            }
×
3999
            logger.error("Failed to bind to %1:%2: %3", i->address(), i->port(),
×
4000
                         ec.message()); // Throws
×
4001
            throw std::runtime_error("Could not create a listening socket: All endpoints failed");
×
4002
        }
×
4003
    }
2✔
4004

636✔
4005
    m_acceptor.listen(m_config.listen_backlog);
1,376✔
4006

636✔
4007
    network::Endpoint local_endpoint = m_acceptor.local_endpoint();
1,376✔
4008
    const char* ssl_mode = (m_ssl_context ? "TLS" : "non-TLS");
1,366✔
4009
    logger.info("Listening on %1:%2 (max backlog is %3, %4)", local_endpoint.address(), local_endpoint.port(),
1,376✔
4010
                m_config.listen_backlog, ssl_mode); // Throws
1,376✔
4011

636✔
4012
    initiate_accept();
1,376✔
4013
}
1,376✔
4014

4015

4016
void ServerImpl::initiate_accept()
4017
{
3,382✔
4018
    auto handler = [this](std::error_code ec) {
2,642✔
4019
        if (ec != util::error::operation_aborted)
2,010✔
4020
            handle_accept(ec);
2,010✔
4021
    };
2,010✔
4022
    bool is_ssl = bool(m_ssl_context);
3,382✔
4023
    m_next_http_conn.reset(new HTTPConnection(*this, ++m_next_conn_id, is_ssl));                            // Throws
3,382✔
4024
    m_acceptor.async_accept(m_next_http_conn->get_socket(), m_next_http_conn_endpoint, std::move(handler)); // Throws
3,382✔
4025
}
3,382✔
4026

4027

4028
void ServerImpl::handle_accept(std::error_code ec)
4029
{
2,010✔
4030
    if (ec) {
2,010✔
4031
        if (ec != util::error::connection_aborted) {
×
4032
            REALM_ASSERT(ec != util::error::operation_aborted);
×
4033

4034
            // We close the reserved files to get a few extra file descriptors.
4035
            for (size_t i = 0; i < sizeof(m_reserved_files) / sizeof(m_reserved_files[0]); ++i) {
×
4036
                m_reserved_files[i].reset();
×
4037
            }
×
4038

4039
            // FIXME: There are probably errors that need to be treated
4040
            // specially, and not cause the server to "crash".
4041

4042
            if (ec == make_basic_system_error_code(EMFILE)) {
×
4043
                logger.error("Failed to accept a connection due to the file descriptor limit, "
×
4044
                             "consider increasing the limit in your system config"); // Throws
×
4045
                throw OutOfFilesError(ec);
×
4046
            }
×
4047
            else {
×
4048
                throw std::system_error(ec);
×
4049
            }
×
4050
        }
×
4051
        logger.debug("Skipping aborted connection"); // Throws
×
4052
    }
×
4053
    else {
2,010✔
4054
        HTTPConnection& conn = *m_next_http_conn;
2,010✔
4055
        if (m_config.tcp_no_delay)
2,010✔
4056
            conn.get_socket().set_option(network::SocketBase::no_delay(true));       // Throws
1,700✔
4057
        m_http_connections.emplace(conn.get_id(), std::move(m_next_http_conn));      // Throws
2,010✔
4058
        Formatter& formatter = m_misc_buffers.formatter;
2,010✔
4059
        formatter.reset();
2,010✔
4060
        formatter << "[" << m_next_http_conn_endpoint.address() << "]:" << m_next_http_conn_endpoint.port(); // Throws
2,010✔
4061
        std::string remote_endpoint = {formatter.data(), formatter.size()};                                  // Throws
2,010✔
4062
        conn.initiate(std::move(remote_endpoint));                                                           // Throws
2,010✔
4063
    }
2,010✔
4064
    initiate_accept(); // Throws
2,010✔
4065
}
2,010✔
4066

4067

4068
void ServerImpl::remove_http_connection(std::int_fast64_t conn_id) noexcept
4069
{
2,008✔
4070
    m_http_connections.erase(conn_id);
2,008✔
4071
}
2,008✔
4072

4073

4074
void ServerImpl::add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn)
4075
{
1,970✔
4076
    m_sync_connections.emplace(connection_id, std::move(sync_conn));
1,970✔
4077
}
1,970✔
4078

4079

4080
void ServerImpl::remove_sync_connection(int_fast64_t connection_id)
4081
{
1,148✔
4082
    m_sync_connections.erase(connection_id);
1,148✔
4083
}
1,148✔
4084

4085

4086
// Posts a task to the event loop service that stores `timeout` as the new
// `m_config.connection_reaper_timeout`. The assignment happens when the posted
// task runs, not before this function returns.
void ServerImpl::set_connection_reaper_timeout(milliseconds_type timeout)
{
    auto apply_timeout = [this, timeout](Status) {
        m_config.connection_reaper_timeout = timeout;
    };
    get_service().post(std::move(apply_timeout));
}
4✔
4092

4093

4094
void ServerImpl::close_connections()
4095
{
20✔
4096
    get_service().post([this](Status) {
20✔
4097
        do_close_connections(); // Throws
20✔
4098
    });
20✔
4099
}
20✔
4100

4101

4102
bool ServerImpl::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4103
{
72✔
4104
    return _impl::map_virt_to_real_realm_path(m_root_dir, virt_path, real_path); // Throws
72✔
4105
}
72✔
4106

4107

4108
void ServerImpl::recognize_external_change(const std::string& virt_path)
4109
{
4,800✔
4110
    std::string virt_path_2 = virt_path; // Throws (copy)
4,800✔
4111
    get_service().post([this, virt_path = std::move(virt_path_2)](Status) {
4,800✔
4112
        do_recognize_external_change(virt_path); // Throws
4,800✔
4113
    });                                          // Throws
4,800✔
4114
}
4,800✔
4115

4116

4117
void ServerImpl::stop_sync_and_wait_for_backup_completion(
4118
    util::UniqueFunction<void(bool did_backup)> completion_handler, milliseconds_type timeout)
4119
{
×
4120
    logger.info("stop_sync_and_wait_for_backup_completion() called with "
×
4121
                "timeout = %1",
×
4122
                timeout); // Throws
×
4123

4124
    get_service().post([this, completion_handler = std::move(completion_handler), timeout](Status) mutable {
×
4125
        do_stop_sync_and_wait_for_backup_completion(std::move(completion_handler),
×
4126
                                                    timeout); // Throws
×
4127
    });
×
4128
}
×
4129

4130

4131
// (Re)creates the connection reaper timer and arms it to fire after `timeout`
// milliseconds. When the wait completes with any status other than
// OperationAborted, dead connections are reaped and the timer is re-armed with
// the same timeout.
void ServerImpl::initiate_connection_reaper_timer(milliseconds_type timeout)
{
    m_connection_reaper_timer.emplace(get_service());

    const std::chrono::milliseconds delay(timeout);
    auto on_expiry = [this, timeout](Status status) {
        if (status != ErrorCodes::OperationAborted) {
            reap_connections();                        // Throws
            initiate_connection_reaper_timer(timeout); // Throws
        }
    };
    m_connection_reaper_timer->async_wait(delay, std::move(on_expiry)); // Throws
}
1,494✔
4141

4142

4143
void ServerImpl::reap_connections()
4144
{
120✔
4145
    logger.debug("Discarding dead connections"); // Throws
120✔
4146
    SteadyTimePoint now = steady_clock_now();
120✔
4147
    {
120✔
4148
        auto end = m_http_connections.end();
120✔
4149
        auto i = m_http_connections.begin();
120✔
4150
        while (i != end) {
122✔
4151
            HTTPConnection& conn = *i->second;
2✔
4152
            ++i;
2✔
4153
            // Suicide
4154
            conn.terminate_if_dead(now); // Throws
2✔
4155
        }
2✔
4156
    }
120✔
4157
    {
120✔
4158
        auto end = m_sync_connections.end();
120✔
4159
        auto i = m_sync_connections.begin();
120✔
4160
        while (i != end) {
234✔
4161
            SyncConnection& conn = *i->second;
114✔
4162
            ++i;
114✔
4163
            // Suicide
106✔
4164
            conn.terminate_if_dead(now); // Throws
114✔
4165
        }
114✔
4166
    }
120✔
4167
}
120✔
4168

4169

4170
void ServerImpl::do_close_connections()
4171
{
20✔
4172
    for (auto& entry : m_sync_connections) {
20✔
4173
        SyncConnection& conn = *entry.second;
20✔
4174
        conn.initiate_soft_close(); // Throws
20✔
4175
    }
20✔
4176
}
20✔
4177

4178

4179
void ServerImpl::do_recognize_external_change(const std::string& virt_path)
4180
{
4,800✔
4181
    auto i = m_files.find(virt_path);
4,800✔
4182
    if (i == m_files.end())
4,800✔
4183
        return;
2✔
4184
    ServerFile& file = *i->second;
4,798✔
4185
    file.recognize_external_change();
4,798✔
4186
}
4,798✔
4187

4188

4189
void ServerImpl::do_stop_sync_and_wait_for_backup_completion(
4190
    util::UniqueFunction<void(bool did_complete)> completion_handler, milliseconds_type timeout)
4191
{
×
4192
    static_cast<void>(timeout);
×
4193
    if (m_sync_stopped)
×
4194
        return;
×
4195
    do_close_connections(); // Throws
×
4196
    m_sync_stopped = true;
×
4197
    bool completion_reached = false;
×
4198
    completion_handler(completion_reached); // Throws
×
4199
}
×
4200

4201

4202
// ============================ SyncConnection implementation ============================
4203

4204
// Empties the send-enlistment list first, and only then destroys the sessions
// held in `m_sessions`.
SyncConnection::~SyncConnection() noexcept
{
    // Order matters here: drop the enlistment list before the session
    // container is cleared.
    m_sessions_enlisted_to_send.clear();
    m_sessions.clear();
}
1,970✔
4209

4210

4211
void SyncConnection::initiate()
4212
{
1,970✔
4213
    m_last_activity_at = steady_clock_now();
1,970✔
4214
    logger.debug("Sync Connection initiated");
1,970✔
4215
    m_websocket.initiate_server_websocket_after_handshake();
1,970✔
4216
    send_log_message(util::Logger::Level::info, "Client connection established with server", 0,
1,970✔
4217
                     m_appservices_request_id);
1,970✔
4218
}
1,970✔
4219

4220

4221
template <class... Params>
4222
void SyncConnection::terminate(Logger::Level log_level, const char* log_message, Params... log_params)
4223
{
1,148✔
4224
    terminate_sessions();                              // Throws
1,148✔
4225
    logger.log(log_level, log_message, log_params...); // Throws
1,148✔
4226
    m_websocket.stop();
1,148✔
4227
    m_ssl_stream.reset();
1,148✔
4228
    m_socket.reset();
1,148✔
4229
    // Suicide
652✔
4230
    m_server.remove_sync_connection(m_id);
1,148✔
4231
}
1,148✔
4232

4233

4234
// Terminates this connection if the time since the last recorded activity has
// reached the applicable limit: the soft close timeout while the connection is
// closing, the connection reaper timeout otherwise.
//
// When the limit is reached, terminate() is called, after which this object
// must not be used.
void SyncConnection::terminate_if_dead(SteadyTimePoint now)
{
    const milliseconds_type idle_time = steady_duration(m_last_activity_at, now);
    const Server::Config& cfg = m_server.get_config();

    if (m_is_closing) {
        if (idle_time < cfg.soft_close_timeout)
            return;
        // Suicide
        terminate(Logger::Level::detail,
                  "Sync connection closed (timeout during soft close)"); // Throws
        return;
    }

    if (idle_time < cfg.connection_reaper_timeout)
        return;
    // Suicide
    terminate(Logger::Level::detail,
              "Sync connection closed (no heartbeat)"); // Throws
}
114✔
4253

4254

4255
// Appends `sess` to the list of sessions waiting to send and fires the send
// trigger.
//
// Preconditions (asserted): the send trigger exists, the connection is not
// closing, and `sess` is not already enlisted.
void SyncConnection::enlist_to_send(Session* sess) noexcept
{
    REALM_ASSERT(m_send_trigger);
    REALM_ASSERT(!m_is_closing);
    REALM_ASSERT(!sess->is_enlisted_to_send());

    auto& pending = m_sessions_enlisted_to_send;
    pending.push_back(sess);
    m_send_trigger->trigger();
}
112,170✔
4263

4264

4265
// Logs `status` as an error and raises the corresponding protocol error:
//   SyncProtocolInvariantFailed -> bad_syntax
//   LimitExceeded               -> limits_exceeded
//   anything else               -> other_error
void SyncConnection::handle_protocol_error(Status status)
{
    logger.error("%1", status);

    ProtocolError mapped_error = ProtocolError::other_error;
    switch (status.code()) {
        case ErrorCodes::SyncProtocolInvariantFailed:
            mapped_error = ProtocolError::bad_syntax;
            break;
        case ErrorCodes::LimitExceeded:
            mapped_error = ProtocolError::limits_exceeded;
            break;
        default:
            break;
    }
    protocol_error(mapped_error); // Throws
}
×
4280

4281
void SyncConnection::receive_bind_message(session_ident_type session_ident, std::string path,
4282
                                          std::string signed_user_token, bool need_client_file_ident,
4283
                                          bool is_subserver)
4284
{
6,976✔
4285
    auto p = m_sessions.emplace(session_ident, nullptr); // Throws
6,976✔
4286
    bool was_inserted = p.second;
6,976✔
4287
    if (REALM_UNLIKELY(!was_inserted)) {
6,976✔
4288
        logger.error("Overlapping reuse of session identifier %1 in BIND message",
×
4289
                     session_ident);                           // Throws
×
4290
        protocol_error(ProtocolError::reuse_of_session_ident); // Throws
×
4291
        return;
×
4292
    }
×
4293
    try {
6,976✔
4294
        p.first->second.reset(new Session(*this, session_ident)); // Throws
6,976✔
4295
    }
6,976✔
4296
    catch (...) {
2,828✔
4297
        m_sessions.erase(p.first);
×
4298
        throw;
×
4299
    }
×
4300

2,828✔
4301
    Session& sess = *p.first->second;
6,976✔
4302
    sess.initiate(); // Throws
6,976✔
4303
    ProtocolError error;
6,976✔
4304
    bool success =
6,976✔
4305
        sess.receive_bind_message(std::move(path), std::move(signed_user_token), need_client_file_ident, is_subserver,
6,976✔
4306
                                  error); // Throws
6,976✔
4307
    if (REALM_UNLIKELY(!success))         // Throws
6,976✔
4308
        protocol_error(error, &sess);     // Throws
2,842✔
4309
}
6,976✔
4310

4311

4312
void SyncConnection::receive_ident_message(session_ident_type session_ident, file_ident_type client_file_ident,
4313
                                           salt_type client_file_ident_salt, version_type scan_server_version,
4314
                                           version_type scan_client_version, version_type latest_server_version,
4315
                                           salt_type latest_server_version_salt)
4316
{
6,156✔
4317
    auto i = m_sessions.find(session_ident);
6,156✔
4318
    if (REALM_UNLIKELY(i == m_sessions.end())) {
6,156✔
4319
        bad_session_ident("IDENT", session_ident); // Throws
×
4320
        return;
×
4321
    }
×
4322
    Session& sess = *i->second;
6,156✔
4323
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
6,156✔
4324
        message_after_unbind("IDENT", session_ident); // Throws
×
4325
        return;
×
4326
    }
×
4327
    if (REALM_UNLIKELY(sess.error_occurred())) {
6,156✔
4328
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
8✔
4329
        // messages, other than UNBIND, must be ignored.
8✔
4330
        return;
16✔
4331
    }
16✔
4332
    if (REALM_UNLIKELY(sess.must_send_ident_message())) {
6,140✔
4333
        logger.error("Received IDENT message before IDENT message was sent"); // Throws
×
4334
        protocol_error(ProtocolError::bad_message_order);                     // Throws
×
4335
        return;
×
4336
    }
×
4337
    if (REALM_UNLIKELY(sess.ident_message_received())) {
6,140✔
4338
        logger.error("Received second IDENT message for session"); // Throws
×
4339
        protocol_error(ProtocolError::bad_message_order);          // Throws
×
4340
        return;
×
4341
    }
×
4342

2,148✔
4343
    ProtocolError error = {};
6,140✔
4344
    bool success = sess.receive_ident_message(client_file_ident, client_file_ident_salt, scan_server_version,
6,140✔
4345
                                              scan_client_version, latest_server_version, latest_server_version_salt,
6,140✔
4346
                                              error); // Throws
6,140✔
4347
    if (REALM_UNLIKELY(!success))                     // Throws
6,140✔
4348
        protocol_error(error, &sess);                 // Throws
2,164✔
4349
}
6,140✔
4350

4351
void SyncConnection::receive_upload_message(session_ident_type session_ident, version_type progress_client_version,
4352
                                            version_type progress_server_version, version_type locked_server_version,
4353
                                            const UploadChangesets& upload_changesets)
4354
{
46,848✔
4355
    auto i = m_sessions.find(session_ident);
46,848✔
4356
    if (REALM_UNLIKELY(i == m_sessions.end())) {
46,848✔
4357
        bad_session_ident("UPLOAD", session_ident); // Throws
×
4358
        return;
×
4359
    }
×
4360
    Session& sess = *i->second;
46,848✔
4361
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
46,848✔
4362
        message_after_unbind("UPLOAD", session_ident); // Throws
×
4363
        return;
×
4364
    }
×
4365
    if (REALM_UNLIKELY(sess.error_occurred())) {
46,848✔
4366
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4367
        // messages, other than UNBIND, must be ignored.
4368
        return;
×
4369
    }
×
4370
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
46,848✔
4371
        message_before_ident("UPLOAD", session_ident); // Throws
×
4372
        return;
×
4373
    }
×
4374

22,980✔
4375
    ProtocolError error = {};
46,848✔
4376
    bool success = sess.receive_upload_message(progress_client_version, progress_server_version,
46,848✔
4377
                                               locked_server_version, upload_changesets, error); // Throws
46,848✔
4378
    if (REALM_UNLIKELY(!success))                                                                // Throws
46,848✔
4379
        protocol_error(error, &sess);                                                            // Throws
22,980✔
4380
}
46,848✔
4381

4382

4383
// Handles a MARK message by validating the session state and forwarding the
// message to the session identified by `session_ident`.
//
// Protocol errors are raised for an unknown session, a message after UNBIND,
// and a message before IDENT. The message is silently dropped if the session
// is already in an error state.
void SyncConnection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    auto i = m_sessions.find(session_ident);
    if (REALM_UNLIKELY(i == m_sessions.end())) {
        bad_session_ident("MARK", session_ident); // Throws
        return;
    }
    Session& sess = *i->second;
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
        message_after_unbind("MARK", session_ident); // Throws
        return;
    }
    if (REALM_UNLIKELY(sess.error_occurred())) {
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
        // messages, other than UNBIND, must be ignored.
        return;
    }
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
        message_before_ident("MARK", session_ident); // Throws
        return;
    }

    // Value-initialized, matching the IDENT and UPLOAD handlers, so that no
    // indeterminate value is ever read from it.
    ProtocolError error = {};
    bool success = sess.receive_mark_message(request_ident, error); // Throws
    if (REALM_UNLIKELY(!success))                                   // Throws
        protocol_error(error, &sess);                               // Throws
}
14,046✔
4410

4411

4412
// Handles an UNBIND message by forwarding it to the session identified by
// `session_ident`. Protocol errors are raised for an unknown session and for a
// repeated UNBIND.
void SyncConnection::receive_unbind_message(session_ident_type session_ident)
{
    const auto pos = m_sessions.find(session_ident); // Throws
    if (REALM_UNLIKELY(pos == m_sessions.end())) {
        bad_session_ident("UNBIND", session_ident); // Throws
        return;
    }

    Session& session = *pos->second;
    if (REALM_UNLIKELY(session.unbind_message_received())) {
        message_after_unbind("UNBIND", session_ident); // Throws
        return;
    }

    session.receive_unbind_message(); // Throws
    // NOTE: The session might have gotten destroyed at this time!
}
4,222✔
4428

4429

4430
// Handles a PING message: records that a PONG carrying `timestamp` must be
// sent, and starts sending right away if no send is currently in progress.
// `rtt` is only logged.
void SyncConnection::receive_ping(milliseconds_type timestamp, milliseconds_type rtt)
{
    logger.debug("Received: PING(timestamp=%1, rtt=%2)", timestamp, rtt); // Throws

    m_last_ping_timestamp = timestamp;
    m_send_pong = true;

    // If a send is in progress, the PONG goes out when that send completes
    // and send_next_message() runs again.
    if (m_is_sending)
        return;
    send_next_message();
}
172✔
4438

4439

4440
void SyncConnection::receive_error_message(session_ident_type session_ident, int error_code,
4441
                                           std::string_view error_body)
4442
{
×
4443
    logger.debug("Received: ERROR(error_code=%1, message_size=%2, session_ident=%3)", error_code, error_body.size(),
×
4444
                 session_ident); // Throws
×
4445
    auto i = m_sessions.find(session_ident);
×
4446
    if (REALM_UNLIKELY(i == m_sessions.end())) {
×
4447
        bad_session_ident("ERROR", session_ident);
×
4448
        return;
×
4449
    }
×
4450
    Session& sess = *i->second;
×
4451
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
×
4452
        message_after_unbind("ERROR", session_ident); // Throws
×
4453
        return;
×
4454
    }
×
4455

4456
    sess.receive_error_message(session_ident, error_code, error_body); // Throws
×
4457
}
×
4458

4459
void SyncConnection::send_log_message(util::Logger::Level level, const std::string&& message,
4460
                                      session_ident_type sess_ident, std::optional<std::string> co_id)
4461
{
8,078✔
4462
    if (get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
8,078✔
4463
        return logger.log(level, message.c_str());
×
4464
    }
×
4465

3,140✔
4466
    LogMessage log_msg{sess_ident, level, std::move(message), std::move(co_id)};
8,078✔
4467
    {
8,078✔
4468
        std::lock_guard lock(m_log_mutex);
8,078✔
4469
        m_log_messages.push(std::move(log_msg));
8,078✔
4470
    }
8,078✔
4471
    m_send_trigger->trigger();
8,078✔
4472
}
8,078✔
4473

4474

4475
void SyncConnection::bad_session_ident(const char* message_type, session_ident_type session_ident)
4476
{
×
4477
    logger.error("Bad session identifier in %1 message, session_ident = %2", message_type,
×
4478
                 session_ident);                      // Throws
×
4479
    protocol_error(ProtocolError::bad_session_ident); // Throws
×
4480
}
×
4481

4482

4483
void SyncConnection::message_after_unbind(const char* message_type, session_ident_type session_ident)
4484
{
×
4485
    logger.error("Received %1 message after UNBIND message, session_ident = %2", message_type,
×
4486
                 session_ident);                      // Throws
×
4487
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4488
}
×
4489

4490

4491
void SyncConnection::message_before_ident(const char* message_type, session_ident_type session_ident)
4492
{
×
4493
    logger.error("Received %1 message before IDENT message, session_ident = %2", message_type,
×
4494
                 session_ident);                      // Throws
×
4495
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4496
}
×
4497

4498

4499
void SyncConnection::handle_message_received(const char* data, size_t size)
4500
{
78,462✔
4501
    // parse_message_received() parses the message and calls the
35,380✔
4502
    // proper handler on the SyncConnection object (this).
35,380✔
4503
    get_server_protocol().parse_message_received<SyncConnection>(*this, std::string_view(data, size));
78,462✔
4504
    return;
78,462✔
4505
}
78,462✔
4506

4507

4508
void SyncConnection::handle_ping_received(const char* data, size_t size)
4509
{
×
4510
    // parse_message_received() parses the message and calls the
4511
    // proper handler on the SyncConnection object (this).
4512
    get_server_protocol().parse_ping_received<SyncConnection>(*this, std::string_view(data, size));
×
4513
    return;
×
4514
}
×
4515

4516

4517
void SyncConnection::send_next_message()
4518
{
111,982✔
4519
    REALM_ASSERT(!m_is_sending);
111,982✔
4520
    REALM_ASSERT(!m_sending_pong);
111,982✔
4521
    if (m_send_pong) {
111,982✔
4522
        send_pong(m_last_ping_timestamp);
172✔
4523
        if (m_sending_pong)
172✔
4524
            return;
172✔
4525
    }
111,810✔
4526
    for (;;) {
163,626✔
4527
        Session* sess = m_sessions_enlisted_to_send.pop_front();
163,626✔
4528
        if (!sess) {
163,626✔
4529
            // No sessions were enlisted to send
23,616✔
4530
            if (REALM_LIKELY(!m_is_closing))
51,660✔
4531
                break; // Check to see if there are any log messages to go out
51,650✔
4532
            // Send a connection level ERROR
10✔
4533
            REALM_ASSERT(!is_session_level_error(m_error_code));
20✔
4534
            initiate_write_error(m_error_code, m_error_session_ident); // Throws
20✔
4535
            return;
20✔
4536
        }
20✔
4537
        sess->send_message(); // Throws
111,966✔
4538
        // NOTE: The session might have gotten destroyed at this time!
54,490✔
4539

54,490✔
4540
        // At this point, `m_is_sending` is true if, and only if the session
54,490✔
4541
        // chose to send a message. If it chose to not send a message, we must
54,490✔
4542
        // loop back and give the next session in `m_sessions_enlisted_to_send`
54,490✔
4543
        // a chance.
54,490✔
4544
        if (m_is_sending)
111,966✔
4545
            return;
60,162✔
4546
    }
111,966✔
4547
    {
80,222✔
4548
        std::lock_guard lock(m_log_mutex);
51,628✔
4549
        if (!m_log_messages.empty()) {
51,628✔
4550
            send_log_message(m_log_messages.front());
8,020✔
4551
            m_log_messages.pop();
8,020✔
4552
        }
8,020✔
4553
    }
51,628✔
4554
    // Otherwise, nothing to do
23,600✔
4555
}
51,628✔
4556

4557

4558
void SyncConnection::initiate_write_output_buffer()
4559
{
68,184✔
4560
    auto handler = [this](std::error_code ec, size_t) {
68,136✔
4561
        if (!ec) {
68,126✔
4562
            handle_write_output_buffer();
67,954✔
4563
        }
67,954✔
4564
    };
68,126✔
4565

31,712✔
4566
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
68,184✔
4567
                                   std::move(handler)); // Throws
68,184✔
4568
    m_is_sending = true;
68,184✔
4569
}
68,184✔
4570

4571

4572
void SyncConnection::initiate_pong_output_buffer()
4573
{
172✔
4574
    auto handler = [this](std::error_code ec, size_t) {
172✔
4575
        if (!ec) {
172✔
4576
            handle_pong_output_buffer();
172✔
4577
        }
172✔
4578
    };
172✔
4579

68✔
4580
    REALM_ASSERT(!m_is_sending);
172✔
4581
    REALM_ASSERT(!m_sending_pong);
172✔
4582
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
172✔
4583
                                   std::move(handler)); // Throws
172✔
4584

68✔
4585
    m_is_sending = true;
172✔
4586
    m_sending_pong = true;
172✔
4587
}
172✔
4588

4589

4590
// Serializes a PONG message carrying `timestamp` into the output buffer and
// starts writing it. Clears the "PONG pending" flag.
//
// Preconditions (asserted): a PONG is pending and none is currently being
// sent.
void SyncConnection::send_pong(milliseconds_type timestamp)
{
    REALM_ASSERT(m_send_pong);
    REALM_ASSERT(!m_sending_pong);
    m_send_pong = false;

    logger.debug("Sending: PONG(timestamp=%1)", timestamp); // Throws

    OutputBuffer& buffer = get_output_buffer();
    get_server_protocol().make_pong(buffer, timestamp); // Throws

    initiate_pong_output_buffer(); // Throws
}
172✔
4602

4603
void SyncConnection::send_log_message(const LogMessage& log_msg)
4604
{
8,020✔
4605
    OutputBuffer& out = get_output_buffer();
8,020✔
4606
    get_server_protocol().make_log_message(out, log_msg.level, log_msg.message, log_msg.sess_ident,
8,020✔
4607
                                           log_msg.co_id); // Throws
8,020✔
4608

3,126✔
4609
    initiate_write_output_buffer(); // Throws
8,020✔
4610
}
8,020✔
4611

4612

4613
void SyncConnection::handle_write_output_buffer()
4614
{
67,954✔
4615
    release_output_buffer();
67,954✔
4616
    m_is_sending = false;
67,954✔
4617
    send_next_message(); // Throws
67,954✔
4618
}
67,954✔
4619

4620

4621
void SyncConnection::handle_pong_output_buffer()
4622
{
172✔
4623
    release_output_buffer();
172✔
4624
    REALM_ASSERT(m_is_sending);
172✔
4625
    REALM_ASSERT(m_sending_pong);
172✔
4626
    m_is_sending = false;
172✔
4627
    m_sending_pong = false;
172✔
4628
    send_next_message(); // Throws
172✔
4629
}
172✔
4630

4631

4632
// Serializes an ERROR message for `error_code` / `session_ident` into the
// output buffer and starts writing it to the client. The message text and the
// try-again flag are derived from the error code. Completion of the write is
// handled by handle_write_error().
void SyncConnection::initiate_write_error(ProtocolError error_code, session_ident_type session_ident)
{
    const char* text = get_protocol_error_message(int(error_code));
    const std::size_t text_size = std::strlen(text);
    const bool try_again = determine_try_again(error_code);

    logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3, session_ident=%4)", int(error_code),
                  text_size, try_again, session_ident); // Throws

    OutputBuffer& buffer = get_output_buffer();
    const int protocol_version = get_client_protocol_version();
    get_server_protocol().make_error_message(protocol_version, buffer, error_code, text, text_size, try_again,
                                             session_ident); // Throws

    auto on_written = [this](std::error_code ec, size_t) {
        handle_write_error(ec); // Throws
    };
    m_websocket.async_write_binary(buffer.data(), buffer.size(), std::move(on_written));
    m_is_sending = true;
}
20✔
4652

4653

4654
void SyncConnection::handle_write_error(std::error_code ec)
4655
{
20✔
4656
    m_is_sending = false;
20✔
4657
    REALM_ASSERT(m_is_closing);
20✔
4658
    if (!m_ssl_stream) {
20✔
4659
        m_socket->shutdown(network::Socket::shutdown_send, ec);
20✔
4660
        if (ec && ec != make_basic_system_error_code(ENOTCONN))
20!
4661
            throw std::system_error(ec);
×
4662
    }
20✔
4663
}
20✔
4664

4665

4666
// For connection level errors, `sess` is ignored. For session level errors, a
4667
// session must be specified.
4668
//
4669
// If a session is specified, that session object will have been detached from
4670
// the ServerFile object (and possibly destroyed) upon return from
4671
// protocol_error().
4672
//
4673
// If a session is specified for a protocol level error, that session object
4674
// will have been destroyed upon return from protocol_error(). For session level
4675
// errors, the specified session will have been destroyed upon return from
4676
// protocol_error() if, and only if the negotiated protocol version is less than
4677
// 23.
4678
void SyncConnection::protocol_error(ProtocolError error_code, Session* sess)
4679
{
80✔
4680
    REALM_ASSERT(!m_is_closing);
80✔
4681
    bool session_level = is_session_level_error(error_code);
80✔
4682
    REALM_ASSERT(!session_level || sess);
80✔
4683
    REALM_ASSERT(!sess || m_sessions.count(sess->get_session_ident()) == 1);
80✔
4684
    if (logger.would_log(util::Logger::Level::debug)) {
80✔
4685
        const char* message = get_protocol_error_message(int(error_code));
×
4686
        Logger& logger_2 = (session_level ? sess->logger : logger);
×
4687
        logger_2.debug("Protocol error: %1 (error_code=%2)", message, int(error_code)); // Throws
×
4688
    }
×
4689
    session_ident_type session_ident = (session_level ? sess->get_session_ident() : 0);
80✔
4690
    if (session_level) {
80✔
4691
        sess->initiate_deactivation(error_code); // Throws
80✔
4692
        return;
80✔
4693
    }
80✔
4694
    do_initiate_soft_close(error_code, session_ident); // Throws
×
4695
}
×
4696

4697

4698
// Puts the connection into the "closing" state: records the error to be
// reported, drops all pending outgoing work, terminates every session, and
// finally fires the send trigger.
//
// The assertions below require that `error_code` has a known message, that it
// is a connection level error, and consequently that `session_ident` is zero.
// The connection must not already be closing, and the send trigger must
// exist.
void SyncConnection::do_initiate_soft_close(ProtocolError error_code, session_ident_type session_ident)
{
    REALM_ASSERT(get_protocol_error_message(int(error_code)));

    // A session identifier may only accompany a session level error, and
    // session level errors are not accepted here at all. Taken together, the
    // two checks mean this function is only used for connection level errors
    // with `session_ident == 0`.
    REALM_ASSERT(is_session_level_error(error_code) == (session_ident != 0));
    REALM_ASSERT(!is_session_level_error(error_code));

    REALM_ASSERT(m_send_trigger);
    REALM_ASSERT(!m_is_closing);

    // Remember what has to be reported to the client.
    m_is_closing = true;
    m_error_code = error_code;
    m_error_session_ident = session_ident;

    // Don't waste time and effort sending any other messages
    m_send_pong = false;
    m_sessions_enlisted_to_send.clear();

    m_receiving_session = nullptr;

    terminate_sessions(); // Throws

    m_send_trigger->trigger();
}
20✔
4727

4728

4729
void SyncConnection::close_due_to_close_by_client(std::error_code ec)
4730
{
682✔
4731
    auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
594✔
4732
    // Suicide
404✔
4733
    terminate(log_level, "Sync connection closed by client: %1", ec.message()); // Throws
682✔
4734
}
682✔
4735

4736

4737
void SyncConnection::close_due_to_error(std::error_code ec)
4738
{
462✔
4739
    // Suicide
246✔
4740
    terminate(Logger::Level::error, "Sync connection closed due to error: %1",
462✔
4741
              ec.message()); // Throws
462✔
4742
}
462✔
4743

4744

4745
void SyncConnection::terminate_sessions()
4746
{
1,168✔
4747
    for (auto& entry : m_sessions) {
1,814✔
4748
        Session& sess = *entry.second;
1,814✔
4749
        sess.terminate(); // Throws
1,814✔
4750
    }
1,814✔
4751
    m_sessions_enlisted_to_send.clear();
1,168✔
4752
    m_sessions.clear();
1,168✔
4753
}
1,168✔
4754

4755

4756
void SyncConnection::initiate_soft_close()
4757
{
20✔
4758
    if (!m_is_closing) {
20✔
4759
        session_ident_type session_ident = 0;                                    // Not session specific
20✔
4760
        do_initiate_soft_close(ProtocolError::connection_closed, session_ident); // Throws
20✔
4761
    }
20✔
4762
}
20✔
4763

4764

4765
// Removes the entry keyed by `session_ident` from the session registry.
void SyncConnection::discard_session(session_ident_type session_ident) noexcept
{
    m_sessions.erase(session_ident);
}
4,216✔
4769

4770
} // anonymous namespace
4771

4772

4773
// ============================ sync::Server implementation ============================
4774

4775
// The object that sync::Server owns and forwards to. It adds no members of its
// own; it only passes its constructor arguments on to ServerImpl.
class Server::Implementation : public ServerImpl {
public:
    // `pkey` and `config` are taken by value and moved into the base class.
    Implementation(const std::string& root_dir, util::Optional<PKey> pkey, Server::Config config)
        : ServerImpl{root_dir, std::move(pkey), std::move(config)} // Throws
    {
    }

    virtual ~Implementation() = default;
};
4783

4784

4785
Server::Server(const std::string& root_dir, util::Optional<sync::PKey> pkey, Config config)
4786
    : m_impl{new Implementation{root_dir, std::move(pkey), std::move(config)}} // Throws
4787
{
1,376✔
4788
}
1,376✔
4789

4790

4791
Server::Server(Server&& serv) noexcept
4792
    : m_impl{std::move(serv.m_impl)}
4793
{
×
4794
}
×
4795

4796

4797
// Defined in this translation unit, after Server::Implementation is complete.
Server::~Server() noexcept = default;
1,376✔
4798

4799

4800
void Server::start()
4801
{
708✔
4802
    m_impl->start(); // Throws
708✔
4803
}
708✔
4804

4805

4806
void Server::start(const std::string& listen_address, const std::string& listen_port, bool reuse_address)
4807
{
668✔
4808
    m_impl->start(listen_address, listen_port, reuse_address); // Throws
668✔
4809
}
668✔
4810

4811

4812
// Returns the listen endpoint reported by the implementation.
network::Endpoint Server::listen_endpoint() const
{
    return m_impl->listen_endpoint(); // Throws
}
1,382✔
4816

4817

4818
void Server::run()
4819
{
1,320✔
4820
    m_impl->run(); // Throws
1,320✔
4821
}
1,320✔
4822

4823

4824
void Server::stop() noexcept
4825
{
2,214✔
4826
    m_impl->stop();
2,214✔
4827
}
2,214✔
4828

4829

4830
uint_fast64_t Server::errors_seen() const noexcept
4831
{
668✔
4832
    return m_impl->errors_seen;
668✔
4833
}
668✔
4834

4835

4836
void Server::stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
4837
                                                      milliseconds_type timeout)
4838
{
×
4839
    m_impl->stop_sync_and_wait_for_backup_completion(std::move(completion_handler), timeout); // Throws
×
4840
}
×
4841

4842

4843
// Forwards the connection reaper timeout to the implementation.
void Server::set_connection_reaper_timeout(milliseconds_type timeout)
{
    m_impl->set_connection_reaper_timeout(timeout);
}
4✔
4847

4848

4849
void Server::close_connections()
4850
{
20✔
4851
    m_impl->close_connections();
20✔
4852
}
20✔
4853

4854

4855
bool Server::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4856
{
72✔
4857
    return m_impl->map_virtual_to_real_path(virt_path, real_path); // Throws
72✔
4858
}
72✔
4859

4860

4861
void Server::recognize_external_change(const std::string& virt_path)
4862
{
4,800✔
4863
    m_impl->recognize_external_change(virt_path); // Throws
4,800✔
4864
}
4,800✔
4865

4866

4867
// Forwards to the implementation's get_workunit_timers(). Both arguments are
// passed through by reference as output arguments.
void Server::get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
{
    m_impl->get_workunit_timers(parallel_section, sequential_section);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc