• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / github_pull_request_281922

31 Oct 2023 09:13AM UTC coverage: 90.445% (-0.08%) from 90.528%
github_pull_request_281922

Pull #7039

Evergreen

jedelbo
Merge branch 'next-major' into je/global-key
Pull Request #7039: Remove ability to synchronize objects without primary key

95324 of 175822 branches covered (0.0%)

101 of 105 new or added lines in 13 files covered. (96.19%)

238 existing lines in 19 files now uncovered.

232657 of 257235 relevant lines covered (90.45%)

6351359.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.92
/src/realm/sync/noinst/server/server.cpp
1
#include <realm/sync/noinst/server/server.hpp>
2

3
#include <realm/binary_data.hpp>
4
#include <realm/impl/simulated_failure.hpp>
5
#include <realm/object_id.hpp>
6
#include <realm/string_data.hpp>
7
#include <realm/sync/changeset.hpp>
8
#include <realm/sync/trigger.hpp>
9
#include <realm/sync/impl/clamped_hex_dump.hpp>
10
#include <realm/sync/impl/clock.hpp>
11
#include <realm/sync/network/http.hpp>
12
#include <realm/sync/network/network_ssl.hpp>
13
#include <realm/sync/network/websocket.hpp>
14
#include <realm/sync/noinst/client_history_impl.hpp>
15
#include <realm/sync/noinst/protocol_codec.hpp>
16
#include <realm/sync/noinst/server/access_control.hpp>
17
#include <realm/sync/noinst/server/server_dir.hpp>
18
#include <realm/sync/noinst/server/server_file_access_cache.hpp>
19
#include <realm/sync/noinst/server/server_impl_base.hpp>
20
#include <realm/sync/transform.hpp>
21
#include <realm/util/base64.hpp>
22
#include <realm/util/bind_ptr.hpp>
23
#include <realm/util/buffer_stream.hpp>
24
#include <realm/util/circular_buffer.hpp>
25
#include <realm/util/compression.hpp>
26
#include <realm/util/file.hpp>
27
#include <realm/util/json_parser.hpp>
28
#include <realm/util/load_file.hpp>
29
#include <realm/util/memory_stream.hpp>
30
#include <realm/util/optional.hpp>
31
#include <realm/util/platform_info.hpp>
32
#include <realm/util/random.hpp>
33
#include <realm/util/safe_int_ops.hpp>
34
#include <realm/util/scope_exit.hpp>
35
#include <realm/util/scratch_allocator.hpp>
36
#include <realm/util/thread.hpp>
37
#include <realm/util/thread_exec_guard.hpp>
38
#include <realm/util/value_reset_guard.hpp>
39
#include <realm/version.hpp>
40

41
#include <algorithm>
42
#include <atomic>
43
#include <cctype>
44
#include <chrono>
45
#include <cmath>
46
#include <condition_variable>
47
#include <cstdint>
48
#include <cstdio>
49
#include <cstring>
50
#include <functional>
51
#include <locale>
52
#include <map>
53
#include <memory>
54
#include <queue>
55
#include <sstream>
56
#include <stdexcept>
57
#include <thread>
58
#include <vector>
59

60
// NOTE: The protocol specification is in `/doc/protocol.md`
61

62

63
// FIXME: Verify that session identifier spoofing cannot be used to get access
64
// to sessions belonging to other network connections in any way.
65
// FIXME: Seems that server must close connection with zero sessions after a
66
// certain timeout.
67

68

69
using namespace realm;
70
using namespace realm::sync;
71
using namespace realm::util;
72

73
// clang-format off
74
using ServerHistory         = _impl::ServerHistory;
75
using ServerProtocol        = _impl::ServerProtocol;
76
using ServerFileAccessCache = _impl::ServerFileAccessCache;
77
using ServerImplBase = _impl::ServerImplBase;
78

79
using IntegratableChangeset = ServerHistory::IntegratableChangeset;
80
using IntegratableChangesetList = ServerHistory::IntegratableChangesetList;
81
using IntegratableChangesets = ServerHistory::IntegratableChangesets;
82
using IntegrationResult = ServerHistory::IntegrationResult;
83
using BootstrapError = ServerHistory::BootstrapError;
84
using ExtendedIntegrationError = ServerHistory::ExtendedIntegrationError;
85
using ClientType = ServerHistory::ClientType;
86
using FileIdentAllocSlot = ServerHistory::FileIdentAllocSlot;
87
using FileIdentAllocSlots = ServerHistory::FileIdentAllocSlots;
88

89
using UploadChangeset = ServerProtocol::UploadChangeset;
90
// clang-format on
91

92

93
using UploadChangesets = std::vector<UploadChangeset>;
94

95
using EventLoopMetricsHandler = network::Service::EventLoopMetricsHandler;
96

97

98
static_assert(std::numeric_limits<session_ident_type>::digits >= 63, "Bad session identifier type");
99
static_assert(std::numeric_limits<file_ident_type>::digits >= 63, "Bad file identifier type");
100
static_assert(std::numeric_limits<version_type>::digits >= 63, "Bad version type");
101
static_assert(std::numeric_limits<timestamp_type>::digits >= 63, "Bad timestamp type");
102

103

104
namespace {
105

106
// Scheduling status values; `done` is pinned to 0 and the others follow in
// declaration order.
enum class SchedStatus { done = 0, pending, in_progress };
107

108
// Only used by the Sync Server to support older pbs sync clients (prior to protocol v8).
//
// Returns the constant prefix string "com.mongodb.realm-sync/".
constexpr std::string_view get_old_pbs_websocket_protocol_prefix() noexcept
{
    return "com.mongodb.realm-sync/";
}
113

114
// Shortens `str` for display.
//
// Returns `str` unchanged when it holds at most `cutoff` characters. Otherwise
// returns "..." followed by the last `cutoff` characters of `str`.
std::string short_token_fmt(const std::string& str, size_t cutoff = 30)
{
    if (str.size() <= cutoff) {
        return str;
    }
    // Keep only the tail of the string and mark the omission.
    return "..." + str.substr(str.size() - cutoff);
}
123

124

125
// Splits a comma-separated string into its elements, one element per call to
// next(). Leading and trailing white space (tab, LF, CR, space) is removed
// from each element, and elements that are empty after trimming are skipped.
//
// The parser only stores a `std::string_view`, and the elements it yields are
// views into the same characters, so the underlying character data must
// outlive both the parser and every element obtained from it.
class HttpListHeaderValueParser {
public:
    HttpListHeaderValueParser(std::string_view string) noexcept
        : m_string{string}
    {
    }

    // Advances to the next non-empty element. On success, assigns the element
    // to `elem` and returns true. Returns false, leaving `elem` unmodified,
    // when no more elements remain.
    bool next(std::string_view& elem) noexcept
    {
        while (m_pos < m_string.size()) {
            size_type i = m_pos;
            size_type j = m_string.find(',', i);
            if (j != std::string_view::npos) {
                // Resume after the comma on the next iteration / call
                m_pos = j + 1;
            }
            else {
                // Last element: it extends to the end of the string
                j = m_string.size();
                m_pos = j;
            }

            // Exclude leading and trailing white space
            while (i < j && is_http_lws(m_string[i]))
                ++i;
            while (j > i && is_http_lws(m_string[j - 1]))
                --j;

            if (i != j) {
                elem = m_string.substr(i, j - i);
                return true;
            }
        }
        return false;
    }

private:
    using size_type = std::string_view::size_type;
    const std::string_view m_string;
    // Offset of the first character that has not yet been consumed
    size_type m_pos = 0;

    // True for the characters that are trimmed from the ends of an element.
    static bool is_http_lws(char ch) noexcept
    {
        return (ch == '\t' || ch == '\n' || ch == '\r' || ch == ' ');
    }
};
167

168

169
// Use std::chrono::high_resolution_clock when it is steady, and fall back to
// std::chrono::steady_clock otherwise.
using SteadyClock = std::conditional<std::chrono::high_resolution_clock::is_steady,
                                     std::chrono::high_resolution_clock, std::chrono::steady_clock>::type;
using SteadyTimePoint = SteadyClock::time_point;

// Returns the current time point of `SteadyClock`.
SteadyTimePoint steady_clock_now() noexcept
{
    return SteadyClock::now();
}
177

178
// Returns the time elapsed from `start_time` to `end_time`, converted to whole
// milliseconds with std::chrono::duration_cast (which truncates toward zero).
//
// When `end_time` is omitted, it defaults to the value of steady_clock_now()
// evaluated at the call site.
milliseconds_type steady_duration(SteadyTimePoint start_time, SteadyTimePoint end_time = steady_clock_now()) noexcept
{
    const auto elapsed = end_time - start_time;
    const auto elapsed_millis = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
    return milliseconds_type(elapsed_millis);
}
184

185

186
// Returns true if, and only if `error_code` is
// `ProtocolError::connection_closed`.
bool determine_try_again(ProtocolError error_code) noexcept
{
    return (error_code == ProtocolError::connection_closed);
}
190

191

192
class ServerFile;
193
class ServerImpl;
194
class HTTPConnection;
195
class SyncConnection;
196
class Session;
197

198

199
using Formatter = util::ResettableExpandableBufferOutputStream;
200
using OutputBuffer = util::ResettableExpandableBufferOutputStream;
201

202
using ProtocolVersionRange = std::pair<int, int>;
203

204
class MiscBuffers {
205
public:
206
    Formatter formatter;
207
    OutputBuffer download_message;
208

209
    using ProtocolVersionRanges = std::vector<ProtocolVersionRange>;
210
    ProtocolVersionRanges protocol_version_ranges;
211

212
    std::vector<char> compress;
213

214
    MiscBuffers()
215
    {
7,964✔
216
        formatter.imbue(std::locale::classic());
7,964✔
217
        download_message.imbue(std::locale::classic());
7,964✔
218
    }
7,964✔
219
};
220

221

222
struct DownloadCache {
223
    std::unique_ptr<char[]> body;
224
    std::size_t uncompressed_body_size;
225
    std::size_t compressed_body_size;
226
    bool body_is_compressed;
227
    version_type end_version;
228
    DownloadCursor download_progress;
229
    std::uint_fast64_t downloadable_bytes;
230
    std::size_t num_changesets;
231
    std::size_t accum_original_size;
232
    std::size_t accum_compacted_size;
233
};
234

235

236
// An unblocked work unit is comprised of one Work object for each of the files
237
// that contribute work to the work unit, generally one reference file and a
238
// number of partial files.
239
class Work {
240
public:
241
    // In general, primary work is all forms of modifying work, including file
242
    // deletion.
243
    bool has_primary_work = false;
244

245
    // Only for reference files
246
    bool might_produce_new_sync_version = false;
247

248
    bool produced_new_realm_version = false;
249
    bool produced_new_sync_version = false;
250
    bool expired_reference_version = false;
251

252
    // True if, and only if changesets_from_downstream contains at least one
253
    // changeset.
254
    bool have_changesets_from_downstream = false;
255

256
    FileIdentAllocSlots file_ident_alloc_slots;
257
    std::vector<std::unique_ptr<char[]>> changeset_buffers;
258
    IntegratableChangesets changesets_from_downstream;
259

260
    VersionInfo version_info;
261

262
    // Result of integration of changesets from downstream clients
263
    IntegrationResult integration_result;
264

265
    void reset() noexcept
266
    {
38,194✔
267
        has_primary_work = false;
38,194✔
268

19,060✔
269
        might_produce_new_sync_version = false;
38,194✔
270

19,060✔
271
        produced_new_realm_version = false;
38,194✔
272
        produced_new_sync_version = false;
38,194✔
273
        expired_reference_version = false;
38,194✔
274
        have_changesets_from_downstream = false;
38,194✔
275

19,060✔
276
        file_ident_alloc_slots.clear();
38,194✔
277
        changeset_buffers.clear();
38,194✔
278
        changesets_from_downstream.clear();
38,194✔
279

19,060✔
280
        version_info = {};
38,194✔
281
        integration_result = {};
38,194✔
282
    }
38,194✔
283
};
284

285

286
class WorkerState {
287
public:
288
    FileIdentAllocSlots file_ident_alloc_slots;
289
    util::ScratchMemory scratch_memory;
290
    bool use_file_cache = true;
291
    std::unique_ptr<ServerHistory> reference_hist;
292
    DBRef reference_sg;
293
};
294

295

296
// ============================ SessionQueue ============================
297

298
class SessionQueue {
299
public:
300
    void push_back(Session*) noexcept;
301
    Session* pop_front() noexcept;
302
    void clear() noexcept;
303

304
private:
305
    Session* m_back = nullptr;
306
};
307

308

309
// ============================ FileIdentReceiver ============================
310

311
class FileIdentReceiver {
312
public:
313
    virtual void receive_file_ident(SaltedFileIdent) = 0;
314

315
protected:
316
    ~FileIdentReceiver() {}
5,598✔
317
};
318

319

320
// ============================ WorkerBox =============================
321

322
class WorkerBox {
323
public:
324
    using JobType = util::UniqueFunction<void(WorkerState&)>;
325
    void add_work(WorkerState& state, JobType job)
326
    {
×
327
        std::unique_lock<std::mutex> lock(m_mutex);
×
328
        if (m_jobs.size() >= m_queue_limit) {
×
329
            // Once we have many queued jobs, it is better to use this thread to run a new job
×
330
            // than to queue it.
×
331
            run_a_job(lock, state, job);
×
332
        }
×
333
        else {
×
334
            // Create worker threads on demand (if all existing threads are active):
×
335
            if (m_threads.size() < m_max_num_threads && m_active >= m_threads.size()) {
×
336
                m_threads.emplace_back([this]() {
×
337
                    WorkerState state;
×
338
                    state.use_file_cache = false;
×
339
                    JobType the_job;
×
340
                    std::unique_lock<std::mutex> lock(m_mutex);
×
341
                    for (;;) {
×
342
                        while (m_jobs.empty() && !m_finish_up)
×
343
                            m_changes.wait(lock);
×
344
                        if (m_finish_up)
×
345
                            break; // terminate thread
×
346
                        the_job = std::move(m_jobs.back());
×
347
                        m_jobs.pop_back();
×
348
                        run_a_job(lock, state, the_job);
×
349
                        m_changes.notify_all();
×
350
                    }
×
351
                });
×
352
            }
×
353

×
354
            // Submit the job for execution:
×
355
            m_jobs.emplace_back(std::move(job));
×
356
            m_changes.notify_all();
×
357
        }
×
358
    }
×
359

360
    // You should call wait_completion() before trying to destroy a WorkerBox to get proper
361
    // propagation of exceptions.
362
    void wait_completion(WorkerState& state)
363
    {
×
364
        std::unique_lock<std::mutex> lock(m_mutex);
×
365
        while (!m_jobs.empty() || m_active > 0) {
×
366
            if (!m_jobs.empty()) { // if possible, make this thread participate in running m_jobs
×
367
                JobType the_job = std::move(m_jobs.back());
×
368
                m_jobs.pop_back();
×
369
                run_a_job(lock, state, the_job);
×
370
            }
×
371
            else {
×
372
                m_changes.wait(lock);
×
373
            }
×
374
        }
×
375
        if (m_epr) {
×
376
            std::rethrow_exception(m_epr);
×
377
        }
×
378
    }
×
379

380
    WorkerBox(unsigned int num_threads)
381
    {
×
382
        m_queue_limit = num_threads * 10; // fudge factor for job size variation
×
383
        m_max_num_threads = num_threads;
×
384
    }
×
385

386
    ~WorkerBox()
387
    {
×
388
        {
×
389
            std::unique_lock<std::mutex> lock(m_mutex);
×
390
            m_finish_up = true;
×
391
            m_changes.notify_all();
×
392
        }
×
393
        for (auto& e : m_threads)
×
394
            e.join();
×
395
    }
×
396

397
private:
398
    std::mutex m_mutex;
399
    std::condition_variable m_changes;
400
    std::vector<std::thread> m_threads;
401
    std::vector<JobType> m_jobs;
402
    unsigned int m_active = 0;
403
    bool m_finish_up = false;
404
    unsigned int m_queue_limit = 0;
405
    unsigned int m_max_num_threads = 0;
406
    std::exception_ptr m_epr;
407

408
    void run_a_job(std::unique_lock<std::mutex>& lock, WorkerState& state, JobType& job)
409
    {
×
410
        ++m_active;
×
411
        lock.unlock();
×
412
        try {
×
413
            job(state);
×
414
            lock.lock();
×
415
        }
×
416
        catch (...) {
×
417
            lock.lock();
×
418
            if (!m_epr)
×
419
                m_epr = std::current_exception();
×
420
        }
×
421
        --m_active;
×
422
    }
×
423
};
424

425

426
// ============================ ServerFile ============================
427

428
class ServerFile : public util::RefCountBase {
429
public:
430
    util::PrefixLogger logger;
431

432
    // Logger to be used by the worker thread
433
    util::PrefixLogger wlogger;
434

435
    ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path, std::string real_path,
436
               bool disable_sync_to_disk);
437
    ~ServerFile() noexcept;
438

439
    void initialize();
440
    void activate();
441

442
    ServerImpl& get_server() noexcept
443
    {
40,888✔
444
        return m_server;
40,888✔
445
    }
40,888✔
446

447
    const std::string& get_real_path() const noexcept
448
    {
×
449
        return m_file.realm_path;
×
450
    }
×
451

452
    const std::string& get_virt_path() const noexcept
453
    {
×
454
        return m_file.virt_path;
×
455
    }
×
456

457
    ServerFileAccessCache::File& access()
458
    {
49,146✔
459
        return m_file.access(); // Throws
49,146✔
460
    }
49,146✔
461

462
    ServerFileAccessCache::File& worker_access()
463
    {
38,124✔
464
        return m_worker_file.access(); // Throws
38,124✔
465
    }
38,124✔
466

467
    version_type get_realm_version() const noexcept
468
    {
×
469
        return m_version_info.realm_version;
×
470
    }
×
471

472
    version_type get_sync_version() const noexcept
473
    {
×
474
        return m_version_info.sync_version.version;
×
475
    }
×
476

477
    SaltedVersion get_salted_sync_version() const noexcept
478
    {
100,802✔
479
        return m_version_info.sync_version;
100,802✔
480
    }
100,802✔
481

482
    DownloadCache& get_download_cache() noexcept;
483

484
    void register_client_access(file_ident_type client_file_ident);
485

486
    using file_ident_request_type = std::int_fast64_t;
487

488
    // Initiate a request for a new client file identifier.
489
    //
490
    // Unless the request is cancelled, the identifier will be delivered to the
491
    // receiver by way of an invocation of
492
    // FileIdentReceiver::receive_file_ident().
493
    //
494
    // FileIdentReceiver::receive_file_ident() is guaranteed to not be called
495
    // until after request_file_ident() has returned (no callback reentrance).
496
    //
497
    // New client file identifiers will be delivered to receivers in the order
498
    // that they were requested.
499
    //
500
    // The returned value is a nonzero integer that can be used to cancel the
501
    // request before the file identifier is delivered using
502
    // cancel_file_ident_request().
503
    auto request_file_ident(FileIdentReceiver&, file_ident_type proxy_file, ClientType) -> file_ident_request_type;
504

505
    // Cancel the specified file identifier request.
506
    //
507
    // It is an error to call this function after the identifier has been
508
    // delivered.
509
    void cancel_file_ident_request(file_ident_request_type) noexcept;
510

511
    void add_unidentified_session(Session*);
512
    void identify_session(Session*, file_ident_type client_file_ident);
513

514
    void remove_unidentified_session(Session*) noexcept;
515
    void remove_identified_session(file_ident_type client_file_ident) noexcept;
516

517
    Session* get_identified_session(file_ident_type client_file_ident) noexcept;
518

519
    bool can_add_changesets_from_downstream() const noexcept;
520
    void add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
521
                                        version_type locked_server_version, const UploadChangeset*,
522
                                        std::size_t num_changesets);
523

524
    // bootstrap_client_session calls the function of same name in server_history
525
    // but corrects the upload_progress with information from pending
526
    // integratable changesets. A situation can occur where a client terminates
527
    // a session and starts a new session and re-uploads changesets that are known
528
    // by the ServerFile object but not by the ServerHistory.
529
    BootstrapError bootstrap_client_session(SaltedFileIdent client_file_ident, DownloadCursor download_progress,
530
                                            SaltedVersion server_version, ClientType client_type,
531
                                            UploadCursor& upload_progress, version_type& locked_server_version,
532
                                            Logger&);
533

534
    // NOTE: This function is executed by the worker thread
535
    void worker_process_work_unit(WorkerState&);
536

537
    void recognize_external_change();
538

539
private:
540
    ServerImpl& m_server;
541
    ServerFileAccessCache::Slot m_file;
542

543
    // In general, `m_version_info` refers to the last snapshot of the Realm
544
    // file that is supposed to be visible to remote peers engaging in regular
545
    // Realm file synchronization.
546
    VersionInfo m_version_info;
547

548
    file_ident_request_type m_last_file_ident_request = 0;
549

550
    // The set of sessions whose client file identifier is not yet known, i.e.,
551
    // those for which an IDENT message has not yet been received,
552
    std::set<Session*> m_unidentified_sessions;
553

554
    // A map of the sessions whose client file identifier is known, i.e, those
555
    // for which an IDENT message has been received.
556
    std::map<file_ident_type, Session*> m_identified_sessions;
557

558
    // Used when a file used as partial view wants to allocate a client file
559
    // identifier from the reference Realm.
560
    file_ident_request_type m_file_ident_request = 0;
561

562
    struct FileIdentRequestInfo {
563
        FileIdentReceiver* receiver;
564
        file_ident_type proxy_file;
565
        ClientType client_type;
566
    };
567

568
    // When nonempty, it counts towards outstanding blocked work (see
569
    // `m_has_blocked_work`).
570
    std::map<file_ident_request_type, FileIdentRequestInfo> m_file_ident_requests;
571

572
    // Changesets received from the downstream clients, and waiting to be
573
    // integrated, as well as information about the clients progress in terms of
574
    // integrating changesets received from the server. When nonempty, it counts
575
    // towards outstanding blocked work (see `m_has_blocked_work`).
576
    //
577
    // At any given time, the set of changesets from a particular client-side
578
    // file may be comprised of changesets received via distinct sessions.
579
    //
580
    // See also `m_num_changesets_from_downstream`.
581
    IntegratableChangesets m_changesets_from_downstream;
582

583
    // Keeps track of the number of changesets in `m_changesets_from_downstream`.
584
    //
585
    // Its purpose is also to initialize
586
    // `Work::have_changesets_from_downstream`.
587
    std::size_t m_num_changesets_from_downstream = 0;
588

589
    // The total size, in bytes, of the changesets that were received from
590
    // clients, are targeting this file, and are currently part of the blocked
591
    // work unit.
592
    //
593
    // Together with `m_unblocked_changesets_from_downstream_byte_size`, its
594
    // purpose is to allow the server to keep track of the accumulated size of
595
    // changesets being processed, or waiting to be processed (metric
596
    // `upload.pending.bytes`) (see
597
    // ServerImpl::inc_byte_size_for_pending_downstream_changesets()).
598
    //
599
    // Its purpose is also to enable the "very poor man's" backpressure solution
600
    // (see can_add_changesets_from_downstream()).
601
    std::size_t m_blocked_changesets_from_downstream_byte_size = 0;
602

603
    // Same as `m_blocked_changesets_from_downstream_byte_size` but for the
604
    // currently unblocked work unit.
605
    std::size_t m_unblocked_changesets_from_downstream_byte_size = 0;
606

607
    // When nonempty, it counts towards outstanding blocked work (see
608
    // `m_has_blocked_work`).
609
    std::vector<std::string> m_permission_changes;
610

611
    // True iff this file, or any of its associated partial files (when
612
    // applicable), has a nonzero amount of outstanding work that is currently
613
    // held back from being passed to the worker thread because a previously
614
    // accumulated chunk of work related to this file is currently in progress.
615
    bool m_has_blocked_work = false;
616

617
    // A file, that is not a partial file, is considered *exposed to the worker
618
    // thread* from the point in time where it is submitted to the worker
619
    // (Worker::enqueue()) and up until the point in time where
620
    // group_postprocess_stage_1() starts to execute. A partial file is
621
    // considered *exposed to the worker thread* precisely when the associated
622
    // reference file is exposed to the worker thread, but only if it was in
623
    // `m_reference_file->m_work.partial_files` at the point in time where the
624
    // reference file was passed to the worker.
625
    //
626
    // While this file is exposed to the worker thread, all members of `m_work`
627
    // other than `changesets_from_downstream` may be accessed and modified by
628
    // the worker thread only.
629
    //
630
    // While this file is exposed to the worker thread,
631
    // `m_work.changesets_from_downstream` may be accessed by all threads, but
632
    // must not be modified by any thread. This special status of
633
    // `m_work.changesets_from_downstream` is required to allow
634
    // ServerFile::bootstrap_client_session() to read from it at any time.
635
    Work m_work;
636

637
    // For reference files, set to true when work is unblocked, and reset back
638
    // to false when the work finalization process completes
639
    // (group_postprocess_stage_3()). Always zero for partial files.
640
    bool m_has_work_in_progress = 0;
641

642
    // This one must only be accessed by the worker thread.
643
    //
644
    // More specifically, `m_worker_file.access()` must only be called by the
645
    // worker thread, and if it was ever called, it must be closed by the worker
646
    // thread before the ServerFile object is destroyed, if destruction happens
647
    // before the destruction of the server object itself.
648
    ServerFileAccessCache::Slot m_worker_file;
649

650
    std::vector<std::int_fast64_t> m_deleting_connections;
651

652
    DownloadCache m_download_cache;
653

654
    void on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes);
655
    void on_work_added();
656
    void group_unblock_work();
657
    void unblock_work();
658

659
    /// Resume history scanning in all sessions bound to this file. To be called
660
    /// after a successfull integration of a changeset.
661
    void resume_download() noexcept;
662

663
    // NOTE: These functions are executed by the worker thread
664
    void worker_allocate_file_identifiers();
665
    bool worker_integrate_changes_from_downstream(WorkerState&);
666
    ServerHistory& get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
667
                                           DBRef& sg_ptr);
668
    ServerHistory& get_reference_file_history(WorkerState& state);
669
    void group_postprocess_stage_1();
670
    void group_postprocess_stage_2();
671
    void group_postprocess_stage_3();
672
    void group_finalize_work_stage_1();
673
    void group_finalize_work_stage_2();
674
    void finalize_work_stage_1();
675
    void finalize_work_stage_2();
676
};
677

678

679
// Returns a reference to the download cache owned by this file object.
inline DownloadCache& ServerFile::get_download_cache() noexcept
{
    return m_download_cache;
}
683

684
inline void ServerFile::group_finalize_work_stage_1()
685
{
37,728✔
686
    finalize_work_stage_1(); // Throws
37,728✔
687
}
37,728✔
688

689
inline void ServerFile::group_finalize_work_stage_2()
690
{
37,730✔
691
    finalize_work_stage_2(); // Throws
37,730✔
692
}
37,730✔
693

694

695
// ============================ Worker ============================
696

697
// All write transaction on server-side Realm files performed on behalf of the
698
// server, must be performed by the worker thread, not the network event loop
699
// thread. This is to ensure that the network event loop thread never gets
700
// blocked waiting for the worker thread to end a long running write
701
// transaction.
702
//
703
// FIXME: Currently, the event loop thread does perform a number of write
704
// transactions, but only on subtier nodes of a star topology server cluster.
705
class Worker : public ServerHistory::Context {
706
public:
707
    std::shared_ptr<util::Logger> logger_ptr;
708
    util::Logger& logger;
709

710
    explicit Worker(ServerImpl&);
711

712
    ServerFileAccessCache& get_file_access_cache() noexcept;
713

714
    void enqueue(ServerFile*);
715

716
    // Overriding members of ServerHistory::Context
717
    std::mt19937_64& server_history_get_random() noexcept override final;
718
    sync::Transformer& get_transformer() override final;
719
    util::Buffer<char>& get_transform_buffer() override final;
720

721
private:
722
    ServerImpl& m_server;
723
    std::mt19937_64 m_random;
724
    const std::unique_ptr<Transformer> m_transformer;
725
    util::Buffer<char> m_transform_buffer;
726
    ServerFileAccessCache m_file_access_cache;
727

728
    util::Mutex m_mutex;
729
    util::CondVar m_cond; // Protected by `m_mutex`
730

731
    bool m_stop = false; // Protected by `m_mutex`
732

733
    util::CircularBuffer<ServerFile*> m_queue; // Protected by `m_mutex`
734

735
    WorkerState m_state;
736

737
    void run();
738
    void stop() noexcept;
739

740
    friend class util::ThreadExecGuardWithParent<Worker, ServerImpl>;
741
};
742

743

744
// Returns a reference to the file access cache owned by this worker.
inline ServerFileAccessCache& Worker::get_file_access_cache() noexcept
{
    return m_file_access_cache;
}
748

749

750
// ============================ ServerImpl ============================
751

752
class ServerImpl : public ServerImplBase, public ServerHistory::Context {
753
public:
754
    std::uint_fast64_t errors_seen = 0;
755

756
    std::atomic<milliseconds_type> m_par_time;
757
    std::atomic<milliseconds_type> m_seq_time;
758

759
    util::Mutex last_client_accesses_mutex;
760

761
    const std::shared_ptr<util::Logger> logger_ptr;
762
    util::Logger& logger;
763

764
    network::Service& get_service() noexcept
765
    {
78,840✔
766
        return m_service;
78,840✔
767
    }
78,840✔
768

769
    const network::Service& get_service() const noexcept
770
    {
×
771
        return m_service;
×
772
    }
×
773

774
    std::mt19937_64& get_random() noexcept
775
    {
62,504✔
776
        return m_random;
62,504✔
777
    }
62,504✔
778

779
    const Server::Config& get_config() const noexcept
780
    {
120,932✔
781
        return m_config;
120,932✔
782
    }
120,932✔
783

784
    std::size_t get_max_upload_backlog() const noexcept
785
    {
44,944✔
786
        return m_max_upload_backlog;
44,944✔
787
    }
44,944✔
788

789
    const std::string& get_root_dir() const noexcept
790
    {
5,598✔
791
        return m_root_dir;
5,598✔
792
    }
5,598✔
793

794
    network::ssl::Context& get_ssl_context() noexcept
795
    {
48✔
796
        return *m_ssl_context;
48✔
797
    }
48✔
798

799
    const AccessControl& get_access_control() const noexcept
800
    {
×
801
        return m_access_control;
×
802
    }
×
803

804
    ProtocolVersionRange get_protocol_version_range() const noexcept
805
    {
1,962✔
806
        return m_protocol_version_range;
1,962✔
807
    }
1,962✔
808

809
    ServerProtocol& get_server_protocol() noexcept
810
    {
131,262✔
811
        return m_server_protocol;
131,262✔
812
    }
131,262✔
813

814
    compression::CompressMemoryArena& get_compress_memory_arena() noexcept
815
    {
4,300✔
816
        return m_compress_memory_arena;
4,300✔
817
    }
4,300✔
818

819
    MiscBuffers& get_misc_buffers() noexcept
820
    {
45,394✔
821
        return m_misc_buffers;
45,394✔
822
    }
45,394✔
823

824
    int_fast64_t get_current_server_session_ident() const noexcept
825
    {
×
826
        return m_current_server_session_ident;
×
827
    }
×
828

829
    util::ScratchMemory& get_scratch_memory() noexcept
830
    {
×
831
        return m_scratch_memory;
×
832
    }
×
833

834
    Worker& get_worker() noexcept
835
    {
40,282✔
836
        return m_worker;
40,282✔
837
    }
40,282✔
838

839
    // Copies the current values of the two work-unit timers into the output
    // parameters: `parallel_section` receives `m_par_time` and
    // `sequential_section` receives `m_seq_time`.
    //
    // Each timer is loaded atomically on its own; the pair is not read as one
    // combined snapshot. The function only reads state, so it is `const`, and
    // atomic loads do not throw, so it is `noexcept`.
    void get_workunit_timers(milliseconds_type& parallel_section,
                             milliseconds_type& sequential_section) const noexcept
    {
        parallel_section = m_par_time.load();
        sequential_section = m_seq_time.load();
    }
×
844

845
    ServerImpl(const std::string& root_dir, util::Optional<sync::PKey>, Server::Config);
846
    ~ServerImpl() noexcept;
847

848
    void start();
849

850
    void start(std::string listen_address, std::string listen_port, bool reuse_address)
851
    {
668✔
852
        m_config.listen_address = listen_address;
668✔
853
        m_config.listen_port = listen_port;
668✔
854
        m_config.reuse_address = reuse_address;
668✔
855

336✔
856
        start(); // Throws
668✔
857
    }
668✔
858

859
    network::Endpoint listen_endpoint() const
860
    {
7,970✔
861
        return m_acceptor.local_endpoint();
7,970✔
862
    }
7,970✔
863

864
    void run();
865
    void stop() noexcept;
866

867
    void remove_http_connection(std::int_fast64_t conn_id) noexcept;
868

869
    void add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn);
870
    void remove_sync_connection(int_fast64_t connection_id);
871

872
    size_t get_number_of_http_connections()
873
    {
×
874
        return m_http_connections.size();
×
875
    }
×
876

877
    size_t get_number_of_sync_connections()
878
    {
×
879
        return m_sync_connections.size();
×
880
    }
×
881

882
    bool is_sync_stopped()
883
    {
40,158✔
884
        return m_sync_stopped;
40,158✔
885
    }
40,158✔
886

887
    const std::set<std::string>& get_realm_names() const noexcept
888
    {
×
889
        return m_realm_names;
×
890
    }
×
891

892
    // virt_path must be valid when get_or_create_file() is called.
893
    util::bind_ptr<ServerFile> get_or_create_file(const std::string& virt_path)
894
    {
5,570✔
895
        util::bind_ptr<ServerFile> file = get_file(virt_path);
5,570✔
896
        if (REALM_LIKELY(file))
5,570✔
897
            return file;
4,926✔
898

436✔
899
        _impl::VirtualPathComponents virt_path_components =
1,080✔
900
            _impl::parse_virtual_path(m_root_dir, virt_path); // Throws
1,080✔
901
        REALM_ASSERT(virt_path_components.is_valid);
1,080✔
902

436✔
903
        _impl::make_dirs(m_root_dir, virt_path); // Throws
1,080✔
904
        m_realm_names.insert(virt_path);         // Throws
1,080✔
905
        {
1,080✔
906
            bool disable_sync_to_disk = m_config.disable_sync_to_disk;
1,080✔
907
            file.reset(new ServerFile(*this, m_file_access_cache, virt_path, virt_path_components.real_realm_path,
1,080✔
908
                                      disable_sync_to_disk)); // Throws
1,080✔
909
        }
1,080✔
910

436✔
911
        file->initialize();
1,080✔
912
        m_files[virt_path] = file; // Throws
1,080✔
913
        file->activate();          // Throws
1,080✔
914
        return file;
1,080✔
915
    }
1,080✔
916

917
    std::unique_ptr<ServerHistory> make_history_for_path()
918
    {
×
919
        return std::make_unique<ServerHistory>(*this);
×
920
    }
×
921

922
    // Looks up `virt_path` in `m_files`. Returns the stored pointer on a hit
    // and a default-constructed (empty) bind_ptr on a miss.
    util::bind_ptr<ServerFile> get_file(const std::string& virt_path) noexcept
    {
        const auto pos = m_files.find(virt_path);
        if (REALM_UNLIKELY(pos == m_files.end()))
            return {};
        return pos->second;
    }
1,080✔
929

930
    // Returns the current time point as reported by the configured token
931
    // expiration clock, or by std::chrono::system_clock if none is configured.
932
    std::chrono::system_clock::time_point token_expiration_clock_now() const noexcept
933
    {
×
934
        if (REALM_UNLIKELY(m_config.token_expiration_clock))
×
935
            return m_config.token_expiration_clock->now();
×
936
        return std::chrono::system_clock::now();
×
937
    }
×
938

939
    void set_connection_reaper_timeout(milliseconds_type);
940

941
    void close_connections();
942
    bool map_virtual_to_real_path(const std::string& virt_path, std::string& real_path);
943

944
    void recognize_external_change(const std::string& virt_path);
945

946
    void stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
947
                                                  milliseconds_type timeout);
948

949
    // Server global outputbuffers that can be reused.
950
    // The server is single threaded, so there are no
951
    // synchronization issues.
952
    // output_buffers_count is equal to the
953
    // maximum number of buffers needed at any point.
954
    static constexpr int output_buffers_count = 1;
955
    OutputBuffer output_buffers[output_buffers_count];
956

957
    bool is_load_balancing_allowed() const
958
    {
×
959
        return m_allow_load_balancing;
×
960
    }
×
961

962
    // inc_byte_size_for_pending_downstream_changesets() must be called by
963
    // ServerFile objects when changesets from downstream clients have been
964
    // received.
965
    //
966
    // dec_byte_size_for_pending_downstream_changesets() must be called by
967
    // ServerFile objects when changesets from downstream clients have been
968
    // processed or discarded.
969
    //
970
    // ServerImpl uses this information to keep a running tally (metric
971
    // `upload.pending.bytes`) of the total byte size of pending changesets from
972
    // downstream clients.
973
    //
974
    // These functions must be called on the network thread.
975
    void inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
976
    void dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
977

978
    // Overriding member functions in _impl::ServerHistory::Context
979
    std::mt19937_64& server_history_get_random() noexcept override final;
980
    Transformer& get_transformer() noexcept override final;
981
    util::Buffer<char>& get_transform_buffer() noexcept override final;
982

983
private:
984
    Server::Config m_config;
985
    network::Service m_service;
986
    std::mt19937_64 m_random;
987
    const std::size_t m_max_upload_backlog;
988
    const std::string m_root_dir;
989
    const AccessControl m_access_control;
990
    const ProtocolVersionRange m_protocol_version_range;
991

992
    // The reserved files will be closed in situations where the server
993
    // runs out of file descriptors.
994
    std::unique_ptr<File> m_reserved_files[5];
995

996
    // The set of all Realm files known to this server, represented by their
997
    // virtual path.
998
    //
999
    // INVARIANT: If a Realm file is in the servers directory (i.e., it would be
1000
    // reported by an invocation of _impl::get_realm_names()), then the
1001
    // corresponding virtual path is in `m_realm_names`, assuming no external
1002
    // file-system level intervention.
1003
    std::set<std::string> m_realm_names;
1004

1005
    std::unique_ptr<network::ssl::Context> m_ssl_context;
1006
    ServerFileAccessCache m_file_access_cache;
1007
    Worker m_worker;
1008
    std::map<std::string, util::bind_ptr<ServerFile>> m_files; // Key is virtual path
1009
    network::Acceptor m_acceptor;
1010
    std::int_fast64_t m_next_conn_id = 0;
1011
    std::unique_ptr<HTTPConnection> m_next_http_conn;
1012
    network::Endpoint m_next_http_conn_endpoint;
1013
    std::map<std::int_fast64_t, std::unique_ptr<HTTPConnection>> m_http_connections;
1014
    std::map<std::int_fast64_t, std::unique_ptr<SyncConnection>> m_sync_connections;
1015
    ServerProtocol m_server_protocol;
1016
    compression::CompressMemoryArena m_compress_memory_arena;
1017
    MiscBuffers m_misc_buffers;
1018
    std::unique_ptr<Transformer> m_transformer;
1019
    util::Buffer<char> m_transform_buffer;
1020
    int_fast64_t m_current_server_session_ident;
1021
    Optional<network::DeadlineTimer> m_connection_reaper_timer;
1022
    bool m_allow_load_balancing = false;
1023

1024
    util::Mutex m_mutex;
1025

1026
    bool m_stopped = false; // Protected by `m_mutex`
1027

1028
    // m_sync_stopped is used by stop_sync_and_wait_for_backup_completion().
1029
    // When m_sync_stopped is true, the server does not perform any sync.
1030
    bool m_sync_stopped = false;
1031

1032
    std::atomic<bool> m_running{false}; // Debugging facility
1033

1034
    std::size_t m_pending_changesets_from_downstream_byte_size = 0;
1035

1036
    util::CondVar m_wait_or_service_stopped_cond; // Protected by `m_mutex`
1037

1038
    util::ScratchMemory m_scratch_memory;
1039

1040
    void listen();
1041
    void initiate_accept();
1042
    void handle_accept(std::error_code);
1043

1044
    void reap_connections();
1045
    void initiate_connection_reaper_timer(milliseconds_type timeout);
1046
    void do_close_connections();
1047

1048
    // Returns the upload backlog limit to use for `config`. A configured
    // value of zero selects the built-in default; any other value is returned
    // unchanged.
    static std::size_t determine_max_upload_backlog(Server::Config& config) noexcept
    {
        // 4GiB - 1 (largest allowable number on a 32-bit platform)
        const std::size_t default_backlog = 4294967295;

        const bool use_default = (config.max_upload_backlog == 0);
        if (!use_default)
            return config.max_upload_backlog;
        return default_backlog;
    }
×
1054

1055
    // Computes the protocol version range for `config`.
    //
    // The range starts at the oldest supported version and ends at the
    // current version, unless `config.max_protocol_version` is nonzero and
    // lower than the current version, in which case it ends there instead.
    //
    // Throws Server::NoSupportedProtocolVersions if that configured maximum
    // is lower than the oldest supported version.
    static ProtocolVersionRange determine_protocol_version_range(Server::Config& config)
    {
        const int oldest_supported = ServerImplBase::get_oldest_supported_protocol_version();
        const int newest_supported = get_current_protocol_version();
        static_assert(oldest_supported <= newest_supported, "");

        // Zero means "no cap"; a cap at or above the newest version changes
        // nothing either.
        const bool cap_applies =
            config.max_protocol_version != 0 && config.max_protocol_version < newest_supported;
        if (!cap_applies)
            return {oldest_supported, newest_supported};

        if (config.max_protocol_version < oldest_supported)
            throw Server::NoSupportedProtocolVersions();

        const int capped_max = config.max_protocol_version;
        return {oldest_supported, capped_max};
    }
7,962✔
1069

1070
    void do_recognize_external_change(const std::string& virt_path);
1071

1072
    void do_stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_complete)> completion_handler,
1073
                                                     milliseconds_type timeout);
1074
};
1075

1076
// ============================ SyncConnection ============================
1077

1078
class SyncConnection : public websocket::Config {
1079
public:
1080
    const std::shared_ptr<util::Logger> logger_ptr;
1081
    util::Logger& logger;
1082

1083
    // Clients with sync protocol version 8 or greater support pbs->flx migration
1084
    static constexpr int PBS_FLX_MIGRATION_PROTOCOL_VERSION = 8;
1085
    // Clients with sync protocol version less than 10 do not support log messages
1086
    static constexpr int SERVER_LOG_PROTOCOL_VERSION = 10;
1087

1088
    SyncConnection(ServerImpl& serv, std::int_fast64_t id, std::unique_ptr<network::Socket>&& socket,
1089
                   std::unique_ptr<network::ssl::Stream>&& ssl_stream,
1090
                   std::unique_ptr<network::ReadAheadBuffer>&& read_ahead_buffer, int client_protocol_version,
1091
                   std::string client_user_agent, std::string remote_endpoint, std::string appservices_request_id)
1092
        : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
1093
                                                          serv.logger_ptr)} // Throws
1094
        , logger{*logger_ptr}
1095
        , m_server{serv}
1096
        , m_id{id}
1097
        , m_socket{std::move(socket)}
1098
        , m_ssl_stream{std::move(ssl_stream)}
1099
        , m_read_ahead_buffer{std::move(read_ahead_buffer)}
1100
        , m_websocket{*this}
1101
        , m_client_protocol_version{client_protocol_version}
1102
        , m_client_user_agent{std::move(client_user_agent)}
1103
        , m_remote_endpoint{std::move(remote_endpoint)}
1104
        , m_appservices_request_id{std::move(appservices_request_id)}
1105
    {
1,962✔
1106
        // Make the output buffer stream throw std::bad_alloc if it fails to
982✔
1107
        // expand the buffer
982✔
1108
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
1,962✔
1109

982✔
1110
        network::Service& service = m_server.get_service();
1,962✔
1111
        auto handler = [this](Status status) {
93,648✔
1112
            if (!status.is_ok())
93,648✔
1113
                return;
×
1114
            if (!m_is_sending)
93,648✔
1115
                send_next_message(); // Throws
40,910✔
1116
        };
93,648✔
1117
        m_send_trigger = std::make_unique<Trigger<network::Service>>(&service, std::move(handler)); // Throws
1,962✔
1118
    }
1,962✔
1119

1120
    ~SyncConnection() noexcept;
1121

1122
    ServerImpl& get_server() noexcept
1123
    {
110,522✔
1124
        return m_server;
110,522✔
1125
    }
110,522✔
1126

1127
    ServerProtocol& get_server_protocol() noexcept
1128
    {
131,266✔
1129
        return m_server.get_server_protocol();
131,266✔
1130
    }
131,266✔
1131

1132
    int get_client_protocol_version()
1133
    {
101,278✔
1134
        return m_client_protocol_version;
101,278✔
1135
    }
101,278✔
1136

1137
    const std::string& get_client_user_agent() const noexcept
1138
    {
5,568✔
1139
        return m_client_user_agent;
5,568✔
1140
    }
5,568✔
1141

1142
    const std::string& get_remote_endpoint() const noexcept
1143
    {
5,570✔
1144
        return m_remote_endpoint;
5,570✔
1145
    }
5,570✔
1146

1147
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept final
1148
    {
1,962✔
1149
        return logger_ptr;
1,962✔
1150
    }
1,962✔
1151

1152
    std::mt19937_64& websocket_get_random() noexcept final override
1153
    {
61,422✔
1154
        return m_server.get_random();
61,422✔
1155
    }
61,422✔
1156

1157
    bool websocket_binary_message_received(const char* data, size_t size) final override
1158
    {
70,338✔
1159
        using sf = _impl::SimulatedFailure;
70,338✔
1160
        if (sf::check_trigger(sf::sync_server__read_head)) {
70,338✔
1161
            // Suicide
262✔
1162
            read_error(sf::sync_server__read_head);
478✔
1163
            return false;
478✔
1164
        }
478✔
1165
        // After a connection level error has occurred, all incoming messages
35,054✔
1166
        // will be ignored. By continuing to read until end of input, the server
35,054✔
1167
        // is able to know when the client closes the connection, which in
35,054✔
1168
        // general means that is has received the ERROR message.
35,054✔
1169
        if (REALM_LIKELY(!m_is_closing)) {
69,860✔
1170
            m_last_activity_at = steady_clock_now();
69,842✔
1171
            handle_message_received(data, size);
69,842✔
1172
        }
69,842✔
1173
        return true;
69,860✔
1174
    }
69,860✔
1175

1176
    bool websocket_ping_message_received(const char* data, size_t size) final override
1177
    {
×
1178
        if (REALM_LIKELY(!m_is_closing)) {
×
1179
            m_last_activity_at = steady_clock_now();
×
1180
            handle_ping_received(data, size);
×
1181
        }
×
1182
        return true;
×
1183
    }
×
1184

1185
    void async_write(const char* data, size_t size, websocket::WriteCompletionHandler handler) final override
1186
    {
61,430✔
1187
        if (m_ssl_stream) {
61,430✔
1188
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
70✔
1189
        }
70✔
1190
        else {
61,360✔
1191
            m_socket->async_write(data, size, std::move(handler)); // Throws
61,360✔
1192
        }
61,360✔
1193
    }
61,430✔
1194

1195
    void async_read(char* buffer, size_t size, websocket::ReadCompletionHandler handler) final override
1196
    {
212,470✔
1197
        if (m_ssl_stream) {
212,470✔
1198
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
176✔
1199
        }
176✔
1200
        else {
212,294✔
1201
            m_socket->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
212,294✔
1202
        }
212,294✔
1203
    }
212,470✔
1204

1205
    void async_read_until(char* buffer, size_t size, char delim,
1206
                          websocket::ReadCompletionHandler handler) final override
1207
    {
×
1208
        if (m_ssl_stream) {
×
1209
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1210
                                           std::move(handler)); // Throws
×
1211
        }
×
1212
        else {
×
1213
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1214
                                       std::move(handler)); // Throws
×
1215
        }
×
1216
    }
×
1217

1218
    void websocket_read_error_handler(std::error_code ec) final override
1219
    {
696✔
1220
        read_error(ec);
696✔
1221
    }
696✔
1222

1223
    void websocket_write_error_handler(std::error_code ec) final override
1224
    {
×
1225
        write_error(ec);
×
1226
    }
×
1227

1228
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*,
1229
                                           const std::string_view*) final override
1230
    {
×
1231
        // WebSocket class has already logged a message for this error
1232
        close_due_to_error(ec); // Throws
×
1233
    }
×
1234

1235
    void websocket_protocol_error_handler(std::error_code ec) final override
1236
    {
×
1237
        logger.error("WebSocket protocol error (%1): %2", ec, ec.message()); // Throws
×
1238
        close_due_to_error(ec);                                              // Throws
×
1239
    }
×
1240

1241
    void websocket_handshake_completion_handler(const HTTPHeaders&) final override
1242
    {
×
1243
        // This is not called since we handle HTTP request in handle_request_for_sync()
1244
        REALM_TERMINATE("websocket_handshake_completion_handler should not have been called");
×
1245
    }
×
1246

1247
    int_fast64_t get_id() const noexcept
1248
    {
×
1249
        return m_id;
×
1250
    }
×
1251

1252
    network::Socket& get_socket() noexcept
1253
    {
×
1254
        return *m_socket;
×
1255
    }
×
1256

1257
    void initiate();
1258

1259
    // Commits suicide
1260
    template <class... Params>
1261
    void terminate(Logger::Level, const char* log_message, Params... log_params);
1262

1263
    // Commits suicide
1264
    void terminate_if_dead(SteadyTimePoint now);
1265

1266
    void enlist_to_send(Session*) noexcept;
1267

1268
    // Sessions should get the output_buffer and insert a message, after which
1269
    // they call initiate_write_output_buffer().
1270
    OutputBuffer& get_output_buffer()
1271
    {
61,430✔
1272
        m_output_buffer.reset();
61,430✔
1273
        return m_output_buffer;
61,430✔
1274
    }
61,430✔
1275

1276
    // More advanced memory strategies can be implemented if needed.
1277
    void release_output_buffer() {}
61,216✔
1278

1279
    // When this function is called, the connection will initiate a write with
1280
    // its output_buffer. Sessions use this method.
1281
    void initiate_write_output_buffer();
1282

1283
    void initiate_pong_output_buffer();
1284

1285
    void handle_protocol_error(Status status);
1286

1287
    void receive_bind_message(session_ident_type, std::string path, std::string signed_user_token,
1288
                              bool need_client_file_ident, bool is_subserver);
1289

1290
    void receive_ident_message(session_ident_type, file_ident_type client_file_ident,
1291
                               salt_type client_file_ident_salt, version_type scan_server_version,
1292
                               version_type scan_client_version, version_type latest_server_version,
1293
                               salt_type latest_server_version_salt);
1294

1295
    void receive_upload_message(session_ident_type, version_type progress_client_version,
1296
                                version_type progress_server_version, version_type locked_server_version,
1297
                                const UploadChangesets&);
1298

1299
    void receive_mark_message(session_ident_type, request_ident_type);
1300

1301
    void receive_unbind_message(session_ident_type);
1302

1303
    void receive_ping(milliseconds_type timestamp, milliseconds_type rtt);
1304

1305
    void receive_error_message(session_ident_type, int error_code, std::string_view error_body);
1306

1307
    void protocol_error(ProtocolError, Session* = nullptr);
1308

1309
    void initiate_soft_close();
1310

1311
    void discard_session(session_ident_type) noexcept;
1312

1313
    void send_log_message(util::Logger::Level level, const std::string&& message, session_ident_type sess_ident = 0,
1314
                          std::optional<std::string> co_id = std::nullopt);
1315

1316
private:
1317
    ServerImpl& m_server;
1318
    const int_fast64_t m_id;
1319
    std::unique_ptr<network::Socket> m_socket;
1320
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1321
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1322

1323
    websocket::Socket m_websocket;
1324
    std::unique_ptr<char[]> m_input_body_buffer;
1325
    OutputBuffer m_output_buffer;
1326
    std::map<session_ident_type, std::unique_ptr<Session>> m_sessions;
1327

1328
    // The protocol version in use by the connected client.
1329
    const int m_client_protocol_version;
1330

1331
    // The user agent description passed by the client.
1332
    const std::string m_client_user_agent;
1333

1334
    const std::string m_remote_endpoint;
1335

1336
    const std::string m_appservices_request_id;
1337

1338
    // A queue of sessions that have enlisted for an opportunity to send a
1339
    // message. Sessions will be served in the order that they enlist. A session
1340
    // can only occur once in this queue (linked list). If the queue is not
1341
    // empty, and no message is currently being written to the socket, the first
1342
    // session is taken out of the queue, and then granted an opportunity to
1343
    // send a message.
1344
    //
1345
    // Sessions will never be destroyed while in this queue. This is ensured
1346
    // because the connection owns the sessions that are associated with it, and
1347
    // the connection only removes a session from m_sessions at points in time
1348
    // where that session is guaranteed to not be in m_sessions_enlisted_to_send
1349
    // (SyncConnection::send_next_message() and SyncConnection::~SyncConnection()).
1350
    SessionQueue m_sessions_enlisted_to_send;
1351

1352
    Session* m_receiving_session = nullptr;
1353

1354
    bool m_is_sending = false;
1355
    bool m_is_closing = false;
1356

1357
    bool m_send_pong = false;
1358
    bool m_sending_pong = false;
1359

1360
    std::unique_ptr<Trigger<network::Service>> m_send_trigger;
1361

1362
    milliseconds_type m_last_ping_timestamp = 0;
1363

1364
    // If `m_is_closing` is true, this is the time at which `m_is_closing` was
1365
    // set to true (initiation of soft close). Otherwise, if no messages have
1366
    // been received from the client, this is the time at which the connection
1367
    // object was initiated (completion of WebSocket handshake). Otherwise this
1368
    // is the time at which the last message was received from the client.
1369
    SteadyTimePoint m_last_activity_at;
1370

1371
    // These are initialized by do_initiate_soft_close().
1372
    //
1373
    // With recent versions of the protocol (when the version is greater than,
1374
    // or equal to 23), `m_error_session_ident` is always zero.
1375
    ProtocolError m_error_code = {};
1376
    session_ident_type m_error_session_ident = 0;
1377

1378
    struct LogMessage {
1379
        session_ident_type sess_ident;
1380
        util::Logger::Level level;
1381
        std::string message;
1382
        std::optional<std::string> co_id;
1383
    };
1384

1385
    std::mutex m_log_mutex;
1386
    std::queue<LogMessage> m_log_messages;
1387

1388
    // Builds the log prefix for the connection with the given id, in the form
    // "Sync Connection[<id>]: ". The id is rendered as a plain decimal number
    // without any locale-specific digit grouping.
    static std::string make_logger_prefix(int_fast64_t id)
    {
        std::string prefix = "Sync Connection[";
        prefix += std::to_string(id); // Throws
        prefix += "]: ";              // Throws
        return prefix;
    }
1,962✔
1395

1396
    // NOTE(review): handle_message_received() returns void, although this
1397
    // comment used to describe a return value designating whether message
1398
    // processing should continue (false if the connection object is destroyed
1399
    // during the call). Confirm the intended contract and update accordingly.
1400
    void handle_message_received(const char* data, size_t size);
1401

1402
    void handle_ping_received(const char* data, size_t size);
1403

1404
    void send_next_message();
1405
    void send_pong(milliseconds_type timestamp);
1406
    void send_log_message(const LogMessage& log_msg);
1407

1408
    void handle_write_output_buffer();
1409
    void handle_pong_output_buffer();
1410

1411
    void initiate_write_error(ProtocolError, session_ident_type);
1412
    void handle_write_error(std::error_code ec);
1413

1414
    void do_initiate_soft_close(ProtocolError, session_ident_type);
1415
    void read_error(std::error_code);
1416
    void write_error(std::error_code);
1417

1418
    void close_due_to_close_by_client(std::error_code);
1419
    void close_due_to_error(std::error_code);
1420

1421
    void terminate_sessions();
1422

1423
    void bad_session_ident(const char* message_type, session_ident_type);
1424
    void message_after_unbind(const char* message_type, session_ident_type);
1425
    void message_before_ident(const char* message_type, session_ident_type);
1426
};
1427

1428

1429
inline void SyncConnection::read_error(std::error_code ec)
1430
{
1,174✔
1431
    REALM_ASSERT(ec != util::error::operation_aborted);
1,174✔
1432
    if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
1,174✔
1433
        // Suicide
378✔
1434
        close_due_to_close_by_client(ec); // Throws
696✔
1435
        return;
696✔
1436
    }
696✔
1437
    if (ec == util::MiscExtErrors::delim_not_found) {
478✔
1438
        logger.error("Input message head delimited not found"); // Throws
×
1439
        protocol_error(ProtocolError::limits_exceeded);         // Throws
×
1440
        return;
×
1441
    }
×
1442

262✔
1443
    logger.error("Reading failed: %1", ec.message()); // Throws
478✔
1444

262✔
1445
    // Suicide
262✔
1446
    close_due_to_error(ec); // Throws
478✔
1447
}
478✔
1448

1449
inline void SyncConnection::write_error(std::error_code ec)
1450
{
×
1451
    REALM_ASSERT(ec != util::error::operation_aborted);
×
1452
    if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1453
        // Suicide
1454
        close_due_to_close_by_client(ec); // Throws
×
1455
        return;
×
1456
    }
×
1457
    logger.error("Writing failed: %1", ec.message()); // Throws
×
1458

1459
    // Suicide
1460
    close_due_to_error(ec); // Throws
×
1461
}
×
1462

1463

1464
// ============================ HTTPConnection ============================
1465

1466
std::string g_user_agent = "User-Agent";
1467

1468
class HTTPConnection {
1469
public:
1470
    const std::shared_ptr<Logger> logger_ptr;
1471
    util::Logger& logger;
1472

1473
    HTTPConnection(ServerImpl& serv, int_fast64_t id, bool is_ssl)
1474
        : logger_ptr{std::make_shared<PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
1475
                                                    serv.logger_ptr)} // Throws
1476
        , logger{*logger_ptr}
1477
        , m_server{serv}
1478
        , m_id{id}
1479
        , m_socket{new network::Socket{serv.get_service()}} // Throws
1480
        , m_read_ahead_buffer{new network::ReadAheadBuffer} // Throws
1481
        , m_http_server{*this, logger_ptr}
1482
    {
9,954✔
1483
        // Make the output buffer stream throw std::bad_alloc if it fails to
4,926✔
1484
        // expand the buffer
4,926✔
1485
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
9,954✔
1486

4,926✔
1487
        if (is_ssl) {
9,954✔
1488
            using namespace network::ssl;
48✔
1489
            Context& ssl_context = serv.get_ssl_context();
48✔
1490
            m_ssl_stream = std::make_unique<Stream>(*m_socket, ssl_context,
48✔
1491
                                                    Stream::server); // Throws
48✔
1492
        }
48✔
1493
    }
9,954✔
1494

1495
    ServerImpl& get_server() noexcept
1496
    {
×
1497
        return m_server;
×
1498
    }
×
1499

1500
    int_fast64_t get_id() const noexcept
1501
    {
1,992✔
1502
        return m_id;
1,992✔
1503
    }
1,992✔
1504

1505
    network::Socket& get_socket() noexcept
1506
    {
11,632✔
1507
        return *m_socket;
11,632✔
1508
    }
11,632✔
1509

1510
    template <class H>
1511
    void async_write(const char* data, size_t size, H handler)
1512
    {
1,982✔
1513
        if (m_ssl_stream) {
1,982✔
1514
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
14✔
1515
        }
14✔
1516
        else {
1,968✔
1517
            m_socket->async_write(data, size, std::move(handler)); // Throws
1,968✔
1518
        }
1,968✔
1519
    }
1,982✔
1520

1521
    template <class H>
1522
    void async_read(char* buffer, size_t size, H handler)
1523
    {
8✔
1524
        if (m_ssl_stream) {
8✔
1525
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer,
×
1526
                                     std::move(handler)); // Throws
×
1527
        }
×
1528
        else {
8✔
1529
            m_socket->async_read(buffer, size, *m_read_ahead_buffer,
8✔
1530
                                 std::move(handler)); // Throws
8✔
1531
        }
8✔
1532
    }
8✔
1533

1534
    template <class H>
1535
    void async_read_until(char* buffer, size_t size, char delim, H handler)
1536
    {
17,716✔
1537
        if (m_ssl_stream) {
17,716✔
1538
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
126✔
1539
                                           std::move(handler)); // Throws
126✔
1540
        }
126✔
1541
        else {
17,590✔
1542
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
17,590✔
1543
                                       std::move(handler)); // Throws
17,590✔
1544
        }
17,590✔
1545
    }
17,716✔
1546

1547
    void initiate(std::string remote_endpoint)
1548
    {
1,992✔
1549
        m_last_activity_at = steady_clock_now();
1,992✔
1550
        m_remote_endpoint = std::move(remote_endpoint);
1,992✔
1551

998✔
1552
        logger.detail("Connection from %1", m_remote_endpoint); // Throws
1,992✔
1553

998✔
1554
        if (m_ssl_stream) {
1,992✔
1555
            initiate_ssl_handshake(); // Throws
24✔
1556
        }
24✔
1557
        else {
1,968✔
1558
            initiate_http(); // Throws
1,968✔
1559
        }
1,968✔
1560
    }
1,992✔
1561

1562
    void respond_200_ok()
1563
    {
×
1564
        handle_text_response(HTTPStatus::Ok, "OK"); // Throws
×
1565
    }
×
1566

1567
    void respond_404_not_found()
1568
    {
×
1569
        handle_text_response(HTTPStatus::NotFound, "Not found"); // Throws
×
1570
    }
×
1571

1572
    void respond_503_service_unavailable()
1573
    {
×
1574
        handle_text_response(HTTPStatus::ServiceUnavailable, "Service unavailable"); // Throws
×
1575
    }
×
1576

1577
    // Commits suicide
1578
    template <class... Params>
1579
    void terminate(Logger::Level log_level, const char* log_message, Params... log_params)
1580
    {
30✔
1581
        logger.log(log_level, log_message, log_params...); // Throws
30✔
1582
        m_ssl_stream.reset();
30✔
1583
        m_socket.reset();
30✔
1584
        m_server.remove_http_connection(m_id); // Suicide
30✔
1585
    }
30✔
1586

1587
    // Commits suicide
1588
    void terminate_if_dead(SteadyTimePoint now)
1589
    {
2✔
1590
        milliseconds_type time = steady_duration(m_last_activity_at, now);
2✔
1591
        const Server::Config& config = m_server.get_config();
2✔
1592
        if (m_is_sending) {
2✔
1593
            if (time >= config.http_response_timeout) {
2✔
1594
                // Suicide
1595
                terminate(Logger::Level::detail,
×
1596
                          "HTTP connection closed (request timeout)"); // Throws
×
1597
            }
×
1598
        }
2✔
1599
        else {
×
1600
            if (time >= config.http_request_timeout) {
×
1601
                // Suicide
1602
                terminate(Logger::Level::detail,
×
1603
                          "HTTP connection closed (response timeout)"); // Throws
×
1604
            }
×
1605
        }
×
1606
    }
2✔
1607

1608
    std::string get_appservices_request_id() const
1609
    {
1,982✔
1610
        return m_appservices_request_id.to_string();
1,982✔
1611
    }
1,982✔
1612

1613
private:
1614
    ServerImpl& m_server;
1615
    const int_fast64_t m_id;
1616
    const ObjectId m_appservices_request_id = ObjectId::gen();
1617
    std::unique_ptr<network::Socket> m_socket;
1618
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1619
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1620
    HTTPServer<HTTPConnection> m_http_server;
1621
    OutputBuffer m_output_buffer;
1622
    bool m_is_sending = false;
1623
    SteadyTimePoint m_last_activity_at;
1624
    std::string m_remote_endpoint;
1625
    int m_negotiated_protocol_version = 0;
1626

1627
    void initiate_ssl_handshake()
1628
    {
24✔
1629
        auto handler = [this](std::error_code ec) {
24✔
1630
            if (ec != util::error::operation_aborted)
24✔
1631
                handle_ssl_handshake(ec); // Throws
24✔
1632
        };
24✔
1633
        m_ssl_stream->async_handshake(std::move(handler)); // Throws
24✔
1634
    }
24✔
1635

1636
    void handle_ssl_handshake(std::error_code ec)
1637
    {
24✔
1638
        if (ec) {
24✔
1639
            logger.error("SSL handshake error (%1): %2", ec, ec.message()); // Throws
10✔
1640
            close_due_to_error(ec);                                         // Throws
10✔
1641
            return;
10✔
1642
        }
10✔
1643
        initiate_http(); // Throws
14✔
1644
    }
14✔
1645

1646
    void initiate_http()
1647
    {
1,982✔
1648
        logger.debug("Connection initiates HTTP receipt");
1,982✔
1649

992✔
1650
        auto handler = [this](HTTPRequest request, std::error_code ec) {
1,982✔
1651
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
1,982✔
1652
                return;
992✔
1653
            if (REALM_UNLIKELY(ec == HTTPParserError::MalformedRequest)) {
1,982✔
1654
                logger.error("Malformed HTTP request");
×
1655
                close_due_to_error(ec); // Throws
×
1656
                return;
×
1657
            }
×
1658
            if (REALM_UNLIKELY(ec == HTTPParserError::BadRequest)) {
1,982✔
1659
                logger.error("Bad HTTP request");
8✔
1660
                const char* body = "The HTTP request was corrupted";
8✔
1661
                handle_400_bad_request(body); // Throws
8✔
1662
                return;
8✔
1663
            }
8✔
1664
            if (REALM_UNLIKELY(ec)) {
1,974✔
1665
                read_error(ec); // Throws
×
1666
                return;
×
1667
            }
×
1668
            handle_http_request(std::move(request)); // Throws
1,974✔
1669
        };
1,974✔
1670
        m_http_server.async_receive_request(std::move(handler)); // Throws
1,982✔
1671
    }
1,982✔
1672

1673
    void handle_http_request(const HTTPRequest& request)
1674
    {
1,974✔
1675
        StringData path = request.path;
1,974✔
1676

988✔
1677
        logger.debug("HTTP request received, request = %1", request);
1,974✔
1678

988✔
1679
        m_is_sending = true;
1,974✔
1680
        m_last_activity_at = steady_clock_now();
1,974✔
1681

988✔
1682
        // FIXME: When thinking of this function as a switching device, it seem
988✔
1683
        // wrong that it requires a `%2F` after `/realm-sync/`. If `%2F` is
988✔
1684
        // supposed to be mandatory, then that check ought to be delegated to
988✔
1685
        // handle_request_for_sync(), as that will yield a sharper separation of
988✔
1686
        // concerns.
988✔
1687
        if (path == "/realm-sync" || path.begins_with("/realm-sync?") || path.begins_with("/realm-sync/%2F")) {
1,974✔
1688
            handle_request_for_sync(request); // Throws
1,962✔
1689
        }
1,962✔
1690
        else {
12✔
1691
            handle_404_not_found(request); // Throws
12✔
1692
        }
12✔
1693
    }
1,974✔
1694

1695
    void handle_request_for_sync(const HTTPRequest& request)
1696
    {
1,962✔
1697
        if (m_server.is_sync_stopped()) {
1,962✔
1698
            logger.debug("Attempt to create a sync connection to a server that has been "
×
1699
                         "stopped"); // Throws
×
1700
            handle_503_service_unavailable(request, "The server does not accept sync "
×
1701
                                                    "connections"); // Throws
×
1702
            return;
×
1703
        }
×
1704

982✔
1705
        util::Optional<std::string> sec_websocket_protocol = websocket::read_sec_websocket_protocol(request);
1,962✔
1706

982✔
1707
        // Figure out whether there are any protocol versions supported by both
982✔
1708
        // the client and the server, and if so, choose the newest one of them.
982✔
1709
        MiscBuffers& misc_buffers = m_server.get_misc_buffers();
1,962✔
1710
        using ProtocolVersionRanges = MiscBuffers::ProtocolVersionRanges;
1,962✔
1711
        ProtocolVersionRanges& protocol_version_ranges = misc_buffers.protocol_version_ranges;
1,962✔
1712
        {
1,962✔
1713
            protocol_version_ranges.clear();
1,962✔
1714
            util::MemoryInputStream in;
1,962✔
1715
            in.imbue(std::locale::classic());
1,962✔
1716
            in.unsetf(std::ios_base::skipws);
1,962✔
1717
            std::string_view value;
1,962✔
1718
            if (sec_websocket_protocol)
1,962✔
1719
                value = *sec_websocket_protocol;
1,962✔
1720
            HttpListHeaderValueParser parser{value};
1,962✔
1721
            std::string_view elem;
1,962✔
1722
            while (parser.next(elem)) {
19,618✔
1723
                // FIXME: Use std::string_view::begins_with() in C++20.
8,836✔
1724
                const StringData protocol{elem};
17,656✔
1725
                std::string_view prefix;
17,656✔
1726
                if (protocol.begins_with(get_pbs_websocket_protocol_prefix()))
17,656✔
1727
                    prefix = get_pbs_websocket_protocol_prefix();
17,658✔
1728
                else if (protocol.begins_with(get_old_pbs_websocket_protocol_prefix()))
2,147,483,647!
1729
                    prefix = get_old_pbs_websocket_protocol_prefix();
×
1730
                if (!prefix.empty()) {
17,658✔
1731
                    auto parse_version = [&](std::string_view str) {
17,658✔
1732
                        in.set_buffer(str.data(), str.data() + str.size());
17,656✔
1733
                        int version = 0;
17,656✔
1734
                        in >> version;
17,656✔
1735
                        if (REALM_LIKELY(in && in.eof() && version >= 0))
17,656✔
1736
                            return version;
17,658✔
1737
                        return -1;
2,147,483,647✔
1738
                    };
2,147,483,647✔
1739
                    int min, max;
17,658✔
1740
                    std::string_view range = elem.substr(prefix.size());
17,658✔
1741
                    auto i = range.find('-');
17,658✔
1742
                    if (i != std::string_view::npos) {
17,658✔
1743
                        min = parse_version(range.substr(0, i));
×
1744
                        max = parse_version(range.substr(i + 1));
×
1745
                    }
×
1746
                    else {
17,658✔
1747
                        min = parse_version(range);
17,658✔
1748
                        max = min;
17,658✔
1749
                    }
17,658✔
1750
                    if (REALM_LIKELY(min >= 0 && max >= 0 && min <= max)) {
17,658✔
1751
                        protocol_version_ranges.emplace_back(min, max); // Throws
17,658✔
1752
                        continue;
17,658✔
1753
                    }
17,658✔
1754
                    logger.error("Protocol version negotiation failed: Client sent malformed "
×
1755
                                 "specification of supported protocol versions: '%1'",
×
1756
                                 elem); // Throws
×
1757
                    handle_400_bad_request("Protocol version negotiation failed: Malformed "
×
1758
                                           "specification of supported protocol "
×
1759
                                           "versions\n"); // Throws
×
1760
                    return;
×
1761
                }
×
1762
                logger.warn("Unrecognized protocol token in HTTP response header "
2,147,483,647✔
1763
                            "Sec-WebSocket-Protocol: '%1'",
2,147,483,647✔
1764
                            elem); // Throws
2,147,483,647✔
1765
            }
2,147,483,647✔
1766
            if (protocol_version_ranges.empty()) {
1,962✔
1767
                logger.error("Protocol version negotiation failed: Client did not send a "
×
1768
                             "specification of supported protocol versions"); // Throws
×
1769
                handle_400_bad_request("Protocol version negotiation failed: Missing specification "
×
1770
                                       "of supported protocol versions\n"); // Throws
×
1771
                return;
×
1772
            }
×
1773
        }
1,962✔
1774
        {
1,962✔
1775
            ProtocolVersionRange server_range = m_server.get_protocol_version_range();
1,962✔
1776
            int server_min = server_range.first;
1,962✔
1777
            int server_max = server_range.second;
1,962✔
1778
            int best_match = 0;
1,962✔
1779
            int overall_client_min = std::numeric_limits<int>::max();
1,962✔
1780
            int overall_client_max = std::numeric_limits<int>::min();
1,962✔
1781
            for (const auto& range : protocol_version_ranges) {
17,658✔
1782
                int client_min = range.first;
17,658✔
1783
                int client_max = range.second;
17,658✔
1784
                if (client_max >= server_min && client_min <= server_max) {
17,658✔
1785
                    // Overlap
8,838✔
1786
                    int version = std::min(client_max, server_max);
17,658✔
1787
                    if (version > best_match) {
17,658✔
1788
                        best_match = version;
1,962✔
1789
                    }
1,962✔
1790
                }
17,658✔
1791
                if (client_min < overall_client_min)
17,658✔
1792
                    overall_client_min = client_min;
17,658✔
1793
                if (client_max > overall_client_max)
17,658✔
1794
                    overall_client_max = client_max;
1,962✔
1795
            }
17,658✔
1796
            Formatter& formatter = misc_buffers.formatter;
1,962✔
1797
            if (REALM_UNLIKELY(best_match == 0)) {
1,962✔
1798
                const char* elaboration = "No version supported by both client and server";
×
1799
                const char* identifier_hint = nullptr;
×
1800
                if (overall_client_max < server_min) {
×
1801
                    // Client is too old
1802
                    elaboration = "Client is too old for server";
×
1803
                    identifier_hint = "CLIENT_TOO_OLD";
×
1804
                }
×
1805
                else if (overall_client_min > server_max) {
×
1806
                    // Client is too new
1807
                    elaboration = "Client is too new for server";
×
1808
                    identifier_hint = "CLIENT_TOO_NEW";
×
1809
                }
×
1810
                auto format_ranges = [&](const auto& list) {
×
1811
                    bool nonfirst = false;
×
1812
                    for (auto range : list) {
×
1813
                        if (nonfirst)
×
1814
                            formatter << ", "; // Throws
×
1815
                        int min = range.first, max = range.second;
×
1816
                        REALM_ASSERT(min <= max);
×
1817
                        formatter << min;
×
1818
                        if (max != min)
×
1819
                            formatter << "-" << max;
×
1820
                        nonfirst = true;
×
1821
                    }
×
1822
                };
×
1823
                using Range = ProtocolVersionRange;
×
1824
                formatter.reset();
×
1825
                format_ranges(protocol_version_ranges); // Throws
×
1826
                logger.error("Protocol version negotiation failed: %1 "
×
1827
                             "(client supports: %2)",
×
1828
                             elaboration, std::string_view(formatter.data(), formatter.size())); // Throws
×
1829
                formatter.reset();
×
1830
                formatter << "Protocol version negotiation failed: "
×
1831
                             ""
×
1832
                          << elaboration << ".\n\n";                                   // Throws
×
1833
                formatter << "Server supports: ";                                      // Throws
×
1834
                format_ranges(std::initializer_list<Range>{{server_min, server_max}}); // Throws
×
1835
                formatter << "\n";                                                     // Throws
×
1836
                formatter << "Client supports: ";                                      // Throws
×
1837
                format_ranges(protocol_version_ranges);                                // Throws
×
1838
                formatter << "\n\n";                                                   // Throws
×
1839
                formatter << "REALM_SYNC_PROTOCOL_MISMATCH";                           // Throws
×
1840
                if (identifier_hint)
×
1841
                    formatter << ":" << identifier_hint;                      // Throws
×
1842
                formatter << "\n";                                            // Throws
×
1843
                handle_400_bad_request({formatter.data(), formatter.size()}); // Throws
×
1844
                return;
×
1845
            }
×
1846
            m_negotiated_protocol_version = best_match;
1,962✔
1847
            logger.debug("Received: Sync HTTP request (negotiated_protocol_version=%1)",
1,962✔
1848
                         m_negotiated_protocol_version); // Throws
1,962✔
1849
            formatter.reset();
1,962✔
1850
        }
1,962✔
1851

982✔
1852
        std::string sec_websocket_protocol_2;
1,962✔
1853
        {
1,962✔
1854
            std::string_view prefix =
1,962✔
1855
                m_negotiated_protocol_version < SyncConnection::PBS_FLX_MIGRATION_PROTOCOL_VERSION
1,962✔
1856
                    ? get_old_pbs_websocket_protocol_prefix()
982✔
1857
                    : get_pbs_websocket_protocol_prefix();
1,962✔
1858
            std::ostringstream out;
1,962✔
1859
            out.imbue(std::locale::classic());
1,962✔
1860
            out << prefix << m_negotiated_protocol_version; // Throws
1,962✔
1861
            sec_websocket_protocol_2 = std::move(out).str();
1,962✔
1862
        }
1,962✔
1863

982✔
1864
        std::error_code ec;
1,962✔
1865
        util::Optional<HTTPResponse> response =
1,962✔
1866
            websocket::make_http_response(request, sec_websocket_protocol_2, ec); // Throws
1,962✔
1867

982✔
1868
        if (ec) {
1,962✔
1869
            if (ec == websocket::HttpError::bad_request_header_upgrade) {
×
1870
                logger.error("There must be a header of the form 'Upgrade: websocket'");
×
1871
            }
×
1872
            else if (ec == websocket::HttpError::bad_request_header_connection) {
×
1873
                logger.error("There must be a header of the form 'Connection: Upgrade'");
×
1874
            }
×
1875
            else if (ec == websocket::HttpError::bad_request_header_websocket_version) {
×
1876
                logger.error("There must be a header of the form 'Sec-WebSocket-Version: 13'");
×
1877
            }
×
1878
            else if (ec == websocket::HttpError::bad_request_header_websocket_key) {
×
1879
                logger.error("The header Sec-WebSocket-Key is missing");
×
1880
            }
×
1881

1882
            logger.error("The HTTP request with the error is:\n%1", request);
×
1883
            logger.error("Check the proxy configuration and make sure that the "
×
1884
                         "HTTP request is a valid Websocket request.");
×
1885
            close_due_to_error(ec);
×
1886
            return;
×
1887
        }
×
1888
        REALM_ASSERT(response);
1,962✔
1889
        add_common_http_response_headers(*response);
1,962✔
1890

982✔
1891
        std::string user_agent;
1,962✔
1892
        {
1,962✔
1893
            auto i = request.headers.find(g_user_agent);
1,962✔
1894
            if (i != request.headers.end())
1,962✔
1895
                user_agent = i->second; // Throws (copy)
1,962✔
1896
        }
1,962✔
1897

982✔
1898
        auto handler = [protocol_version = m_negotiated_protocol_version, user_agent = std::move(user_agent),
1,962✔
1899
                        this](std::error_code ec) {
1,962✔
1900
            // If the operation is aborted, the socket object may have been destroyed.
982✔
1901
            if (ec != util::error::operation_aborted) {
1,962✔
1902
                if (ec) {
1,962✔
1903
                    write_error(ec);
×
1904
                    return;
×
1905
                }
×
1906

982✔
1907
                std::unique_ptr<SyncConnection> sync_conn = std::make_unique<SyncConnection>(
1,962✔
1908
                    m_server, m_id, std::move(m_socket), std::move(m_ssl_stream), std::move(m_read_ahead_buffer),
1,962✔
1909
                    protocol_version, std::move(user_agent), std::move(m_remote_endpoint),
1,962✔
1910
                    get_appservices_request_id()); // Throws
1,962✔
1911
                SyncConnection& sync_conn_ref = *sync_conn;
1,962✔
1912
                m_server.add_sync_connection(m_id, std::move(sync_conn));
1,962✔
1913
                m_server.remove_http_connection(m_id);
1,962✔
1914
                sync_conn_ref.initiate();
1,962✔
1915
            }
1,962✔
1916
        };
1,962✔
1917
        m_http_server.async_send_response(*response, std::move(handler));
1,962✔
1918
    }
1,962✔
1919

1920
    void handle_text_response(HTTPStatus http_status, std::string_view body)
1921
    {
20✔
1922
        std::string body_2 = std::string(body); // Throws
20✔
1923

10✔
1924
        HTTPResponse response;
20✔
1925
        response.status = http_status;
20✔
1926
        add_common_http_response_headers(response);
20✔
1927
        response.headers["Connection"] = "close";
20✔
1928

10✔
1929
        if (!body_2.empty()) {
20✔
1930
            response.headers["Content-Length"] = util::to_string(body_2.size());
20✔
1931
            response.body = std::move(body_2);
20✔
1932
        }
20✔
1933

10✔
1934
        auto handler = [this](std::error_code ec) {
20✔
1935
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
20✔
1936
                return;
10✔
1937
            if (REALM_UNLIKELY(ec)) {
20✔
1938
                write_error(ec);
×
1939
                return;
×
1940
            }
×
1941
            terminate(Logger::Level::detail, "HTTP connection closed"); // Throws
20✔
1942
        };
20✔
1943
        m_http_server.async_send_response(response, std::move(handler));
20✔
1944
    }
20✔
1945

1946
    void handle_400_bad_request(std::string_view body)
1947
    {
8✔
1948
        logger.detail("400 Bad Request");
8✔
1949
        handle_text_response(HTTPStatus::BadRequest, body); // Throws
8✔
1950
    }
8✔
1951

1952
    void handle_404_not_found(const HTTPRequest&)
1953
    {
12✔
1954
        logger.detail("404 Not Found"); // Throws
12✔
1955
        handle_text_response(HTTPStatus::NotFound,
12✔
1956
                             "Realm sync server\n\nPage not found\n"); // Throws
12✔
1957
    }
12✔
1958

1959
    void handle_503_service_unavailable(const HTTPRequest&, std::string_view message)
1960
    {
×
1961
        logger.debug("503 Service Unavailable");                       // Throws
×
1962
        handle_text_response(HTTPStatus::ServiceUnavailable, message); // Throws
×
1963
    }
×
1964

1965
    void add_common_http_response_headers(HTTPResponse& response)
1966
    {
1,982✔
1967
        response.headers["Server"] = "RealmSync/" REALM_VERSION_STRING; // Throws
1,982✔
1968
        if (m_negotiated_protocol_version < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
1,982✔
1969
            // This isn't a real X-Appservices-Request-Id, but it should be enough to test with
10✔
1970
            response.headers["X-Appservices-Request-Id"] = get_appservices_request_id();
20✔
1971
        }
20✔
1972
    }
1,982✔
1973

1974
    void read_error(std::error_code ec)
1975
    {
×
1976
        REALM_ASSERT(ec != util::error::operation_aborted);
×
1977
        if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
×
1978
            // Suicide
1979
            close_due_to_close_by_client(ec); // Throws
×
1980
            return;
×
1981
        }
×
1982
        if (ec == util::MiscExtErrors::delim_not_found) {
×
1983
            logger.error("Input message head delimited not found"); // Throws
×
1984
            close_due_to_error(ec);                                 // Throws
×
1985
            return;
×
1986
        }
×
1987

1988
        logger.error("Reading failed: %1", ec.message()); // Throws
×
1989

1990
        // Suicide
1991
        close_due_to_error(ec); // Throws
×
1992
    }
×
1993

1994
    void write_error(std::error_code ec)
1995
    {
×
1996
        REALM_ASSERT(ec != util::error::operation_aborted);
×
1997
        if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1998
            // Suicide
1999
            close_due_to_close_by_client(ec); // Throws
×
2000
            return;
×
2001
        }
×
2002
        logger.error("Writing failed: %1", ec.message()); // Throws
×
2003

2004
        // Suicide
2005
        close_due_to_error(ec); // Throws
×
2006
    }
×
2007

2008
    void close_due_to_close_by_client(std::error_code ec)
2009
    {
×
2010
        auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
×
2011
        // Suicide
2012
        terminate(log_level, "HTTP connection closed by client: %1", ec.message()); // Throws
×
2013
    }
×
2014

2015
    void close_due_to_error(std::error_code ec)
2016
    {
10✔
2017
        // Suicide
6✔
2018
        terminate(Logger::Level::error, "HTTP connection closed due to error: %1",
10✔
2019
                  ec.message()); // Throws
10✔
2020
    }
10✔
2021

2022
    static std::string make_logger_prefix(int_fast64_t id)
2023
    {
9,956✔
2024
        std::ostringstream out;
9,956✔
2025
        out.imbue(std::locale::classic());
9,956✔
2026
        out << "HTTP Connection[" << id << "]: "; // Throws
9,956✔
2027
        return out.str();                         // Throws
9,956✔
2028
    }
9,956✔
2029
};
2030

2031

2032
class DownloadHistoryEntryHandler : public ServerHistory::HistoryEntryHandler {
2033
public:
2034
    std::size_t num_changesets = 0;
2035
    std::size_t accum_original_size = 0;
2036
    std::size_t accum_compacted_size = 0;
2037

2038
    DownloadHistoryEntryHandler(ServerProtocol& protocol, OutputBuffer& buffer, util::Logger& logger) noexcept
2039
        : m_protocol{protocol}
2040
        , m_buffer{buffer}
2041
        , m_logger{logger}
2042
    {
39,132✔
2043
    }
39,132✔
2044

2045
    void handle(version_type server_version, const HistoryEntry& entry, size_t original_size) override
2046
    {
40,972✔
2047
        version_type client_version = entry.remote_version;
40,972✔
2048
        ServerProtocol::ChangesetInfo info{server_version, client_version, entry, original_size};
40,972✔
2049
        m_protocol.insert_single_changeset_download_message(m_buffer, info, m_logger); // Throws
40,972✔
2050
        ++num_changesets;
40,972✔
2051
        accum_original_size += original_size;
40,972✔
2052
        accum_compacted_size += entry.changeset.size();
40,972✔
2053
    }
40,972✔
2054

2055
private:
2056
    ServerProtocol& m_protocol;
2057
    OutputBuffer& m_buffer;
2058
    util::Logger& m_logger;
2059
};
2060

2061

2062
// ============================ Session ============================
2063

2064
//                        Need cli-   Send     IDENT     UNBIND              ERROR
2065
//   Protocol             ent file    IDENT    message   message   Error     message
2066
//   state                identifier  message  received  received  occurred  sent
2067
// ---------------------------------------------------------------------------------
2068
//   AllocatingIdent      yes         yes      no        no        no        no
2069
//   SendIdent            no          yes      no        no        no        no
2070
//   WaitForIdent         no          no       no        no        no        no
2071
//   WaitForUnbind        maybe       no       yes       no        no        no
2072
//   SendError            maybe       maybe    maybe     no        yes       no
2073
//   WaitForUnbindErr     maybe       maybe    maybe     no        yes       yes
2074
//   SendUnbound          maybe       maybe    maybe     yes       maybe     no
2075
//
2076
//
2077
//   Condition                      Expression
2078
// ----------------------------------------------------------
2079
//   Need client file identifier    need_client_file_ident()
2080
//   Send IDENT message             must_send_ident_message()
2081
//   IDENT message received         ident_message_received()
2082
//   UNBIND message received        unbind_message_received()
2083
//   Error occurred                 error_occurred()
2084
//   ERROR message sent             m_error_message_sent
2085
//
2086
//
2087
//   Protocol
2088
//   state                Will send              Can receive
2089
// -----------------------------------------------------------------------
2090
//   AllocatingIdent      none                   UNBIND
2091
//   SendIdent            IDENT                  UNBIND
2092
//   WaitForIdent         none                   IDENT, UNBIND
2093
//   WaitForUnbind        DOWNLOAD, TRANSACT,    UPLOAD, TRANSACT, MARK,
2094
//                        MARK, ALLOC            ALLOC, UNBIND
2095
//   SendError            ERROR                  any
2096
//   WaitForUnbindErr     none                   any
2097
//   SendUnbound          UNBOUND                none
2098
//
2099
class Session final : private FileIdentReceiver {
2100
public:
2101
    util::PrefixLogger logger;
2102

2103
    Session(SyncConnection& conn, session_ident_type session_ident)
2104
        : logger{util::LogCategory::server, make_logger_prefix(session_ident), conn.logger_ptr} // Throws
2105
        , m_connection{conn}
2106
        , m_session_ident{session_ident}
2107
    {
5,598✔
2108
    }
5,598✔
2109

2110
    // Destroys the session. The session must no longer be enlisted to send
    // at this point (asserted); it is detached from its server file here.
    ~Session() noexcept
    {
        REALM_ASSERT(!is_enlisted_to_send());
        detach_from_server_file();
    }
5,596✔
2115

2116
    // Returns the connection that this session was constructed with.
    SyncConnection& get_connection() noexcept
    {
        return m_connection;
    }
39,148✔
2120

2121
    // Returns the `encryption_key` entry of the server configuration, reached
    // through the owning connection's server.
    const Optional<std::array<char, 64>>& get_encryption_key()
    {
        const Server::Config& config = m_connection.get_server().get_config();
        return config.encryption_key;
    }
×
2125

2126
    // Returns the session identifier passed to the constructor.
    session_ident_type get_session_ident() const noexcept
    {
        return m_session_ident;
    }
152✔
2130

2131
    // Returns the protocol codec object of the owning connection.
    ServerProtocol& get_server_protocol() noexcept
    {
        return m_connection.get_server_protocol();
    }
55,218✔
2135

2136
    bool need_client_file_ident() const noexcept
2137
    {
6,824✔
2138
        return (m_file_ident_request != 0);
6,824✔
2139
    }
6,824✔
2140

2141
    bool must_send_ident_message() const noexcept
2142
    {
4,154✔
2143
        return m_send_ident_message;
4,154✔
2144
    }
4,154✔
2145

2146
    bool ident_message_received() const noexcept
2147
    {
333,512✔
2148
        return m_client_file_ident != 0;
333,512✔
2149
    }
333,512✔
2150

2151
    bool unbind_message_received() const noexcept
2152
    {
336,312✔
2153
        return m_unbind_message_received;
336,312✔
2154
    }
336,312✔
2155

2156
    bool error_occurred() const noexcept
2157
    {
328,236✔
2158
        return int(m_error_code) != 0;
328,236✔
2159
    }
328,236✔
2160

2161
    bool relayed_alloc_request_in_progress() const noexcept
2162
    {
×
2163
        return (need_client_file_ident() || m_allocated_file_ident.ident != 0);
×
2164
    }
×
2165

2166
    // Returns the file identifier (always a nonzero value) of the client side
2167
    // file if ident_message_received() returns true. Otherwise it returns zero.
2168
    file_ident_type get_client_file_ident() const noexcept
    {
        // Assigned by receive_ident_message() once an IDENT message has been
        // accepted; ident_message_received() tests this value against zero.
        return m_client_file_ident;
    }
×
2172

2173
    void initiate()
2174
    {
5,598✔
2175
        logger.detail("Session initiated", m_session_ident); // Throws
5,598✔
2176
    }
5,598✔
2177

2178
    void terminate()
2179
    {
4,488✔
2180
        logger.detail("Session terminated", m_session_ident); // Throws
4,488✔
2181
    }
4,488✔
2182

2183
    // Initiate the deactivation process, if it has not been initiated already
2184
    // by the client.
2185
    //
2186
    // IMPORTANT: This function must not be called with protocol versions
2187
    // earlier than 23.
2188
    //
2189
    // The deactivation process will eventually lead to termination of the
2190
    // session.
2191
    //
2192
    // The session will detach itself from the server file when the deactivation
2193
    // process is initiated, regardless of whether it is initiated by the
2194
    // client, or by calling this function.
2195
    void initiate_deactivation(ProtocolError error_code)
    {
        REALM_ASSERT(is_session_level_error(error_code));
        REALM_ASSERT(!error_occurred()); // Must only be called once

        // If the UNBIND message has already arrived, the client started the
        // deactivation process itself. The protocol state was SendUnbound and
        // remains unchanged, so there is nothing to do.
        if (REALM_UNLIKELY(unbind_message_received()))
            return;

        // Server-initiated deactivation: detach from the server file, record
        // the error, and make sure the session gets a chance to send it.
        detach_from_server_file();
        m_error_code = error_code;
        // Protocol state is now SendError
        ensure_enlisted_to_send();
    }
76✔
2211

2212
    bool is_enlisted_to_send() const noexcept
2213
    {
263,048✔
2214
        return m_next != nullptr;
263,048✔
2215
    }
263,048✔
2216

2217
    void ensure_enlisted_to_send() noexcept
2218
    {
51,420✔
2219
        if (!is_enlisted_to_send())
51,420✔
2220
            enlist_to_send();
49,938✔
2221
    }
51,420✔
2222

2223
    void enlist_to_send() noexcept
2224
    {
105,252✔
2225
        m_connection.enlist_to_send(this);
105,252✔
2226
    }
105,252✔
2227

2228
    // Overriding member function in FileIdentReceiver
2229
    void receive_file_ident(SaltedFileIdent file_ident) override final
    {
        // Protocol state must be AllocatingIdent or WaitForUnbind
        if (ident_message_received()) {
            // WaitForUnbind: no IDENT message is pending.
            REALM_ASSERT(!m_send_ident_message);
        }
        else {
            // AllocatingIdent: a request is outstanding and an IDENT message
            // still has to be sent.
            REALM_ASSERT(need_client_file_ident());
            REALM_ASSERT(m_send_ident_message);
        }
        REALM_ASSERT(!unbind_message_received());
        REALM_ASSERT(!error_occurred());
        REALM_ASSERT(!m_error_message_sent);

        // The request has been fulfilled; keep the allocated identifier until
        // it can be sent to the client.
        m_file_ident_request = 0;
        m_allocated_file_ident = file_ident;

        // A protocol state of AllocatingIdent has now become SendIdent; a
        // state of WaitForUnbind is unchanged.

        logger.debug("Acquired outbound salted file identifier (%1, %2)", file_ident.ident,
                     file_ident.salt); // Throws

        ensure_enlisted_to_send();
    }
1,334✔
2254

2255
    // Called by the associated connection object when this session is granted
2256
    // an opportunity to initiate the sending of a message.
2257
    //
2258
    // This function may lead to the destruction of the session object
2259
    // (suicide).
2260
    void send_message()
2261
    {
104,816✔
2262
        if (REALM_LIKELY(!unbind_message_received())) {
104,816✔
2263
            if (REALM_LIKELY(!error_occurred())) {
102,210✔
2264
                if (REALM_LIKELY(ident_message_received())) {
102,134✔
2265
                    // State is WaitForUnbind.
53,236✔
2266
                    bool relayed_alloc = (m_allocated_file_ident.ident != 0);
100,798✔
2267
                    if (REALM_LIKELY(!relayed_alloc)) {
100,798✔
2268
                        // Send DOWNLOAD or MARK.
53,236✔
2269
                        continue_history_scan(); // Throws
100,796✔
2270
                        // Session object may have been
53,236✔
2271
                        // destroyed at this point (suicide)
53,236✔
2272
                        return;
100,796✔
2273
                    }
100,796✔
2274
                    send_alloc_message(); // Throws
2✔
2275
                    return;
2✔
2276
                }
2✔
2277
                // State is SendIdent
542✔
2278
                send_ident_message(); // Throws
1,336✔
2279
                return;
1,336✔
2280
            }
1,336✔
2281
            // State is SendError
32✔
2282
            send_error_message(); // Throws
76✔
2283
            return;
76✔
2284
        }
76✔
2285
        // State is SendUnbound
1,052✔
2286
        send_unbound_message(); // Throws
2,606✔
2287
        terminate();            // Throws
2,606✔
2288
        m_connection.discard_session(m_session_ident);
2,606✔
2289
        // This session is now destroyed!
1,052✔
2290
    }
2,606✔
2291

2292
    bool receive_bind_message(std::string path, std::string signed_user_token, bool need_client_file_ident,
2293
                              bool is_subserver, ProtocolError& error)
2294
    {
5,598✔
2295
        if (logger.would_log(util::Logger::Level::info)) {
5,598✔
2296
            logger.detail("Received: BIND(server_path=%1, signed_user_token='%2', "
×
2297
                          "need_client_file_ident=%3, is_subserver=%4)",
×
2298
                          path, short_token_fmt(signed_user_token), int(need_client_file_ident),
×
2299
                          int(is_subserver)); // Throws
×
2300
        }
×
2301

2,734✔
2302
        ServerImpl& server = m_connection.get_server();
5,598✔
2303
        _impl::VirtualPathComponents virt_path_components =
5,598✔
2304
            _impl::parse_virtual_path(server.get_root_dir(), path); // Throws
5,598✔
2305

2,734✔
2306
        if (!virt_path_components.is_valid) {
5,598✔
2307
            logger.error("Bad virtual path (message_type='bind', path='%1', "
28✔
2308
                         "signed_user_token='%2')",
28✔
2309
                         path,
28✔
2310
                         short_token_fmt(signed_user_token)); // Throws
28✔
2311
            error = ProtocolError::illegal_realm_path;
28✔
2312
            return false;
28✔
2313
        }
28✔
2314

2,720✔
2315
        // The user has proper permissions at this stage.
2,720✔
2316

2,720✔
2317
        m_server_file = server.get_or_create_file(path); // Throws
5,570✔
2318

2,720✔
2319
        m_server_file->add_unidentified_session(this); // Throws
5,570✔
2320

2,720✔
2321
        logger.info("Client info: (path='%1', from=%2, protocol=%3) %4", path, m_connection.get_remote_endpoint(),
5,570✔
2322
                    m_connection.get_client_protocol_version(),
5,570✔
2323
                    m_connection.get_client_user_agent()); // Throws
5,570✔
2324

2,720✔
2325
        m_is_subserver = is_subserver;
5,570✔
2326
        if (REALM_LIKELY(!need_client_file_ident)) {
5,570✔
2327
            // Protocol state is now WaitForUnbind
1,632✔
2328
            return true;
3,050✔
2329
        }
3,050✔
2330

1,088✔
2331
        // FIXME: We must make a choice about client file ident for read only
1,088✔
2332
        // sessions. They should have a special read-only client file ident.
1,088✔
2333
        file_ident_type proxy_file = 0; // No proxy
2,520✔
2334
        ClientType client_type = (is_subserver ? ClientType::subserver : ClientType::regular);
2,520✔
2335
        m_file_ident_request = m_server_file->request_file_ident(*this, proxy_file, client_type); // Throws
2,520✔
2336
        m_send_ident_message = true;
2,520✔
2337
        // Protocol state is now AllocatingIdent
1,088✔
2338

1,088✔
2339
        return true;
2,520✔
2340
    }
2,520✔
2341

2342
    bool receive_ident_message(file_ident_type client_file_ident, salt_type client_file_ident_salt,
2343
                               version_type scan_server_version, version_type scan_client_version,
2344
                               version_type latest_server_version, salt_type latest_server_version_salt,
2345
                               ProtocolError& error)
2346
    {
4,154✔
2347
        // Protocol state must be WaitForIdent
2,108✔
2348
        REALM_ASSERT(!need_client_file_ident());
4,154✔
2349
        REALM_ASSERT(!m_send_ident_message);
4,154✔
2350
        REALM_ASSERT(!ident_message_received());
4,154✔
2351
        REALM_ASSERT(!unbind_message_received());
4,154✔
2352
        REALM_ASSERT(!error_occurred());
4,154✔
2353
        REALM_ASSERT(!m_error_message_sent);
4,154✔
2354

2,108✔
2355
        logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
4,154✔
2356
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
4,154✔
2357
                     "latest_server_version_salt=%6)",
4,154✔
2358
                     client_file_ident, client_file_ident_salt, scan_server_version, scan_client_version,
4,154✔
2359
                     latest_server_version, latest_server_version_salt); // Throws
4,154✔
2360

2,108✔
2361
        SaltedFileIdent client_file_ident_2 = {client_file_ident, client_file_ident_salt};
4,154✔
2362
        DownloadCursor download_progress = {scan_server_version, scan_client_version};
4,154✔
2363
        SaltedVersion server_version_2 = {latest_server_version, latest_server_version_salt};
4,154✔
2364
        ClientType client_type = (m_is_subserver ? ClientType::subserver : ClientType::regular);
4,154✔
2365
        UploadCursor upload_threshold = {0, 0};
4,154✔
2366
        version_type locked_server_version = 0;
4,154✔
2367
        BootstrapError error_2 =
4,154✔
2368
            m_server_file->bootstrap_client_session(client_file_ident_2, download_progress, server_version_2,
4,154✔
2369
                                                    client_type, upload_threshold, locked_server_version,
4,154✔
2370
                                                    logger); // Throws
4,154✔
2371
        switch (error_2) {
4,154✔
2372
            case BootstrapError::no_error:
4,120✔
2373
                break;
4,120✔
2374
            case BootstrapError::client_file_expired:
✔
2375
                logger.warn("Client (%1) expired", client_file_ident); // Throws
×
2376
                error = ProtocolError::client_file_expired;
×
2377
                return false;
×
2378
            case BootstrapError::bad_client_file_ident:
✔
2379
                logger.error("Bad client file ident (%1) in IDENT message",
×
2380
                             client_file_ident); // Throws
×
2381
                error = ProtocolError::bad_client_file_ident;
×
2382
                return false;
×
2383
            case BootstrapError::bad_client_file_ident_salt:
4✔
2384
                logger.error("Bad client file identifier salt (%1) in IDENT message",
4✔
2385
                             client_file_ident_salt); // Throws
4✔
2386
                error = ProtocolError::diverging_histories;
4✔
2387
                return false;
4✔
2388
            case BootstrapError::bad_download_server_version:
✔
2389
                logger.error("Bad download progress server version in IDENT message"); // Throws
×
2390
                error = ProtocolError::bad_server_version;
×
2391
                return false;
×
2392
            case BootstrapError::bad_download_client_version:
4✔
2393
                logger.error("Bad download progress client version in IDENT message"); // Throws
4✔
2394
                error = ProtocolError::bad_client_version;
4✔
2395
                return false;
4✔
2396
            case BootstrapError::bad_server_version:
20✔
2397
                logger.error("Bad server version (message_type='ident')"); // Throws
20✔
2398
                error = ProtocolError::bad_server_version;
20✔
2399
                return false;
20✔
2400
            case BootstrapError::bad_server_version_salt:
4✔
2401
                logger.error("Bad server version salt in IDENT message"); // Throws
4✔
2402
                error = ProtocolError::diverging_histories;
4✔
2403
                return false;
4✔
2404
            case BootstrapError::bad_client_type:
✔
2405
                logger.error("Bad client type (%1) in IDENT message", int(client_type)); // Throws
×
2406
                error = ProtocolError::bad_client_file_ident; // FIXME: Introduce new protocol-level error
×
2407
                                                              // `bad_client_type`.
2408
                return false;
×
2409
        }
4,120✔
2410

2,090✔
2411
        // Make sure there is no other session currently associcated with the
2,090✔
2412
        // same client-side file
2,090✔
2413
        if (Session* other_sess = m_server_file->get_identified_session(client_file_ident)) {
4,120✔
2414
            SyncConnection& other_conn = other_sess->get_connection();
×
2415
            // It is a protocol violation if the other session is associated
2416
            // with the same connection
2417
            if (&other_conn == &m_connection) {
×
2418
                logger.error("Client file already bound in other session associated with "
×
2419
                             "the same connection"); // Throws
×
2420
                error = ProtocolError::bound_in_other_session;
×
2421
                return false;
×
2422
            }
×
2423
            // When the other session is associated with a different connection
2424
            // (`other_conn`), the clash may be due to the server not yet having
2425
            // realized that the other connection has been closed by the
2426
            // client. If so, the other connention is a "zombie". In the
2427
            // interest of getting rid of zombie connections as fast as
2428
            // possible, we shall assume that a clash with a session in another
2429
            // connection is always due to that other connection being a
2430
            // zombie. And when such a situation is detected, we want to close
2431
            // the zombie connection immediately.
2432
            auto log_level = Logger::Level::detail;
×
2433
            other_conn.terminate(log_level,
×
2434
                                 "Sync connection closed (superseded session)"); // Throws
×
2435
        }
×
2436

2,090✔
2437
        logger.info("Bound to client file (client_file_ident=%1)", client_file_ident); // Throws
4,120✔
2438

2,090✔
2439
        send_log_message(util::Logger::Level::debug, util::format("Session %1 bound to client file ident %2",
4,120✔
2440
                                                                  m_session_ident, client_file_ident));
4,120✔
2441

2,090✔
2442
        m_server_file->identify_session(this, client_file_ident); // Throws
4,120✔
2443

2,090✔
2444
        m_client_file_ident = client_file_ident;
4,120✔
2445
        m_download_progress = download_progress;
4,120✔
2446
        m_upload_threshold = upload_threshold;
4,120✔
2447
        m_locked_server_version = locked_server_version;
4,120✔
2448

2,090✔
2449
        ServerImpl& server = m_connection.get_server();
4,120✔
2450
        const Server::Config& config = server.get_config();
4,120✔
2451
        m_disable_download = (config.disable_download_for.count(client_file_ident) != 0);
4,120✔
2452

2,090✔
2453
        if (REALM_UNLIKELY(config.session_bootstrap_callback)) {
4,120✔
2454
            config.session_bootstrap_callback(m_server_file->get_virt_path(),
×
2455
                                              client_file_ident); // Throws
×
2456
        }
×
2457

2,090✔
2458
        // Protocol  state is now WaitForUnbind
2,090✔
2459
        enlist_to_send();
4,120✔
2460
        return true;
4,120✔
2461
    }
4,120✔
2462

2463
    bool receive_upload_message(version_type progress_client_version, version_type progress_server_version,
2464
                                version_type locked_server_version, const UploadChangesets& upload_changesets,
2465
                                ProtocolError& error)
2466
    {
44,942✔
2467
        // Protocol state must be WaitForUnbind
22,860✔
2468
        REALM_ASSERT(!m_send_ident_message);
44,942✔
2469
        REALM_ASSERT(ident_message_received());
44,942✔
2470
        REALM_ASSERT(!unbind_message_received());
44,942✔
2471
        REALM_ASSERT(!error_occurred());
44,942✔
2472
        REALM_ASSERT(!m_error_message_sent);
44,942✔
2473

22,860✔
2474
        logger.detail("Received: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
44,942✔
2475
                      "locked_server_version=%3, num_changesets=%4)",
44,942✔
2476
                      progress_client_version, progress_server_version, locked_server_version,
44,942✔
2477
                      upload_changesets.size()); // Throws
44,942✔
2478

22,860✔
2479
        // We are unable to reproduce the cursor object for the upload progress
22,860✔
2480
        // when the protocol version is less than 29, because the client does
22,860✔
2481
        // not provide the required information. When the protocol version is
22,860✔
2482
        // less than 25, we can always get a consistent cursor by taking it from
22,860✔
2483
        // the changeset that was uploaded last, but in protocol versions 25,
22,860✔
2484
        // 26, 27, and 28, things are more complicated. Here, we receive new
22,860✔
2485
        // values for `last_integrated_server_version` which we cannot afford to
22,860✔
2486
        // ignore, but we do not know what client versions they correspond
22,860✔
2487
        // to. Fortunately, we can produce a cursor that works, and is mutually
22,860✔
2488
        // consistent with previous cursors, by simply bumping
22,860✔
2489
        // `upload_progress.client_version` when
22,860✔
2490
        // `upload_progress.last_integrated_server_version` grows.
22,860✔
2491
        //
22,860✔
2492
        // To see that this scheme works, consider the last changeset, A, that
22,860✔
2493
        // will have already been uploaded and integrated at the beginning of
22,860✔
2494
        // the next session, and the first changeset, B, that follows A in the
22,860✔
2495
        // client side history, and is not upload skippable (of local origin and
22,860✔
2496
        // nonempty). We then need to show that A will be skipped, if uploaded
22,860✔
2497
        // in the next session, but B will not.
22,860✔
2498
        //
22,860✔
2499
        // Let V be the client version produced by A, and let T be the value of
22,860✔
2500
        // `upload_progress.client_version` as determined in this session, which
22,860✔
2501
        // is used as threshold in the next session. Then we know that A is
22,860✔
2502
        // skipped during the next session if V is less than, or equal to T. If
22,860✔
2503
        // the protocol version is at least 29, the protocol requires that T is
22,860✔
2504
        // greater than, or equal to V. If the protocol version is less than 25,
22,860✔
2505
        // T will be equal to V. Finally, if the protocol version is 25, 26, 27,
22,860✔
2506
        // or 28, we construct T such that it is always greater than, or equal
22,860✔
2507
        // to V, so in all cases, A will be skipped during the next session.
22,860✔
2508
        //
22,860✔
2509
        // Let W be the client version on which B is based. We then know that B
22,860✔
2510
        // will be retained if, and only if W is greater than, or equal to T. If
22,860✔
2511
        // the protocol version is at least 29, we know that T is less than, or
22,860✔
2512
        // equal to W, since B is not integrated until the next session. If the
22,860✔
2513
        // protocol version is less than 25, we know that T is V. Since V must
22,860✔
2514
        // be less than, or equal to W, we again know that T is less than, or
22,860✔
2515
        // equal to W. Finally, if the protocol version is 25, 26, 27, or 28, we
22,860✔
2516
        // construct T such that it is equal to V + N, where N is the number of
22,860✔
2517
        // observed increments in `last_integrated_server_version` since the
22,860✔
2518
        // client version produced by A. For each of these observed increments,
22,860✔
2519
        // there must have been a distinct new client version, but all these
22,860✔
2520
        // client versions must be less than, or equal to W, since B is not
22,860✔
2521
        // integrated until the next session. Therefore, we know that T = V + N
22,860✔
2522
        // is less than, or equal to W. So, in all cases, B will not be skipped
22,860✔
2523
        // during the next session.
22,860✔
2524
        int protocol_version = m_connection.get_client_protocol_version();
44,942✔
2525
        static_cast<void>(protocol_version); // No protocol diversion (yet)
44,942✔
2526

22,860✔
2527
        UploadCursor upload_progress;
44,942✔
2528
        upload_progress = {progress_client_version, progress_server_version};
44,942✔
2529

22,860✔
2530
        // `upload_progress.client_version` must be nondecreasing across the
22,860✔
2531
        // session.
22,860✔
2532
        bool good_1 = (upload_progress.client_version >= m_upload_progress.client_version);
44,942✔
2533
        if (REALM_UNLIKELY(!good_1)) {
44,942✔
2534
            logger.error("Decreasing client version in upload progress (%1 < %2)", upload_progress.client_version,
×
2535
                         m_upload_progress.client_version); // Throws
×
2536
            error = ProtocolError::bad_client_version;
×
2537
            return false;
×
2538
        }
×
2539
        // `upload_progress.last_integrated_server_version` must be a version
22,860✔
2540
        // that the client can have heard about.
22,860✔
2541
        bool good_2 = (upload_progress.last_integrated_server_version <= m_download_progress.server_version);
44,942✔
2542
        if (REALM_UNLIKELY(!good_2)) {
44,942✔
2543
            logger.error("Bad last integrated server version in upload progress (%1 > %2)",
×
2544
                         upload_progress.last_integrated_server_version,
×
2545
                         m_download_progress.server_version); // Throws
×
2546
            error = ProtocolError::bad_server_version;
×
2547
            return false;
×
2548
        }
×
2549

22,860✔
2550
        // `upload_progress` must be consistent.
22,860✔
2551
        if (REALM_UNLIKELY(!is_consistent(upload_progress))) {
44,942✔
2552
            logger.error("Upload progress is inconsistent (%1, %2)", upload_progress.client_version,
×
2553
                         upload_progress.last_integrated_server_version); // Throws
×
2554
            error = ProtocolError::bad_server_version;
×
2555
            return false;
×
2556
        }
×
2557
        // `upload_progress` and `m_upload_threshold` must be mutually
22,860✔
2558
        // consistent.
22,860✔
2559
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_threshold))) {
44,942✔
2560
            logger.error("Upload progress (%1, %2) is mutually inconsistent with "
×
2561
                         "threshold (%3, %4)",
×
2562
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2563
                         m_upload_threshold.client_version,
×
2564
                         m_upload_threshold.last_integrated_server_version); // Throws
×
2565
            error = ProtocolError::bad_server_version;
×
2566
            return false;
×
2567
        }
×
2568
        // `upload_progress` and `m_upload_progress` must be mutually
22,860✔
2569
        // consistent.
22,860✔
2570
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_progress))) {
44,942✔
2571
            logger.error("Upload progress (%1, %2) is mutually inconsistent with previous "
×
2572
                         "upload progress (%3, %4)",
×
2573
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2574
                         m_upload_progress.client_version,
×
2575
                         m_upload_progress.last_integrated_server_version); // Throws
×
2576
            error = ProtocolError::bad_server_version;
×
2577
            return false;
×
2578
        }
×
2579

22,860✔
2580
        version_type locked_server_version_2 = locked_server_version;
44,942✔
2581

22,860✔
2582
        // `locked_server_version_2` must be nondecreasing over the lifetime of
22,860✔
2583
        // the client-side file.
22,860✔
2584
        if (REALM_UNLIKELY(locked_server_version_2 < m_locked_server_version)) {
44,942✔
2585
            logger.error("Decreasing locked server version (%1 < %2)", locked_server_version_2,
×
2586
                         m_locked_server_version); // Throws
×
2587
            error = ProtocolError::bad_server_version;
×
2588
            return false;
×
2589
        }
×
2590
        // `locked_server_version_2` must be a version that the client can have
22,860✔
2591
        // heard about.
22,860✔
2592
        if (REALM_UNLIKELY(locked_server_version_2 > m_download_progress.server_version)) {
44,942✔
2593
            logger.error("Bad locked server version (%1 > %2)", locked_server_version_2,
×
2594
                         m_download_progress.server_version); // Throws
×
2595
            error = ProtocolError::bad_server_version;
×
2596
            return false;
×
2597
        }
×
2598

22,860✔
2599
        std::size_t num_previously_integrated_changesets = 0;
44,942✔
2600
        if (!upload_changesets.empty()) {
44,942✔
2601
            UploadCursor up = m_upload_progress;
23,614✔
2602
            for (const ServerProtocol::UploadChangeset& uc : upload_changesets) {
36,920✔
2603
                // `uc.upload_cursor.client_version` must be increasing across
17,862✔
2604
                // all the changesets in this UPLOAD message, and all must be
17,862✔
2605
                // greater than upload_progress.client_version of previous
17,862✔
2606
                // UPLOAD message.
17,862✔
2607
                if (REALM_UNLIKELY(uc.upload_cursor.client_version <= up.client_version)) {
36,920✔
2608
                    logger.error("Nonincreasing client version in upload cursor of uploaded "
×
2609
                                 "changeset (%1 <= %2)",
×
2610
                                 uc.upload_cursor.client_version,
×
2611
                                 up.client_version); // Throws
×
2612
                    error = ProtocolError::bad_client_version;
×
2613
                    return false;
×
2614
                }
×
2615
                // `uc.upload_progress` must be consistent.
17,862✔
2616
                if (REALM_UNLIKELY(!is_consistent(uc.upload_cursor))) {
36,920✔
2617
                    logger.error("Upload cursor of uploaded changeset is inconsistent (%1, %2)",
×
2618
                                 uc.upload_cursor.client_version,
×
2619
                                 uc.upload_cursor.last_integrated_server_version); // Throws
×
2620
                    error = ProtocolError::bad_server_version;
×
2621
                    return false;
×
2622
                }
×
2623
                // `uc.upload_progress` must be mutually consistent with
17,862✔
2624
                // previous upload cursor.
17,862✔
2625
                if (REALM_UNLIKELY(!are_mutually_consistent(uc.upload_cursor, up))) {
36,920✔
2626
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2627
                                 "inconsistent with previous upload cursor (%3, %4)",
×
2628
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2629
                                 up.client_version, up.last_integrated_server_version); // Throws
×
2630
                    error = ProtocolError::bad_server_version;
×
2631
                    return false;
×
2632
                }
×
2633
                // `uc.upload_progress` must be mutually consistent with
17,862✔
2634
                // threshold, that is, for changesets that have not previously
17,862✔
2635
                // been integrated, it is important that the specified value of
17,862✔
2636
                // `last_integrated_server_version` is greater than, or equal to
17,862✔
2637
                // the reciprocal history base version.
17,862✔
2638
                bool consistent_with_threshold = are_mutually_consistent(uc.upload_cursor, m_upload_threshold);
36,920✔
2639
                if (REALM_UNLIKELY(!consistent_with_threshold)) {
36,920✔
2640
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2641
                                 "inconsistent with threshold (%3, %4)",
×
2642
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2643
                                 m_upload_threshold.client_version,
×
2644
                                 m_upload_threshold.last_integrated_server_version); // Throws
×
2645
                    error = ProtocolError::bad_server_version;
×
2646
                    return false;
×
2647
                }
×
2648
                bool previously_integrated = (uc.upload_cursor.client_version <= m_upload_threshold.client_version);
36,920✔
2649
                if (previously_integrated)
36,920✔
2650
                    ++num_previously_integrated_changesets;
2,428✔
2651
                up = uc.upload_cursor;
36,920✔
2652
            }
36,920✔
2653
            // `upload_progress.client_version` must be greater than, or equal
12,272✔
2654
            // to client versions produced by each of the changesets in this
12,272✔
2655
            // UPLOAD message.
12,272✔
2656
            if (REALM_UNLIKELY(up.client_version > upload_progress.client_version)) {
23,614✔
2657
                logger.error("Upload progress less than client version produced by uploaded "
×
2658
                             "changeset (%1 > %2)",
×
2659
                             up.client_version,
×
2660
                             upload_progress.client_version); // Throws
×
2661
                error = ProtocolError::bad_client_version;
×
2662
                return false;
×
2663
            }
×
2664
            // The upload cursor of last uploaded changeset must be mutually
12,272✔
2665
            // consistent with the reported upload progress.
12,272✔
2666
            if (REALM_UNLIKELY(!are_mutually_consistent(up, upload_progress))) {
23,614✔
2667
                logger.error("Upload cursor (%1, %2) of last uploaded changeset is mutually "
×
2668
                             "inconsistent with upload progress (%3, %4)",
×
2669
                             up.client_version, up.last_integrated_server_version, upload_progress.client_version,
×
2670
                             upload_progress.last_integrated_server_version); // Throws
×
2671
                error = ProtocolError::bad_server_version;
×
2672
                return false;
×
2673
            }
×
2674
        }
44,942✔
2675

22,860✔
2676
        // FIXME: Part of a very poor man's substitute for a proper backpressure
22,860✔
2677
        // scheme.
22,860✔
2678
        if (REALM_UNLIKELY(!m_server_file->can_add_changesets_from_downstream())) {
44,942✔
2679
            logger.debug("Terminating uploading session because buffer is full"); // Throws
×
2680
            // Using this exact error code, because it causes `try_again` flag
2681
            // to be set to true, which causes the client to wait for about 5
2682
            // minutes before trying to connect again.
2683
            error = ProtocolError::connection_closed;
×
2684
            return false;
×
2685
        }
×
2686

22,860✔
2687
        m_upload_progress = upload_progress;
44,942✔
2688

22,860✔
2689
        bool have_real_upload_progress = (upload_progress.client_version > m_upload_threshold.client_version);
44,942✔
2690
        bool bump_locked_server_version = (locked_server_version_2 > m_locked_server_version);
44,942✔
2691

22,860✔
2692
        std::size_t num_changesets_to_integrate = upload_changesets.size() - num_previously_integrated_changesets;
44,942✔
2693
        REALM_ASSERT(have_real_upload_progress || num_changesets_to_integrate == 0);
44,942✔
2694

22,860✔
2695
        bool have_anything_to_do = (have_real_upload_progress || bump_locked_server_version);
44,942✔
2696
        if (!have_anything_to_do)
44,942✔
2697
            return true;
244✔
2698

22,716✔
2699
        if (!have_real_upload_progress)
44,698✔
2700
            upload_progress = m_upload_threshold;
×
2701

22,716✔
2702
        if (num_previously_integrated_changesets > 0) {
44,698✔
2703
            logger.detail("Ignoring %1 previously integrated changesets",
760✔
2704
                          num_previously_integrated_changesets); // Throws
760✔
2705
        }
760✔
2706
        if (num_changesets_to_integrate > 0) {
44,698✔
2707
            logger.detail("Initiate integration of %1 remote changesets",
23,280✔
2708
                          num_changesets_to_integrate); // Throws
23,280✔
2709
        }
23,280✔
2710

22,716✔
2711
        REALM_ASSERT(m_server_file);
44,698✔
2712
        ServerFile& file = *m_server_file;
44,698✔
2713
        std::size_t offset = num_previously_integrated_changesets;
44,698✔
2714
        file.add_changesets_from_downstream(m_client_file_ident, upload_progress, locked_server_version_2,
44,698✔
2715
                                            upload_changesets.data() + offset, num_changesets_to_integrate); // Throws
44,698✔
2716

22,716✔
2717
        m_locked_server_version = locked_server_version_2;
44,698✔
2718
        return true;
44,698✔
2719
    }
44,698✔
2720

2721
    /// Handles a MARK message received from the client.
    ///
    /// The request identifier is stored in `m_download_completion_request`
    /// and the session is enlisted to send (if it is not already). The
    /// `ProtocolError` out-parameter is never written, and the function
    /// always returns true.
    bool receive_mark_message(request_ident_type request_ident, ProtocolError&)
    {
        // Protocol state must be WaitForUnbind
        REALM_ASSERT(!m_send_ident_message);
        REALM_ASSERT(ident_message_received());
        REALM_ASSERT(!unbind_message_received());
        REALM_ASSERT(!error_occurred());
        REALM_ASSERT(!m_error_message_sent);

        logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

        // Remember the pending request, then make sure the send side gets a
        // chance to act on it.
        m_download_completion_request = request_ident;
        ensure_enlisted_to_send();

        return true;
    }
12,078✔
2737

2738
    // Returns true if the deactivation process has been completed, at which
2739
    // point the caller (SyncConnection::receive_unbind_message()) should
2740
    // terminate the session.
2741
    //
2742
    // CAUTION: This function may commit suicide!
2743
    void receive_unbind_message()
2744
    {
2,864✔
2745
        // Protocol state may be anything but SendUnbound
1,294✔
2746
        REALM_ASSERT(!m_unbind_message_received);
2,864✔
2747

1,294✔
2748
        logger.detail("Received: UNBIND"); // Throws
2,864✔
2749

1,294✔
2750
        detach_from_server_file();
2,864✔
2751
        m_unbind_message_received = true;
2,864✔
2752

1,294✔
2753
        // Detect completion of the deactivation process
1,294✔
2754
        if (m_error_message_sent) {
2,864✔
2755
            // Deactivation process completed
14✔
2756
            terminate(); // Throws
28✔
2757
            m_connection.discard_session(m_session_ident);
28✔
2758
            // This session is now destroyed!
14✔
2759
            return;
28✔
2760
        }
28✔
2761

1,280✔
2762
        // Protocol state is now SendUnbound
1,280✔
2763
        ensure_enlisted_to_send();
2,836✔
2764
    }
2,836✔
2765

2766
    /// Handles an ERROR message received from the client.
    ///
    /// All three arguments are ignored; the only effect is a log entry. Must
    /// not be called after the UNBIND message has been received.
    void receive_error_message(session_ident_type, int, std::string_view)
    {
        REALM_ASSERT(!m_unbind_message_received);

        logger.detail("Received: ERROR"); // Throws
    }
×
2772

2773
private:
2774
    SyncConnection& m_connection;
2775

2776
    const session_ident_type m_session_ident;
2777

2778
    // Not null if, and only if this session is in
2779
    // m_connection.m_sessions_enlisted_to_send.
2780
    Session* m_next = nullptr;
2781

2782
    // Becomes nonnull when the BIND message is received, if no error occurs. Is
2783
    // reset to null when the deactivation process is initiated, either when the
2784
    // UNBIND message is received, or when initiate_deactivation() is called.
2785
    util::bind_ptr<ServerFile> m_server_file;
2786

2787
    bool m_disable_download = false;
2788
    bool m_is_subserver = false;
2789

2790
    using file_ident_request_type = ServerFile::file_ident_request_type;
2791

2792
    // When nonzero, this session has an outstanding request for a client file
2793
    // identifier.
2794
    file_ident_request_type m_file_ident_request = 0;
2795

2796
    // Payload for next outgoing ALLOC message.
2797
    SaltedFileIdent m_allocated_file_ident = {0, 0};
2798

2799
    // Zero until the session receives an IDENT message from the client.
2800
    file_ident_type m_client_file_ident = 0;
2801

2802
    // Zero until initiate_deactivation() is called.
2803
    ProtocolError m_error_code = {};
2804

2805
    // The current point of progression of the download process. Set to (<server
2806
    // version>, <client version>) of the IDENT message when the IDENT message
2807
    // is received. At the time of return from continue_history_scan(), it
2808
    // points to the latest server version such that all preceding changesets in
2809
    // the server-side history have been downloaded, are currently being
2810
    // downloaded, or are *download excluded*.
2811
    DownloadCursor m_download_progress = {0, 0};
2812

2813
    request_ident_type m_download_completion_request = 0;
2814

2815
    // Records the progress of the upload process. Used to check that the client
2816
    // uploads changesets in order. Also, when m_upload_progress >
2817
    // m_upload_threshold, m_upload_progress works as a cache of the persisted
2818
    // version of the upload progress.
2819
    UploadCursor m_upload_progress = {0, 0};
2820

2821
    // Initialized on reception of the IDENT message. Specifies the actual
2822
    // upload progress (as recorded on the server-side) at the beginning of the
2823
    // session, and it remains fixed throughout the session.
2824
    //
2825
    // m_upload_threshold includes the progress resulting from the received
2826
    // changesets that have not yet been integrated (only relevant for
2827
    // synchronous backup).
2828
    UploadCursor m_upload_threshold = {0, 0};
2829

2830
    // Works partially as a cache of the persisted value, and partially as a way
2831
    // of checking that the client respects that it can never decrease.
2832
    version_type m_locked_server_version = 0;
2833

2834
    bool m_send_ident_message = false;
2835
    bool m_unbind_message_received = false;
2836
    bool m_error_message_sent = false;
2837

2838
    /// m_one_download_message_sent denotes whether at least one DOWNLOAD message
2839
    /// has been sent in the current session. The variable is used to ensure
2840
    /// that a DOWNLOAD message is always sent in a session. The received
2841
    /// DOWNLOAD message is needed by the client to ensure that its current
2842
    /// download progress is up to date.
2843
    bool m_one_download_message_sent = false;
2844

2845
    static std::string make_logger_prefix(session_ident_type session_ident)
2846
    {
5,598✔
2847
        std::ostringstream out;
5,598✔
2848
        out.imbue(std::locale::classic());
5,598✔
2849
        out << "Session[" << session_ident << "]: "; // Throws
5,598✔
2850
        return out.str();                            // Throws
5,598✔
2851
    }
5,598✔
2852

2853
    // Scan the history for changesets to be downloaded.
2854
    // If the history is longer than the end point of the previous scan,
2855
    // a DOWNLOAD message will be sent.
2856
    // A MARK message is sent if no DOWNLOAD message is sent, and the client has
2857
    // requested to be notified about download completion.
2858
    // In case neither a DOWNLOAD nor a MARK is sent, no message is sent.
2859
    //
2860
    // This function may lead to the destruction of the session object
2861
    // (suicide).
2862
    void continue_history_scan()
2863
    {
100,798✔
2864
        // Protocol state must be WaitForUnbind
53,238✔
2865
        REALM_ASSERT(!m_send_ident_message);
100,798✔
2866
        REALM_ASSERT(ident_message_received());
100,798✔
2867
        REALM_ASSERT(!unbind_message_received());
100,798✔
2868
        REALM_ASSERT(!error_occurred());
100,798✔
2869
        REALM_ASSERT(!m_error_message_sent);
100,798✔
2870
        REALM_ASSERT(!is_enlisted_to_send());
100,798✔
2871

53,238✔
2872
        SaltedVersion last_server_version = m_server_file->get_salted_sync_version();
100,798✔
2873
        REALM_ASSERT(last_server_version.version >= m_download_progress.server_version);
100,798✔
2874

53,238✔
2875
        ServerImpl& server = m_connection.get_server();
100,798✔
2876
        const Server::Config& config = server.get_config();
100,798✔
2877
        if (REALM_UNLIKELY(m_disable_download))
100,798✔
2878
            return;
53,238✔
2879

53,238✔
2880
        bool have_more_to_scan =
100,798✔
2881
            (last_server_version.version > m_download_progress.server_version || !m_one_download_message_sent);
100,798✔
2882
        if (have_more_to_scan) {
100,798✔
2883
            m_server_file->register_client_access(m_client_file_ident);     // Throws
39,130✔
2884
            const ServerHistory& history = m_server_file->access().history; // Throws
39,130✔
2885
            const char* body;
39,130✔
2886
            std::size_t uncompressed_body_size;
39,130✔
2887
            std::size_t compressed_body_size = 0;
39,130✔
2888
            bool body_is_compressed = false;
39,130✔
2889
            version_type end_version = last_server_version.version;
39,130✔
2890
            DownloadCursor download_progress;
39,130✔
2891
            UploadCursor upload_progress = {0, 0};
39,130✔
2892
            std::uint_fast64_t downloadable_bytes = 0;
39,130✔
2893
            std::size_t num_changesets;
39,130✔
2894
            std::size_t accum_original_size;
39,130✔
2895
            std::size_t accum_compacted_size;
39,130✔
2896
            ServerProtocol& protocol = get_server_protocol();
39,130✔
2897
            bool disable_download_compaction = config.disable_download_compaction;
39,130✔
2898
            bool enable_cache = (config.enable_download_bootstrap_cache && m_download_progress.server_version == 0 &&
39,130!
2899
                                 m_upload_progress.client_version == 0 && m_upload_threshold.client_version == 0);
20,982!
2900
            DownloadCache& cache = m_server_file->get_download_cache();
39,130✔
2901
            bool fetch_from_cache = (enable_cache && cache.body && end_version == cache.end_version);
39,130!
2902
            if (fetch_from_cache) {
39,130✔
2903
                body = cache.body.get();
×
2904
                uncompressed_body_size = cache.uncompressed_body_size;
×
2905
                compressed_body_size = cache.compressed_body_size;
×
2906
                body_is_compressed = cache.body_is_compressed;
×
2907
                download_progress = cache.download_progress;
×
2908
                downloadable_bytes = cache.downloadable_bytes;
×
2909
                num_changesets = cache.num_changesets;
×
2910
                accum_original_size = cache.accum_original_size;
×
2911
                accum_compacted_size = cache.accum_compacted_size;
×
2912
            }
×
2913
            else {
39,130✔
2914
                // Discard the old cached DOWNLOAD body before generating a new
20,982✔
2915
                // one to be cached. This can make a big difference because the
20,982✔
2916
                // size of that body can be very large (10GiB has been seen in a
20,982✔
2917
                // real-world case).
20,982✔
2918
                if (enable_cache)
39,130✔
2919
                    cache.body = {};
×
2920

20,982✔
2921
                OutputBuffer& out = server.get_misc_buffers().download_message;
39,130✔
2922
                out.reset();
39,130✔
2923
                download_progress = m_download_progress;
39,130✔
2924
                auto fetch_and_compress = [&](std::size_t max_download_size) {
39,134✔
2925
                    DownloadHistoryEntryHandler handler{protocol, out, logger};
39,134✔
2926
                    std::uint_fast64_t cumulative_byte_size_current;
39,134✔
2927
                    std::uint_fast64_t cumulative_byte_size_total;
39,134✔
2928
                    bool not_expired = history.fetch_download_info(
39,134✔
2929
                        m_client_file_ident, download_progress, end_version, upload_progress, handler,
39,134✔
2930
                        cumulative_byte_size_current, cumulative_byte_size_total, disable_download_compaction,
39,134✔
2931
                        max_download_size); // Throws
39,134✔
2932
                    REALM_ASSERT(upload_progress.client_version >= download_progress.last_integrated_client_version);
39,134✔
2933
                    SyncConnection& conn = get_connection();
39,134✔
2934
                    if (REALM_UNLIKELY(!not_expired)) {
39,134✔
2935
                        logger.debug("History scanning failed: Client file entry "
×
2936
                                     "expired during session"); // Throws
×
2937
                        conn.protocol_error(ProtocolError::client_file_expired, this);
×
2938
                        // Session object may have been destroyed at this point
2939
                        // (suicide).
2940
                        return false;
×
2941
                    }
×
2942

20,984✔
2943
                    downloadable_bytes = cumulative_byte_size_total - cumulative_byte_size_current;
39,134✔
2944
                    uncompressed_body_size = out.size();
39,134✔
2945
                    BinaryData uncompressed = {out.data(), uncompressed_body_size};
39,134✔
2946
                    body = uncompressed.data();
39,134✔
2947
                    std::size_t max_uncompressed = 1024;
39,134✔
2948
                    if (uncompressed.size() > max_uncompressed) {
39,134✔
2949
                        compression::CompressMemoryArena& arena = server.get_compress_memory_arena();
4,300✔
2950
                        std::vector<char>& buffer = server.get_misc_buffers().compress;
4,300✔
2951
                        compression::allocate_and_compress(arena, uncompressed, buffer); // Throws
4,300✔
2952
                        if (buffer.size() < uncompressed.size()) {
4,300✔
2953
                            body = buffer.data();
4,300✔
2954
                            compressed_body_size = buffer.size();
4,300✔
2955
                            body_is_compressed = true;
4,300✔
2956
                        }
4,300✔
2957
                    }
4,300✔
2958
                    num_changesets = handler.num_changesets;
39,134✔
2959
                    accum_original_size = handler.accum_original_size;
39,134✔
2960
                    accum_compacted_size = handler.accum_compacted_size;
39,134✔
2961
                    return true;
39,134✔
2962
                };
39,134✔
2963
                if (enable_cache) {
39,130✔
2964
                    std::size_t max_download_size = std::numeric_limits<size_t>::max();
×
2965
                    if (!fetch_and_compress(max_download_size)) { // Throws
×
2966
                        // Session object may have been destroyed at this point
2967
                        // (suicide).
2968
                        return;
×
2969
                    }
×
2970
                    REALM_ASSERT(upload_progress.client_version == 0);
×
2971
                    std::size_t body_size = (body_is_compressed ? compressed_body_size : uncompressed_body_size);
×
2972
                    cache.body = std::make_unique<char[]>(body_size); // Throws
×
2973
                    std::copy(body, body + body_size, cache.body.get());
×
2974
                    cache.uncompressed_body_size = uncompressed_body_size;
×
2975
                    cache.compressed_body_size = compressed_body_size;
×
2976
                    cache.body_is_compressed = body_is_compressed;
×
2977
                    cache.end_version = end_version;
×
2978
                    cache.download_progress = download_progress;
×
2979
                    cache.downloadable_bytes = downloadable_bytes;
×
2980
                    cache.num_changesets = num_changesets;
×
2981
                    cache.accum_original_size = accum_original_size;
×
2982
                    cache.accum_compacted_size = accum_compacted_size;
×
2983
                }
×
2984
                else {
39,130✔
2985
                    std::size_t max_download_size = config.max_download_size;
39,130✔
2986
                    if (!fetch_and_compress(max_download_size)) { // Throws
39,130✔
2987
                        // Session object may have been destroyed at this point
2988
                        // (suicide).
2989
                        return;
×
2990
                    }
×
2991
                }
39,130✔
2992
            }
39,130✔
2993

20,982✔
2994
            OutputBuffer& out = m_connection.get_output_buffer();
39,130✔
2995
            protocol.make_download_message(
39,130✔
2996
                m_connection.get_client_protocol_version(), out, m_session_ident, download_progress.server_version,
39,130✔
2997
                download_progress.last_integrated_client_version, last_server_version.version,
39,130✔
2998
                last_server_version.salt, upload_progress.client_version,
39,130✔
2999
                upload_progress.last_integrated_server_version, downloadable_bytes, num_changesets, body,
39,130✔
3000
                uncompressed_body_size, compressed_body_size, body_is_compressed, logger); // Throws
39,130✔
3001

20,982✔
3002
            if (!disable_download_compaction) {
39,134✔
3003
                std::size_t saved = accum_original_size - accum_compacted_size;
39,134✔
3004
                double saved_2 = (accum_original_size == 0 ? 0 : std::round(saved * 100.0 / accum_original_size));
30,358✔
3005
                logger.detail("Download compaction: Saved %1 bytes (%2%%)", saved, saved_2); // Throws
39,134✔
3006
            }
39,134✔
3007

20,982✔
3008
            m_download_progress = download_progress;
39,130✔
3009
            logger.debug("Setting of m_download_progress.server_version = %1",
39,130✔
3010
                         m_download_progress.server_version); // Throws
39,130✔
3011
            send_download_message();
39,130✔
3012
            m_one_download_message_sent = true;
39,130✔
3013

20,982✔
3014
            enlist_to_send();
39,130✔
3015
        }
39,130✔
3016
        else if (m_download_completion_request) {
61,668✔
3017
            // Send a MARK message
5,966✔
3018
            request_ident_type request_ident = m_download_completion_request;
12,066✔
3019
            send_mark_message(request_ident);  // Throws
12,066✔
3020
            m_download_completion_request = 0; // Request handled
12,066✔
3021
            enlist_to_send();
12,066✔
3022
        }
12,066✔
3023
    }
100,798✔
3024

3025
    void send_ident_message()
3026
    {
1,336✔
3027
        // Protocol state must be SendIdent
542✔
3028
        REALM_ASSERT(!need_client_file_ident());
1,336✔
3029
        REALM_ASSERT(m_send_ident_message);
1,336✔
3030
        REALM_ASSERT(!ident_message_received());
1,336✔
3031
        REALM_ASSERT(!unbind_message_received());
1,336✔
3032
        REALM_ASSERT(!error_occurred());
1,336✔
3033
        REALM_ASSERT(!m_error_message_sent);
1,336✔
3034

542✔
3035
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
1,336✔
3036

542✔
3037
        file_ident_type client_file_ident = m_allocated_file_ident.ident;
1,336✔
3038
        salt_type client_file_ident_salt = m_allocated_file_ident.salt;
1,336✔
3039

542✔
3040
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident,
1,336✔
3041
                     client_file_ident_salt); // Throws
1,336✔
3042

542✔
3043
        ServerProtocol& protocol = get_server_protocol();
1,336✔
3044
        OutputBuffer& out = m_connection.get_output_buffer();
1,336✔
3045
        int protocol_version = m_connection.get_client_protocol_version();
1,336✔
3046
        protocol.make_ident_message(protocol_version, out, m_session_ident, client_file_ident,
1,336✔
3047
                                    client_file_ident_salt); // Throws
1,336✔
3048
        m_connection.initiate_write_output_buffer();         // Throws
1,336✔
3049

542✔
3050
        m_allocated_file_ident.ident = 0; // Consumed
1,336✔
3051
        m_send_ident_message = false;
1,336✔
3052
        // Protocol state is now WaitForStateRequest or WaitForIdent
542✔
3053
    }
1,336✔
3054

3055
    void send_download_message()
3056
    {
39,134✔
3057
        m_connection.initiate_write_output_buffer(); // Throws
39,134✔
3058
    }
39,134✔
3059

3060
    /// Builds a MARK message for `request_ident` in the connection's output
    /// buffer and starts writing it.
    void send_mark_message(request_ident_type request_ident)
    {
        logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws

        ServerProtocol& server_protocol = get_server_protocol();
        OutputBuffer& output = m_connection.get_output_buffer();
        server_protocol.make_mark_message(output, m_session_ident, request_ident); // Throws
        m_connection.initiate_write_output_buffer();                               // Throws
    }
12,066✔
3069

3070
    void send_alloc_message()
3071
    {
×
3072
        // Protocol state must be WaitForUnbind
3073
        REALM_ASSERT(!m_send_ident_message);
×
3074
        REALM_ASSERT(ident_message_received());
×
3075
        REALM_ASSERT(!unbind_message_received());
×
3076
        REALM_ASSERT(!error_occurred());
×
3077
        REALM_ASSERT(!m_error_message_sent);
×
3078

3079
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
×
3080

3081
        // Relayed allocations are only allowed from protocol version 23 (old protocol).
3082
        REALM_ASSERT(false);
×
3083

3084
        file_ident_type file_ident = m_allocated_file_ident.ident;
×
3085

3086
        logger.debug("Sending: ALLOC(file_ident=%1)", file_ident); // Throws
×
3087

3088
        ServerProtocol& protocol = get_server_protocol();
×
3089
        OutputBuffer& out = m_connection.get_output_buffer();
×
3090
        protocol.make_alloc_message(out, m_session_ident, file_ident); // Throws
×
3091
        m_connection.initiate_write_output_buffer();                   // Throws
×
3092

3093
        m_allocated_file_ident.ident = 0; // Consumed
×
3094

3095
        // Other messages may be waiting to be sent.
3096
        enlist_to_send();
×
3097
    }
×
3098

3099
    void send_unbound_message()
3100
    {
2,610✔
3101
        // Protocol state must be SendUnbound
1,054✔
3102
        REALM_ASSERT(unbind_message_received());
2,610✔
3103
        REALM_ASSERT(!m_error_message_sent);
2,610✔
3104

1,054✔
3105
        logger.debug("Sending: UNBOUND"); // Throws
2,610✔
3106

1,054✔
3107
        ServerProtocol& protocol = get_server_protocol();
2,610✔
3108
        OutputBuffer& out = m_connection.get_output_buffer();
2,610✔
3109
        protocol.make_unbound_message(out, m_session_ident); // Throws
2,610✔
3110
        m_connection.initiate_write_output_buffer();         // Throws
2,610✔
3111
    }
2,610✔
3112

3113
    void send_error_message()
3114
    {
76✔
3115
        // Protocol state must be SendError
34✔
3116
        REALM_ASSERT(!unbind_message_received());
76✔
3117
        REALM_ASSERT(error_occurred());
76✔
3118
        REALM_ASSERT(!m_error_message_sent);
76✔
3119

34✔
3120
        REALM_ASSERT(is_session_level_error(m_error_code));
76✔
3121

34✔
3122
        ProtocolError error_code = m_error_code;
76✔
3123
        const char* message = get_protocol_error_message(int(error_code));
76✔
3124
        std::size_t message_size = std::strlen(message);
76✔
3125
        bool try_again = determine_try_again(error_code);
76✔
3126

34✔
3127
        logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3)", int(error_code), message_size,
76✔
3128
                      try_again); // Throws
76✔
3129

34✔
3130
        ServerProtocol& protocol = get_server_protocol();
76✔
3131
        OutputBuffer& out = m_connection.get_output_buffer();
76✔
3132
        int protocol_version = m_connection.get_client_protocol_version();
76✔
3133
        protocol.make_error_message(protocol_version, out, error_code, message, message_size, try_again,
76✔
3134
                                    m_session_ident); // Throws
76✔
3135
        m_connection.initiate_write_output_buffer();  // Throws
76✔
3136

34✔
3137
        m_error_message_sent = true;
76✔
3138
        // Protocol state is now WaitForUnbindErr
34✔
3139
    }
76✔
3140

3141
    /// Delivers a log message for this session.
    ///
    /// If the client's protocol version is below
    /// `SyncConnection::SERVER_LOG_PROTOCOL_VERSION`, the message is only
    /// written to the local logger. Otherwise it is handed to the connection
    /// together with this session's identifier.
    void send_log_message(util::Logger::Level level, const std::string&& message)
    {
        const bool client_predates_log_messages =
            (m_connection.get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION);
        if (!client_predates_log_messages) {
            // NOTE(review): `message` is a const rvalue reference, so this
            // std::move() only restores the rvalue category.
            m_connection.send_log_message(level, std::move(message), m_session_ident);
            return;
        }

        logger.log(level, message.c_str());
    }
4,122✔
3149

3150
    // Idempotent
3151
    void detach_from_server_file() noexcept
3152
    {
8,536✔
3153
        if (!m_server_file)
8,536✔
3154
            return;
2,968✔
3155
        ServerFile& file = *m_server_file;
5,568✔
3156
        if (ident_message_received()) {
5,568✔
3157
            file.remove_identified_session(m_client_file_ident);
4,122✔
3158
        }
4,122✔
3159
        else {
1,446✔
3160
            file.remove_unidentified_session(this);
1,446✔
3161
        }
1,446✔
3162
        if (m_file_ident_request != 0)
5,568✔
3163
            file.cancel_file_ident_request(m_file_ident_request);
1,184✔
3164
        m_server_file.reset();
5,568✔
3165
    }
5,568✔
3166

3167
    friend class SessionQueue;
3168
};
3169

3170

3171
// ============================ SessionQueue implementation ============================
3172

3173
/// Appends `sess` to the back of the queue.
///
/// The queue is a circular singly-linked list threaded through
/// `Session::m_next`: `m_back` points at the last element, and
/// `m_back->m_next` is the front. `sess` must not already be enlisted.
void SessionQueue::push_back(Session* sess) noexcept
{
    REALM_ASSERT(!sess->m_next);

    // The new back element always points at the front. In an empty queue the
    // new element is its own front.
    Session* front = (m_back ? m_back->m_next : sess);
    sess->m_next = front;
    if (m_back)
        m_back->m_next = sess;
    m_back = sess;
}
105,254✔
3185

3186

3187
/// Removes and returns the session at the front of the queue, or returns
/// null if the queue is empty. The `m_next` link of the returned session is
/// reset to null.
Session* SessionQueue::pop_front() noexcept
{
    if (!m_back)
        return nullptr;

    Session* front = m_back->m_next;
    if (front == m_back) {
        // That was the only element
        m_back = nullptr;
    }
    else {
        // Unlink the front element from the circle
        m_back->m_next = front->m_next;
    }
    front->m_next = nullptr;
    return front;
}
151,726✔
3202

3203

3204
void SessionQueue::clear() noexcept
3205
{
3,178✔
3206
    if (m_back) {
3,178✔
3207
        Session* sess = m_back;
140✔
3208
        for (;;) {
434✔
3209
            Session* next = sess->m_next;
434✔
3210
            sess->m_next = nullptr;
434✔
3211
            if (next == m_back)
434✔
3212
                break;
140✔
3213
            sess = next;
294✔
3214
        }
294✔
3215
        m_back = nullptr;
140✔
3216
    }
140✔
3217
}
3,178✔
3218

3219

3220
// ============================ ServerFile implementation ============================
3221

3222
ServerFile::ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path,
3223
                       std::string real_path, bool disable_sync_to_disk)
3224
    : logger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.logger_ptr}               // Throws
3225
    , wlogger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.get_worker().logger_ptr} // Throws
3226
    , m_server{server}
3227
    , m_file{cache, real_path, virt_path, false, disable_sync_to_disk} // Throws
3228
    , m_worker_file{server.get_worker().get_file_access_cache(), real_path, virt_path, true, disable_sync_to_disk}
3229
{
1,080✔
3230
}
1,080✔
3231

3232

3233
/// Destroys the server file. By this point no sessions may still be attached
/// to it and no file identifier request may be in progress.
ServerFile::~ServerFile() noexcept
{
    REALM_ASSERT(m_unidentified_sessions.empty());
    REALM_ASSERT(m_identified_sessions.empty());
    REALM_ASSERT(m_file_ident_request == 0);
}
1,080✔
3239

3240

3241
void ServerFile::initialize()
3242
{
1,080✔
3243
    const ServerHistory& history = access().history; // Throws
1,080✔
3244
    file_ident_type partial_file_ident = 0;
1,080✔
3245
    version_type partial_progress_reference_version = 0;
1,080✔
3246
    bool has_upstream_sync_status;
1,080✔
3247
    history.get_status(m_version_info, has_upstream_sync_status, partial_file_ident,
1,080✔
3248
                       partial_progress_reference_version); // Throws
1,080✔
3249
    REALM_ASSERT(!has_upstream_sync_status);
1,080✔
3250
    REALM_ASSERT(partial_file_ident == 0);
1,080✔
3251
}
1,080✔
3252

3253

3254
void ServerFile::activate()
{
    // Intentionally empty: there is currently nothing to do on activation.
}
1,080✔
3255

3256

3257
// This function must be called only after a completed invocation of
3258
// initialize(). Both functions must only ever be called by the network event
3259
// loop thread.
3260
void ServerFile::register_client_access(file_ident_type)
{
    // Currently a no-op: the client file identifier is ignored.
}
87,952✔
3261

3262

3263
auto ServerFile::request_file_ident(FileIdentReceiver& receiver, file_ident_type proxy_file, ClientType client_type)
3264
    -> file_ident_request_type
3265
{
2,520✔
3266
    auto request = ++m_last_file_ident_request;
2,520✔
3267
    m_file_ident_requests[request] = {&receiver, proxy_file, client_type}; // Throws
2,520✔
3268

1,088✔
3269
    on_work_added(); // Throws
2,520✔
3270
    return request;
2,520✔
3271
}
2,520✔
3272

3273

3274
// Cancels a pending file identifier request.
//
// The request entry itself is kept in `m_file_ident_requests`; only its
// receiver is cleared, so that nobody is notified on completion. The request
// must exist and must not already have been cancelled or delivered.
void ServerFile::cancel_file_ident_request(file_ident_request_type request) noexcept
{
    const auto i = m_file_ident_requests.find(request);
    REALM_ASSERT(i != m_file_ident_requests.end());

    FileIdentRequestInfo& info = i->second;
    REALM_ASSERT(info.receiver);
    info.receiver = nullptr; // Marks the request as cancelled
}
1,184✔
3282

3283

3284
// Adds `sess` to the set of sessions bound to this file that have not yet been
// identified. The session must not already be in that set.
void ServerFile::add_unidentified_session(Session* sess)
{
    // Double registration is a programming error
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 0);

    m_unidentified_sessions.insert(sess); // Throws
}
5,570✔
3289

3290

3291
// Moves `sess` from the set of unidentified sessions to the map of identified
// sessions, keyed by `client_file_ident`.
//
// The session must currently be registered as unidentified, and no other
// session may already be registered under `client_file_ident`.
void ServerFile::identify_session(Session* sess, file_ident_type client_file_ident)
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 0);

    // Insert into the new container first. If that throws, the session is
    // still registered as unidentified.
    m_identified_sessions[client_file_ident] = sess; // Throws
    m_unidentified_sessions.erase(sess);
}
4,122✔
3299

3300

3301
// Removes `sess` from the set of unidentified sessions. The session must
// currently be a member of that set.
void ServerFile::remove_unidentified_session(Session* sess) noexcept
{
    // Removing an unknown session is a programming error
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);

    m_unidentified_sessions.erase(sess);
}
1,448✔
3306

3307

3308
// Removes the session registered under `client_file_ident` from the map of
// identified sessions. Such a session must currently be registered.
void ServerFile::remove_identified_session(file_ident_type client_file_ident) noexcept
{
    // Removing an unknown client file is a programming error
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 1);

    m_identified_sessions.erase(client_file_ident);
}
4,122✔
3313

3314

3315
// Looks up the identified session registered under `client_file_ident`.
// Returns null if there is no such session.
Session* ServerFile::get_identified_session(file_ident_type client_file_ident) noexcept
{
    const auto pos = m_identified_sessions.find(client_file_ident);
    const bool found = (pos != m_identified_sessions.end());
    return found ? pos->second : nullptr;
}
×
3322

3323
bool ServerFile::can_add_changesets_from_downstream() const noexcept
3324
{
44,940✔
3325
    return (m_blocked_changesets_from_downstream_byte_size < m_server.get_max_upload_backlog());
44,940✔
3326
}
44,940✔
3327

3328

3329
void ServerFile::add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
3330
                                                version_type locked_server_version, const UploadChangeset* changesets,
3331
                                                std::size_t num_changesets)
3332
{
44,696✔
3333
    register_client_access(client_file_ident); // Throws
44,696✔
3334

22,714✔
3335
    bool dirty = false;
44,696✔
3336

22,714✔
3337
    IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident]; // Throws
44,696✔
3338
    std::size_t num_bytes = 0;
44,696✔
3339
    for (std::size_t i = 0; i < num_changesets; ++i) {
79,186✔
3340
        const UploadChangeset& uc = changesets[i];
34,490✔
3341
        auto& changesets = list.changesets;
34,490✔
3342
        changesets.emplace_back(client_file_ident, uc.origin_timestamp, uc.origin_file_ident, uc.upload_cursor,
34,490✔
3343
                                uc.changeset); // Throws
34,490✔
3344
        num_bytes += uc.changeset.size();
34,490✔
3345
        dirty = true;
34,490✔
3346
    }
34,490✔
3347

22,714✔
3348
    REALM_ASSERT(upload_progress.client_version >= list.upload_progress.client_version);
44,696✔
3349
    REALM_ASSERT(are_mutually_consistent(upload_progress, list.upload_progress));
44,696✔
3350
    if (upload_progress.client_version > list.upload_progress.client_version) {
44,696✔
3351
        list.upload_progress = upload_progress;
44,696✔
3352
        dirty = true;
44,696✔
3353
    }
44,696✔
3354

22,714✔
3355
    REALM_ASSERT(locked_server_version >= list.locked_server_version);
44,696✔
3356
    if (locked_server_version > list.locked_server_version) {
44,696✔
3357
        list.locked_server_version = locked_server_version;
38,752✔
3358
        dirty = true;
38,752✔
3359
    }
38,752✔
3360

22,714✔
3361
    if (REALM_LIKELY(dirty)) {
44,696✔
3362
        if (num_changesets > 0) {
44,696✔
3363
            on_changesets_from_downstream_added(num_changesets, num_bytes); // Throws
23,280✔
3364
        }
23,280✔
3365
        else {
21,416✔
3366
            on_work_added(); // Throws
21,416✔
3367
        }
21,416✔
3368
    }
44,696✔
3369
}
44,696✔
3370

3371

3372
// Validates the state presented by a client when it binds a session to this
// file, and determines the upload progress and locked server version to start
// the session from.
//
// Returns BootstrapError::bad_server_version if the client refers to a server
// version beyond the currently exposed sync version, or otherwise whatever the
// history reports. On success, `upload_progress` and `locked_server_version`
// are set by the history and then overridden by the corresponding values of
// any changeset lists still buffered in memory for the same client file.
BootstrapError ServerFile::bootstrap_client_session(SaltedFileIdent client_file_ident,
                                                    DownloadCursor download_progress, SaltedVersion server_version,
                                                    ClientType client_type, UploadCursor& upload_progress,
                                                    version_type& locked_server_version, Logger& logger)
{
    // The Realm file may contain a later snapshot than the one reflected by
    // `m_version_info.sync_version`, but if so, the client cannot "legally"
    // know about it.
    if (server_version.version > m_version_info.sync_version.version)
        return BootstrapError::bad_server_version;

    const ServerHistory& hist = access().history; // Throws
    const BootstrapError error =
        hist.bootstrap_client_session(client_file_ident, download_progress, server_version, client_type,
                                      upload_progress, locked_server_version, logger); // Throws
    if (error != BootstrapError::no_error)
        return error;

    // FIXME: Rather than taking previously buffered changesets from the same
    // client file into account when determining the upload progress, and then
    // allowing for an error during the integration of those changesets to be
    // reported to, and terminate the new session, consider to instead postpone
    // the bootstrapping of the new session until all previously buffered
    // changesets from same client file have been fully processed.

    register_client_access(client_file_ident.ident); // Throws

    // If upload, or releasing of server versions progressed further during
    // previous sessions than the persisted points, take that into account.
    auto adopt_buffered_progress = [&](const IntegratableChangesetList& list) {
        REALM_ASSERT(list.upload_progress.client_version >= upload_progress.client_version);
        upload_progress = list.upload_progress;
        REALM_ASSERT(list.locked_server_version >= locked_server_version);
        locked_server_version = list.locked_server_version;
    };

    // First the list held by the work unit, then the list that is still
    // blocked. The latter takes precedence when both exist.
    const auto unblocked_pos = m_work.changesets_from_downstream.find(client_file_ident.ident);
    if (unblocked_pos != m_work.changesets_from_downstream.end())
        adopt_buffered_progress(unblocked_pos->second);

    const auto blocked_pos = m_changesets_from_downstream.find(client_file_ident.ident);
    if (blocked_pos != m_changesets_from_downstream.end())
        adopt_buffered_progress(blocked_pos->second);

    return error;
}
4,134✔
3419

3420
// NOTE: This function is executed by the worker thread
3421
void ServerFile::worker_process_work_unit(WorkerState& state)
{
    // Executes the work unit currently held in `m_work`: allocates requested
    // file identifiers and integrates buffered changesets. Afterwards the
    // elapsed time is accounted for in the server's timers, and
    // postprocessing is posted to the server's network service.
    const SteadyTimePoint start_time = steady_clock_now();
    const milliseconds_type parallel_time = 0; // Nothing is executed in parallel here

    wlogger.debug("Work unit execution started"); // Throws

    if (m_work.has_primary_work) {
        const bool must_allocate_file_idents = !m_work.file_ident_alloc_slots.empty();
        if (REALM_UNLIKELY(must_allocate_file_idents))
            worker_allocate_file_identifiers(); // Throws

        const bool must_integrate = !m_work.changesets_from_downstream.empty();
        if (must_integrate)
            worker_integrate_changes_from_downstream(state); // Throws
    }

    wlogger.debug("Work unit execution completed"); // Throws

    // Split the elapsed time into its sequential and parallel parts
    const milliseconds_type total_time = steady_duration(start_time);
    const milliseconds_type seq_time = total_time - parallel_time;
    m_server.m_seq_time.fetch_add(seq_time, std::memory_order_relaxed);
    m_server.m_par_time.fetch_add(parallel_time, std::memory_order_relaxed);

    // Pass control back to the network event loop thread
    m_server.get_service().post([this](Status) {
        // FIXME: The safety of capturing `this` here, relies on the fact
        // that ServerFile objects currently are not destroyed until the
        // server object is destroyed.
        group_postprocess_stage_1(); // Throws
        // Suicide may have happened at this point
    }); // Throws
}
38,112✔
3454

3455

3456
void ServerFile::on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes)
3457
{
23,280✔
3458
    m_num_changesets_from_downstream += num_changesets;
23,280✔
3459

12,064✔
3460
    if (num_bytes > 0) {
23,280✔
3461
        m_blocked_changesets_from_downstream_byte_size += num_bytes;
23,280✔
3462
        get_server().inc_byte_size_for_pending_downstream_changesets(num_bytes); // Throws
23,280✔
3463
    }
23,280✔
3464

12,064✔
3465
    on_work_added(); // Throws
23,280✔
3466
}
23,280✔
3467

3468

3469
void ServerFile::on_work_added()
3470
{
47,218✔
3471
    if (m_has_blocked_work)
47,218✔
3472
        return;
8,978✔
3473
    m_has_blocked_work = true;
38,240✔
3474
    // Reference file
19,090✔
3475
    if (m_has_work_in_progress)
38,240✔
3476
        return;
12,118✔
3477
    group_unblock_work(); // Throws
26,122✔
3478
}
26,122✔
3479

3480

3481
void ServerFile::group_unblock_work()
3482
{
38,196✔
3483
    REALM_ASSERT(!m_has_work_in_progress);
38,196✔
3484
    if (REALM_LIKELY(!m_server.is_sync_stopped())) {
38,196✔
3485
        unblock_work(); // Throws
38,192✔
3486
        const Work& work = m_work;
38,192✔
3487
        if (REALM_LIKELY(work.has_primary_work)) {
38,192✔
3488
            logger.trace("Work unit unblocked"); // Throws
38,120✔
3489
            m_has_work_in_progress = true;
38,120✔
3490
            Worker& worker = m_server.get_worker();
38,120✔
3491
            worker.enqueue(this); // Throws
38,120✔
3492
        }
38,120✔
3493
    }
38,192✔
3494
}
38,196✔
3495

3496

3497
void ServerFile::unblock_work()
3498
{
38,192✔
3499
    REALM_ASSERT(m_has_blocked_work);
38,192✔
3500

19,060✔
3501
    m_work.reset();
38,192✔
3502

19,060✔
3503
    // Discard requests for file identifiers whose receiver is no longer
19,060✔
3504
    // waiting.
19,060✔
3505
    {
38,192✔
3506
        auto i = m_file_ident_requests.begin();
38,192✔
3507
        auto end = m_file_ident_requests.end();
38,192✔
3508
        while (i != end) {
40,582✔
3509
            auto j = i++;
2,390✔
3510
            const FileIdentRequestInfo& info = j->second;
2,390✔
3511
            if (!info.receiver)
2,390✔
3512
                m_file_ident_requests.erase(j);
518✔
3513
        }
2,390✔
3514
    }
38,192✔
3515
    std::size_t n = m_file_ident_requests.size();
38,192✔
3516
    if (n > 0) {
38,192✔
3517
        m_work.file_ident_alloc_slots.resize(n); // Throws
1,836✔
3518
        std::size_t i = 0;
1,836✔
3519
        for (const auto& pair : m_file_ident_requests) {
1,870✔
3520
            const FileIdentRequestInfo& info = pair.second;
1,870✔
3521
            FileIdentAllocSlot& slot = m_work.file_ident_alloc_slots[i];
1,870✔
3522
            slot.proxy_file = info.proxy_file;
1,870✔
3523
            slot.client_type = info.client_type;
1,870✔
3524
            ++i;
1,870✔
3525
        }
1,870✔
3526
        m_work.has_primary_work = true;
1,836✔
3527
    }
1,836✔
3528

19,060✔
3529
    // FIXME: `ServerFile::m_changesets_from_downstream` and
19,060✔
3530
    // `Work::changesets_from_downstream` should be renamed to something else,
19,060✔
3531
    // as it may contain kinds of data other than changesets.
19,060✔
3532

19,060✔
3533
    using std::swap;
38,192✔
3534
    swap(m_changesets_from_downstream, m_work.changesets_from_downstream);
38,192✔
3535
    m_work.have_changesets_from_downstream = (m_num_changesets_from_downstream > 0);
38,192✔
3536
    bool has_changesets = !m_work.changesets_from_downstream.empty();
38,192✔
3537
    if (has_changesets) {
38,192✔
3538
        m_work.has_primary_work = true;
36,294✔
3539
    }
36,294✔
3540

19,060✔
3541
    // Keep track of the size of pending changesets
19,060✔
3542
    REALM_ASSERT(m_unblocked_changesets_from_downstream_byte_size == 0);
38,192✔
3543
    m_unblocked_changesets_from_downstream_byte_size = m_blocked_changesets_from_downstream_byte_size;
38,192✔
3544
    m_blocked_changesets_from_downstream_byte_size = 0;
38,192✔
3545

19,060✔
3546
    m_num_changesets_from_downstream = 0;
38,192✔
3547
    m_has_blocked_work = false;
38,192✔
3548
}
38,192✔
3549

3550

3551
void ServerFile::resume_download() noexcept
3552
{
22,390✔
3553
    for (const auto& entry : m_identified_sessions) {
35,096✔
3554
        Session& sess = *entry.second;
35,096✔
3555
        sess.ensure_enlisted_to_send();
35,096✔
3556
    }
35,096✔
3557
}
22,390✔
3558

3559

3560
void ServerFile::recognize_external_change()
3561
{
4,798✔
3562
    VersionInfo prev_version_info = m_version_info;
4,798✔
3563
    const ServerHistory& history = access().history;       // Throws
4,798✔
3564
    bool has_upstream_status;                              // Dummy
4,798✔
3565
    sync::file_ident_type partial_file_ident;              // Dummy
4,798✔
3566
    sync::version_type partial_progress_reference_version; // Dummy
4,798✔
3567
    history.get_status(m_version_info, has_upstream_status, partial_file_ident,
4,798✔
3568
                       partial_progress_reference_version); // Throws
4,798✔
3569

2,400✔
3570
    REALM_ASSERT(m_version_info.realm_version >= prev_version_info.realm_version);
4,798✔
3571
    REALM_ASSERT(m_version_info.sync_version.version >= prev_version_info.sync_version.version);
4,798✔
3572
    if (m_version_info.sync_version.version > prev_version_info.sync_version.version) {
4,798✔
3573
        REALM_ASSERT(m_version_info.realm_version > prev_version_info.realm_version);
4,798✔
3574
        resume_download();
4,798✔
3575
    }
4,798✔
3576
}
4,798✔
3577

3578

3579
// NOTE: This function is executed by the worker thread
3580
void ServerFile::worker_allocate_file_identifiers()
3581
{
1,836✔
3582
    Work& work = m_work;
1,836✔
3583
    REALM_ASSERT(!work.file_ident_alloc_slots.empty());
1,836✔
3584
    ServerHistory& hist = worker_access().history;                                      // Throws
1,836✔
3585
    hist.allocate_file_identifiers(m_work.file_ident_alloc_slots, m_work.version_info); // Throws
1,836✔
3586
    m_work.produced_new_realm_version = true;
1,836✔
3587
}
1,836✔
3588

3589

3590
// Returns true when, and only when this function produces a new sync version
3591
// (adds a new entry to the sync history).
3592
//
3593
// NOTE: This function is executed by the worker thread
3594
bool ServerFile::worker_integrate_changes_from_downstream(WorkerState& state)
{
    REALM_ASSERT(!m_work.changesets_from_downstream.empty());

    // `hist_ptr` and `sg_ptr` keep a privately opened history and database
    // alive for the duration of this call when get_client_file_history() does
    // not use the cached file.
    std::unique_ptr<ServerHistory> hist_ptr;
    DBRef sg_ptr;
    ServerHistory& hist = get_client_file_history(state, hist_ptr, sg_ptr);

    bool backup_whole_realm = false;
    const bool produced_new_realm_version =
        hist.integrate_client_changesets(m_work.changesets_from_downstream, m_work.version_info, backup_whole_realm,
                                         m_work.integration_result, wlogger); // Throws

    // A new sync version was produced exactly when at least one changeset
    // was integrated, and that is only possible with a new Realm version.
    const bool produced_new_sync_version = !m_work.integration_result.integrated_changesets.empty();
    REALM_ASSERT(!produced_new_sync_version || produced_new_realm_version);

    if (!produced_new_realm_version)
        return produced_new_sync_version;

    m_work.produced_new_realm_version = true;
    if (produced_new_sync_version)
        m_work.produced_new_sync_version = true;
    return produced_new_sync_version;
}
36,288✔
3615

3616
// Returns the history to be used by the worker for this file.
//
// When `state.use_file_cache` is set, this is the history of the worker's
// cached file access, and `hist_ptr` and `sg_ptr` are left untouched.
// Otherwise a new history and database are created for the worker file's Realm
// path, stored in `hist_ptr` and `sg_ptr` (which thereby own them), and the
// new history is returned.
ServerHistory& ServerFile::get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
                                                   DBRef& sg_ptr)
{
    if (!state.use_file_cache) {
        const std::string& path = m_worker_file.realm_path;
        hist_ptr = m_server.make_history_for_path();                   // Throws
        DBOptions options = m_worker_file.make_shared_group_options(); // Throws
        sg_ptr = DB::create(*hist_ptr, path, options);                 // Throws
        sg_ptr->claim_sync_agent();                                    // Throws
        return *hist_ptr;                                              // Throws
    }

    return worker_access().history; // Throws
}
×
3628

3629

3630
// When worker thread finishes work unit.
3631
void ServerFile::group_postprocess_stage_1()
3632
{
37,730✔
3633
    REALM_ASSERT(m_has_work_in_progress);
37,730✔
3634

18,820✔
3635
    group_finalize_work_stage_1(); // Throws
37,730✔
3636
    group_finalize_work_stage_2(); // Throws
37,730✔
3637
    group_postprocess_stage_2();   // Throws
37,730✔
3638
}
37,730✔
3639

3640

3641
void ServerFile::group_postprocess_stage_2()
3642
{
37,728✔
3643
    REALM_ASSERT(m_has_work_in_progress);
37,728✔
3644
    group_postprocess_stage_3(); // Throws
37,728✔
3645
    // Suicide may have happened at this point
18,822✔
3646
}
37,728✔
3647

3648

3649
// When all files, including the reference file, have been backed up.
3650
void ServerFile::group_postprocess_stage_3()
3651
{
37,728✔
3652
    REALM_ASSERT(m_has_work_in_progress);
37,728✔
3653
    m_has_work_in_progress = false;
37,728✔
3654

18,822✔
3655
    logger.trace("Work unit postprocessing complete"); // Throws
37,728✔
3656
    if (m_has_blocked_work)
37,728✔
3657
        group_unblock_work(); // Throws
12,076✔
3658
}
37,728✔
3659

3660

3661
void ServerFile::finalize_work_stage_1()
3662
{
37,730✔
3663
    if (m_unblocked_changesets_from_downstream_byte_size > 0) {
37,730✔
3664
        // Report the byte size of completed downstream changesets.
9,216✔
3665
        std::size_t byte_size = m_unblocked_changesets_from_downstream_byte_size;
17,608✔
3666
        get_server().dec_byte_size_for_pending_downstream_changesets(byte_size); // Throws
17,608✔
3667
        m_unblocked_changesets_from_downstream_byte_size = 0;
17,608✔
3668
    }
17,608✔
3669

18,820✔
3670
    // Deal with errors (bad changesets) pertaining to downstream clients
18,820✔
3671
    std::size_t num_changesets_removed = 0;
37,730✔
3672
    std::size_t num_bytes_removed = 0;
37,730✔
3673
    for (const auto& entry : m_work.integration_result.excluded_client_files) {
18,832✔
3674
        file_ident_type client_file_ident = entry.first;
16✔
3675
        ExtendedIntegrationError error = entry.second;
16✔
3676
        ProtocolError error_2 = ProtocolError::other_session_error;
16✔
3677
        switch (error) {
16✔
3678
            case ExtendedIntegrationError::client_file_expired:
✔
3679
                logger.debug("Changeset integration failed: Client file entry "
×
3680
                             "expired during session"); // Throws
×
3681
                error_2 = ProtocolError::client_file_expired;
×
3682
                break;
×
3683
            case ExtendedIntegrationError::bad_origin_file_ident:
✔
3684
                error_2 = ProtocolError::bad_origin_file_ident;
×
3685
                break;
×
3686
            case ExtendedIntegrationError::bad_changeset:
16✔
3687
                error_2 = ProtocolError::bad_changeset;
16✔
3688
                break;
16✔
3689
        }
16✔
3690
        auto i = m_identified_sessions.find(client_file_ident);
16✔
3691
        if (i != m_identified_sessions.end()) {
16✔
3692
            Session& sess = *i->second;
16✔
3693
            SyncConnection& conn = sess.get_connection();
16✔
3694
            conn.protocol_error(error_2, &sess); // Throws
16✔
3695
        }
16✔
3696
        const IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident];
16✔
3697
        std::size_t num_changesets = list.changesets.size();
16✔
3698
        std::size_t num_bytes = 0;
16✔
3699
        for (const IntegratableChangeset& ic : list.changesets)
16✔
3700
            num_bytes += ic.changeset.size();
×
3701
        logger.info("Excluded %1 changesets of combined byte size %2 for client file %3", num_changesets, num_bytes,
16✔
3702
                    client_file_ident); // Throws
16✔
3703
        num_changesets_removed += num_changesets;
16✔
3704
        num_bytes_removed += num_bytes;
16✔
3705
        m_changesets_from_downstream.erase(client_file_ident);
16✔
3706
    }
16✔
3707

18,820✔
3708
    REALM_ASSERT(num_changesets_removed <= m_num_changesets_from_downstream);
37,730✔
3709
    REALM_ASSERT(num_bytes_removed <= m_blocked_changesets_from_downstream_byte_size);
37,730✔
3710

18,820✔
3711
    if (num_changesets_removed == 0)
37,730✔
3712
        return;
37,730✔
3713

3714
    m_num_changesets_from_downstream -= num_changesets_removed;
×
3715

3716
    // The byte size of the blocked changesets must be decremented.
3717
    if (num_bytes_removed > 0) {
×
3718
        m_blocked_changesets_from_downstream_byte_size -= num_bytes_removed;
×
3719
        get_server().dec_byte_size_for_pending_downstream_changesets(num_bytes_removed); // Throws
×
3720
    }
×
3721
}
×
3722

3723

3724
void ServerFile::finalize_work_stage_2()
3725
{
37,728✔
3726
    // Expose new snapshot to remote peers
18,820✔
3727
    REALM_ASSERT(m_work.produced_new_realm_version || m_work.version_info.realm_version == 0);
37,728✔
3728
    if (m_work.version_info.realm_version > m_version_info.realm_version) {
37,728✔
3729
        REALM_ASSERT(m_work.version_info.sync_version.version >= m_version_info.sync_version.version);
37,682✔
3730
        m_version_info = m_work.version_info;
37,682✔
3731
    }
37,682✔
3732

18,820✔
3733
    bool resume_download_and_upload = m_work.produced_new_sync_version;
37,728✔
3734

18,820✔
3735
    // Deliver allocated file identifiers to requesters
18,820✔
3736
    REALM_ASSERT(m_file_ident_requests.size() >= m_work.file_ident_alloc_slots.size());
37,728✔
3737
    auto begin = m_file_ident_requests.begin();
37,728✔
3738
    auto i = begin;
37,728✔
3739
    for (const FileIdentAllocSlot& slot : m_work.file_ident_alloc_slots) {
20,102✔
3740
        FileIdentRequestInfo& info = i->second;
1,850✔
3741
        REALM_ASSERT(info.proxy_file == slot.proxy_file);
1,850✔
3742
        REALM_ASSERT(info.client_type == slot.client_type);
1,850✔
3743
        if (FileIdentReceiver* receiver = info.receiver) {
1,850✔
3744
            info.receiver = nullptr;
1,336✔
3745
            receiver->receive_file_ident(slot.file_ident); // Throws
1,336✔
3746
        }
1,336✔
3747
        ++i;
1,850✔
3748
    }
1,850✔
3749
    m_file_ident_requests.erase(begin, i);
37,728✔
3750

18,820✔
3751
    // Resume download to downstream clients
18,820✔
3752
    if (resume_download_and_upload) {
37,728✔
3753
        resume_download();
17,592✔
3754
    }
17,592✔
3755
}
37,728✔
3756

3757
// ============================ Worker implementation ============================
3758

3759
Worker::Worker(ServerImpl& server)
3760
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, "Worker: ", server.logger_ptr)}
3761
    // Throws
3762
    , logger(*logger_ptr)
3763
    , m_server{server}
3764
    , m_transformer{make_transformer()} // Throws
3765
    , m_file_access_cache{server.get_config().max_open_files, logger, *this, server.get_config().encryption_key}
3766
{
7,964✔
3767
    util::seed_prng_nondeterministically(m_random); // Throws
7,964✔
3768
}
7,964✔
3769

3770

3771
// Appends `file` to the worker's queue and wakes up anyone waiting on the
// worker's condition variable. The queue is modified while holding `m_mutex`.
void Worker::enqueue(ServerFile* file)
{
    util::LockGuard lock{m_mutex};

    m_queue.push_back(file); // Throws

    // Notify while still holding the lock
    m_cond.notify_all();
}
38,122✔
3777

3778

3779
std::mt19937_64& Worker::server_history_get_random() noexcept
3780
{
2,896✔
3781
    return m_random;
2,896✔
3782
}
2,896✔
3783

3784

3785
// Gives access to the transformer owned by the worker.
sync::Transformer& Worker::get_transformer()
{
    sync::Transformer& transformer = *m_transformer;
    return transformer;
}
19,242✔
3789

3790

3791
// Gives access to the transform buffer owned by the worker.
util::Buffer<char>& Worker::get_transform_buffer()
{
    util::Buffer<char>& buffer = m_transform_buffer;
    return buffer;
}
×
3795

3796
void Worker::run()
3797
{
7,912✔
3798
    for (;;) {
46,026✔
3799
        ServerFile* file = nullptr;
46,026✔
3800
        {
46,026✔
3801
            util::LockGuard lock{m_mutex};
46,026✔
3802
            for (;;) {
91,440✔
3803
                if (REALM_UNLIKELY(m_stop))
91,440✔
3804
                    return;
49,590✔
3805
                if (!m_queue.empty()) {
83,528✔
3806
                    file = m_queue.front();
38,114✔
3807
                    m_queue.pop_front();
38,114✔
3808
                    break;
38,114✔
3809
                }
38,114✔
3810
                m_cond.wait(lock);
45,414✔
3811
            }
45,414✔
3812
        }
46,026✔
3813
        file->worker_process_work_unit(m_state); // Throws
42,018✔
3814
    }
38,114✔
3815
}
7,912✔
3816

3817

3818
void Worker::stop() noexcept
3819
{
7,912✔
3820
    util::LockGuard lock{m_mutex};
7,912✔
3821
    m_stop = true;
7,912✔
3822
    m_cond.notify_all();
7,912✔
3823
}
7,912✔
3824

3825

3826
// ============================ ServerImpl implementation ============================
3827

3828
ServerImpl::ServerImpl(const std::string& root_dir, util::Optional<sync::PKey> pkey, Server::Config config)
3829
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::server, std::move(config.logger))}
3830
    , logger{*logger_ptr}
3831
    , m_config{std::move(config)}
3832
    , m_max_upload_backlog{determine_max_upload_backlog(config)}
3833
    , m_root_dir{root_dir} // Throws
3834
    , m_access_control{std::move(pkey)}
3835
    , m_protocol_version_range{determine_protocol_version_range(config)}                 // Throws
3836
    , m_file_access_cache{m_config.max_open_files, logger, *this, config.encryption_key} // Throws
3837
    , m_worker{*this}                                                                    // Throws
3838
    , m_acceptor{get_service()}
3839
    , m_server_protocol{}       // Throws
3840
    , m_compress_memory_arena{} // Throws
3841
{
7,964✔
3842
    if (m_config.ssl) {
7,964✔
3843
        m_ssl_context = std::make_unique<network::ssl::Context>();                // Throws
24✔
3844
        m_ssl_context->use_certificate_chain_file(m_config.ssl_certificate_path); // Throws
24✔
3845
        m_ssl_context->use_private_key_file(m_config.ssl_certificate_key_path);   // Throws
24✔
3846
    }
24✔
3847
}
7,964✔
3848

3849

3850
// Destroys the server. It is a fatal error, checked in release builds too, to
// destroy the server while it is still marked as running.
ServerImpl::~ServerImpl() noexcept
{
    // The variable exists only to give the failed assertion a telling name
    const bool server_destroyed_while_still_running = m_running;
    REALM_ASSERT_RELEASE(!server_destroyed_while_still_running);
}
7,964✔
3855

3856

3857
void ServerImpl::start()
3858
{
7,964✔
3859
    logger.info("Realm sync server started (%1)", REALM_VER_CHUNK); // Throws
7,964✔
3860
    logger.info("Supported protocol versions: %1-%2 (%3-%4 configured)",
7,964✔
3861
                ServerImplBase::get_oldest_supported_protocol_version(), get_current_protocol_version(),
7,964✔
3862
                m_protocol_version_range.first,
7,964✔
3863
                m_protocol_version_range.second); // Throws
7,964✔
3864
    logger.info("Platform: %1", util::get_platform_info());
7,964✔
3865
    bool is_debug_build = false;
7,964✔
3866
#if REALM_DEBUG
7,964✔
3867
    is_debug_build = true;
7,964✔
3868
#endif
7,964✔
3869
    {
7,964✔
3870
        const char* lead_text = "Build mode";
7,964✔
3871
        if (is_debug_build) {
7,964✔
3872
            logger.info("%1: Debug", lead_text); // Throws
7,964✔
3873
        }
7,964✔
3874
        else {
×
3875
            logger.info("%1: Release", lead_text); // Throws
×
3876
        }
×
3877
    }
7,964✔
3878
    if (is_debug_build) {
7,964✔
3879
        logger.warn("Build mode is Debug! CAN SEVERELY IMPACT PERFORMANCE - "
7,964✔
3880
                    "NOT RECOMMENDED FOR PRODUCTION"); // Throws
7,964✔
3881
    }
7,964✔
3882
    logger.info("Directory holding persistent state: %1", m_root_dir);        // Throws
7,964✔
3883
    logger.info("Maximum number of open files: %1", m_config.max_open_files); // Throws
7,964✔
3884
    {
7,964✔
3885
        const char* lead_text = "Encryption";
7,964✔
3886
        if (m_config.encryption_key) {
7,964✔
3887
            logger.info("%1: Yes", lead_text); // Throws
4✔
3888
        }
4✔
3889
        else {
7,960✔
3890
            logger.info("%1: No", lead_text); // Throws
7,960✔
3891
        }
7,960✔
3892
    }
7,964✔
3893
    logger.info("Log level: %1", logger.get_level_threshold()); // Throws
7,964✔
3894
    {
7,964✔
3895
        const char* lead_text = "Disable sync to disk";
7,964✔
3896
        if (m_config.disable_sync_to_disk) {
7,964✔
3897
            logger.info("%1: All files", lead_text); // Throws
7,276✔
3898
        }
7,276✔
3899
        else {
688✔
3900
            logger.info("%1: No", lead_text); // Throws
688✔
3901
        }
688✔
3902
    }
7,964✔
3903
    if (m_config.disable_sync_to_disk) {
7,964✔
3904
        logger.warn("Testing/debugging feature 'disable sync to disk' enabled - "
7,276✔
3905
                    "never do this in production!"); // Throws
7,276✔
3906
    }
7,276✔
3907
    logger.info("Download compaction: %1",
7,964✔
3908
                (m_config.disable_download_compaction ? "No" : "Yes")); // Throws
7,964✔
3909
    logger.info("Download bootstrap caching: %1",
7,964✔
3910
                (m_config.enable_download_bootstrap_cache ? "Yes" : "No"));                // Throws
7,964✔
3911
    logger.info("Max download size: %1 bytes", m_config.max_download_size);                // Throws
7,964✔
3912
    logger.info("Max upload backlog: %1 bytes", m_max_upload_backlog);                     // Throws
7,964✔
3913
    logger.info("HTTP request timeout: %1 ms", m_config.http_request_timeout);             // Throws
7,964✔
3914
    logger.info("HTTP response timeout: %1 ms", m_config.http_response_timeout);           // Throws
7,964✔
3915
    logger.info("Connection reaper timeout: %1 ms", m_config.connection_reaper_timeout);   // Throws
7,964✔
3916
    logger.info("Connection reaper interval: %1 ms", m_config.connection_reaper_interval); // Throws
7,964✔
3917
    logger.info("Connection soft close timeout: %1 ms", m_config.soft_close_timeout);      // Throws
7,964✔
3918
    logger.debug("Authorization header name: %1", m_config.authorization_header_name);     // Throws
7,964✔
3919

3,930✔
3920
    m_transformer = make_transformer(); // Throws
7,964✔
3921

3,930✔
3922
    m_realm_names = _impl::find_realm_files(m_root_dir); // Throws
7,964✔
3923

3,930✔
3924
    initiate_connection_reaper_timer(m_config.connection_reaper_interval); // Throws
7,964✔
3925

3,930✔
3926
    listen(); // Throws
7,964✔
3927
}
7,964✔
3928

3929

3930
void ServerImpl::run()
3931
{
7,912✔
3932
    auto ta = util::make_temp_assign(m_running, true);
7,912✔
3933

3,904✔
3934
    {
7,912✔
3935
        auto worker_thread = util::make_thread_exec_guard(m_worker, *this); // Throws
7,912✔
3936
        std::string name;
7,912✔
3937
        if (util::Thread::get_name(name)) {
7,912✔
3938
            name += "-worker";
7,912✔
3939
            worker_thread.start_with_signals_blocked(name); // Throws
7,912✔
3940
        }
7,912✔
UNCOV
3941
        else {
×
UNCOV
3942
            worker_thread.start_with_signals_blocked(); // Throws
×
UNCOV
3943
        }
×
3944

3,904✔
3945
        m_service.run(); // Throws
7,912✔
3946

3,904✔
3947
        worker_thread.stop_and_rethrow(); // Throws
7,912✔
3948
    }
7,912✔
3949

3,904✔
3950
    logger.info("Realm sync server stopped");
7,912✔
3951
}
7,912✔
3952

3953

3954
void ServerImpl::stop() noexcept
3955
{
8,802✔
3956
    util::LockGuard lock{m_mutex};
8,802✔
3957
    if (m_stopped)
8,802✔
3958
        return;
838✔
3959
    m_stopped = true;
7,964✔
3960
    m_wait_or_service_stopped_cond.notify_all();
7,964✔
3961
    m_service.stop();
7,964✔
3962
}
7,964✔
3963

3964

3965
void ServerImpl::inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3966
{
23,280✔
3967
    m_pending_changesets_from_downstream_byte_size += byte_size;
23,280✔
3968
    logger.debug("Byte size for pending downstream changesets incremented by "
23,280✔
3969
                 "%1 to reach a total of %2",
23,280✔
3970
                 byte_size,
23,280✔
3971
                 m_pending_changesets_from_downstream_byte_size); // Throws
23,280✔
3972
}
23,280✔
3973

3974

3975
void ServerImpl::dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3976
{
17,608✔
3977
    REALM_ASSERT(byte_size <= m_pending_changesets_from_downstream_byte_size);
17,608✔
3978
    m_pending_changesets_from_downstream_byte_size -= byte_size;
17,608✔
3979
    logger.debug("Byte size for pending downstream changesets decremented by "
17,608✔
3980
                 "%1 to reach a total of %2",
17,608✔
3981
                 byte_size,
17,608✔
3982
                 m_pending_changesets_from_downstream_byte_size); // Throws
17,608✔
3983
}
17,608✔
3984

3985

3986
std::mt19937_64& ServerImpl::server_history_get_random() noexcept
3987
{
1,080✔
3988
    return get_random();
1,080✔
3989
}
1,080✔
3990

3991

3992
// Returns the object `m_transformer` points to. The pointer is dereferenced
// without a null check.
Transformer& ServerImpl::get_transformer() noexcept
{
    Transformer& transformer = *m_transformer;
    return transformer;
}
×
3996

3997

3998
// Returns a reference to the `m_transform_buffer` member.
util::Buffer<char>& ServerImpl::get_transform_buffer() noexcept
{
    util::Buffer<char>& buffer = m_transform_buffer;
    return buffer;
}
×
4002

4003

4004
void ServerImpl::listen()
4005
{
7,964✔
4006
    network::Resolver resolver{get_service()};
7,964✔
4007
    network::Resolver::Query query(m_config.listen_address, m_config.listen_port,
7,964✔
4008
                                   network::Resolver::Query::passive | network::Resolver::Query::address_configured);
7,964✔
4009
    network::Endpoint::List endpoints = resolver.resolve(query); // Throws
7,964✔
4010

3,930✔
4011
    auto i = endpoints.begin();
7,964✔
4012
    auto end = endpoints.end();
7,964✔
4013
    for (;;) {
7,964✔
4014
        std::error_code ec;
7,964✔
4015
        m_acceptor.open(i->protocol(), ec);
7,964✔
4016
        if (!ec) {
7,964✔
4017
            using SocketBase = network::SocketBase;
7,964✔
4018
            m_acceptor.set_option(SocketBase::reuse_address(m_config.reuse_address), ec);
7,964✔
4019
            if (!ec) {
7,964✔
4020
                m_acceptor.bind(*i, ec);
7,964✔
4021
                if (!ec)
7,964✔
4022
                    break;
7,962✔
4023
            }
2✔
4024
            m_acceptor.close();
2✔
4025
        }
2✔
4026
        if (i + 1 == end) {
3,930!
4027
            for (auto i2 = endpoints.begin(); i2 != i; ++i2) {
×
4028
                // FIXME: We don't have the error code for previous attempts, so
4029
                // can't print a nice message.
4030
                logger.error("Failed to bind to %1:%2", i2->address(),
×
4031
                             i2->port()); // Throws
×
4032
            }
×
4033
            logger.error("Failed to bind to %1:%2: %3", i->address(), i->port(),
×
4034
                         ec.message()); // Throws
×
4035
            throw std::runtime_error("Could not create a listening socket: All endpoints failed");
×
4036
        }
×
4037
    }
2✔
4038

3,930✔
4039
    m_acceptor.listen(m_config.listen_backlog);
7,964✔
4040

3,930✔
4041
    network::Endpoint local_endpoint = m_acceptor.local_endpoint();
7,964✔
4042
    const char* ssl_mode = (m_ssl_context ? "TLS" : "non-TLS");
7,954✔
4043
    logger.info("Listening on %1:%2 (max backlog is %3, %4)", local_endpoint.address(), local_endpoint.port(),
7,964✔
4044
                m_config.listen_backlog, ssl_mode); // Throws
7,964✔
4045

3,930✔
4046
    initiate_accept();
7,964✔
4047
}
7,964✔
4048

4049

4050
void ServerImpl::initiate_accept()
4051
{
9,956✔
4052
    auto handler = [this](std::error_code ec) {
5,922✔
4053
        if (ec != util::error::operation_aborted)
1,992✔
4054
            handle_accept(ec);
1,992✔
4055
    };
1,992✔
4056
    bool is_ssl = bool(m_ssl_context);
9,956✔
4057
    m_next_http_conn.reset(new HTTPConnection(*this, ++m_next_conn_id, is_ssl));                            // Throws
9,956✔
4058
    m_acceptor.async_accept(m_next_http_conn->get_socket(), m_next_http_conn_endpoint, std::move(handler)); // Throws
9,956✔
4059
}
9,956✔
4060

4061

4062
void ServerImpl::handle_accept(std::error_code ec)
4063
{
1,992✔
4064
    if (ec) {
1,992✔
4065
        if (ec != util::error::connection_aborted) {
×
4066
            REALM_ASSERT(ec != util::error::operation_aborted);
×
4067

4068
            // We close the reserved files to get a few extra file descriptors.
4069
            for (size_t i = 0; i < sizeof(m_reserved_files) / sizeof(m_reserved_files[0]); ++i) {
×
4070
                m_reserved_files[i].reset();
×
4071
            }
×
4072

4073
            // FIXME: There are probably errors that need to be treated
4074
            // specially, and not cause the server to "crash".
4075

4076
            if (ec == make_basic_system_error_code(EMFILE)) {
×
4077
                logger.error("Failed to accept a connection due to the file descriptor limit, "
×
4078
                             "consider increasing the limit in your system config"); // Throws
×
4079
                throw OutOfFilesError(ec);
×
4080
            }
×
4081
            else {
×
4082
                throw std::system_error(ec);
×
4083
            }
×
4084
        }
×
4085
        logger.debug("Skipping aborted connection"); // Throws
×
4086
    }
×
4087
    else {
1,992✔
4088
        HTTPConnection& conn = *m_next_http_conn;
1,992✔
4089
        if (m_config.tcp_no_delay)
1,992✔
4090
            conn.get_socket().set_option(network::SocketBase::no_delay(true));       // Throws
1,676✔
4091
        m_http_connections.emplace(conn.get_id(), std::move(m_next_http_conn));      // Throws
1,992✔
4092
        Formatter& formatter = m_misc_buffers.formatter;
1,992✔
4093
        formatter.reset();
1,992✔
4094
        formatter << "[" << m_next_http_conn_endpoint.address() << "]:" << m_next_http_conn_endpoint.port(); // Throws
1,992✔
4095
        std::string remote_endpoint = {formatter.data(), formatter.size()};                                  // Throws
1,992✔
4096
        conn.initiate(std::move(remote_endpoint));                                                           // Throws
1,992✔
4097
    }
1,992✔
4098
    initiate_accept(); // Throws
1,992✔
4099
}
1,992✔
4100

4101

4102
void ServerImpl::remove_http_connection(std::int_fast64_t conn_id) noexcept
4103
{
1,992✔
4104
    m_http_connections.erase(conn_id);
1,992✔
4105
}
1,992✔
4106

4107

4108
void ServerImpl::add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn)
4109
{
1,962✔
4110
    m_sync_connections.emplace(connection_id, std::move(sync_conn));
1,962✔
4111
}
1,962✔
4112

4113

4114
void ServerImpl::remove_sync_connection(int_fast64_t connection_id)
4115
{
1,178✔
4116
    m_sync_connections.erase(connection_id);
1,178✔
4117
}
1,178✔
4118

4119

4120
// Posts a task to the event loop service that stores `timeout` in
// `m_config.connection_reaper_timeout`. The assignment happens when the
// posted task runs, not before this function returns.
void ServerImpl::set_connection_reaper_timeout(milliseconds_type timeout)
{
    auto apply_timeout = [this, timeout](Status) {
        m_config.connection_reaper_timeout = timeout;
    };
    get_service().post(std::move(apply_timeout));
}
4✔
4126

4127

4128
void ServerImpl::close_connections()
4129
{
20✔
4130
    get_service().post([this](Status) {
20✔
4131
        do_close_connections(); // Throws
20✔
4132
    });
20✔
4133
}
20✔
4134

4135

4136
bool ServerImpl::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4137
{
72✔
4138
    return _impl::map_virt_to_real_realm_path(m_root_dir, virt_path, real_path); // Throws
72✔
4139
}
72✔
4140

4141

4142
void ServerImpl::recognize_external_change(const std::string& virt_path)
4143
{
4,800✔
4144
    std::string virt_path_2 = virt_path; // Throws (copy)
4,800✔
4145
    get_service().post([this, virt_path = std::move(virt_path_2)](Status) {
4,800✔
4146
        do_recognize_external_change(virt_path); // Throws
4,800✔
4147
    });                                          // Throws
4,800✔
4148
}
4,800✔
4149

4150

4151
void ServerImpl::stop_sync_and_wait_for_backup_completion(
4152
    util::UniqueFunction<void(bool did_backup)> completion_handler, milliseconds_type timeout)
4153
{
×
4154
    logger.info("stop_sync_and_wait_for_backup_completion() called with "
×
4155
                "timeout = %1",
×
4156
                timeout); // Throws
×
4157

4158
    get_service().post([this, completion_handler = std::move(completion_handler), timeout](Status) mutable {
×
4159
        do_stop_sync_and_wait_for_backup_completion(std::move(completion_handler),
×
4160
                                                    timeout); // Throws
×
4161
    });
×
4162
}
×
4163

4164

4165
// (Re)creates the connection reaper timer and arms it for `timeout`
// milliseconds. When the wait completes with anything other than
// OperationAborted, reap_connections() runs and the timer is re-armed with
// the same timeout.
void ServerImpl::initiate_connection_reaper_timer(milliseconds_type timeout)
{
    m_connection_reaper_timer.emplace(get_service());

    auto on_expired = [this, timeout](Status status) {
        if (status != ErrorCodes::OperationAborted) {
            reap_connections();                        // Throws
            initiate_connection_reaper_timer(timeout); // Throws
        }
    };
    m_connection_reaper_timer->async_wait(std::chrono::milliseconds(timeout), std::move(on_expired)); // Throws
}
8,060✔
4175

4176

4177
void ServerImpl::reap_connections()
4178
{
96✔
4179
    logger.debug("Discarding dead connections"); // Throws
96✔
4180
    SteadyTimePoint now = steady_clock_now();
96✔
4181
    {
96✔
4182
        auto end = m_http_connections.end();
96✔
4183
        auto i = m_http_connections.begin();
96✔
4184
        while (i != end) {
98✔
4185
            HTTPConnection& conn = *i->second;
2✔
4186
            ++i;
2✔
4187
            // Suicide
4188
            conn.terminate_if_dead(now); // Throws
2✔
4189
        }
2✔
4190
    }
96✔
4191
    {
96✔
4192
        auto end = m_sync_connections.end();
96✔
4193
        auto i = m_sync_connections.begin();
96✔
4194
        while (i != end) {
186✔
4195
            SyncConnection& conn = *i->second;
90✔
4196
            ++i;
90✔
4197
            // Suicide
86✔
4198
            conn.terminate_if_dead(now); // Throws
90✔
4199
        }
90✔
4200
    }
96✔
4201
}
96✔
4202

4203

4204
void ServerImpl::do_close_connections()
4205
{
20✔
4206
    for (auto& entry : m_sync_connections) {
20✔
4207
        SyncConnection& conn = *entry.second;
20✔
4208
        conn.initiate_soft_close(); // Throws
20✔
4209
    }
20✔
4210
}
20✔
4211

4212

4213
void ServerImpl::do_recognize_external_change(const std::string& virt_path)
4214
{
4,800✔
4215
    auto i = m_files.find(virt_path);
4,800✔
4216
    if (i == m_files.end())
4,800✔
4217
        return;
2✔
4218
    ServerFile& file = *i->second;
4,798✔
4219
    file.recognize_external_change();
4,798✔
4220
}
4,798✔
4221

4222

4223
void ServerImpl::do_stop_sync_and_wait_for_backup_completion(
4224
    util::UniqueFunction<void(bool did_complete)> completion_handler, milliseconds_type timeout)
4225
{
×
4226
    static_cast<void>(timeout);
×
4227
    if (m_sync_stopped)
×
4228
        return;
×
4229
    do_close_connections(); // Throws
×
4230
    m_sync_stopped = true;
×
4231
    bool completion_reached = false;
×
4232
    completion_handler(completion_reached); // Throws
×
4233
}
×
4234

4235

4236
// ============================ SyncConnection implementation ============================
4237

4238
// Empties the send-enlistment list first and then the session map, in that
// order, before the remaining members are destroyed.
SyncConnection::~SyncConnection() noexcept
{
    auto& enlisted = m_sessions_enlisted_to_send;
    enlisted.clear();
    auto& sessions = m_sessions;
    sessions.clear();
}
1,962✔
4243

4244

4245
void SyncConnection::initiate()
4246
{
1,962✔
4247
    m_last_activity_at = steady_clock_now();
1,962✔
4248
    logger.debug("Sync Connection initiated");
1,962✔
4249
    m_websocket.initiate_server_websocket_after_handshake();
1,962✔
4250
    send_log_message(util::Logger::Level::info, "Client connection established with server", 0,
1,962✔
4251
                     m_appservices_request_id);
1,962✔
4252
}
1,962✔
4253

4254

4255
template <class... Params>
4256
void SyncConnection::terminate(Logger::Level log_level, const char* log_message, Params... log_params)
4257
{
1,178✔
4258
    terminate_sessions();                              // Throws
1,178✔
4259
    logger.log(log_level, log_message, log_params...); // Throws
1,178✔
4260
    m_websocket.stop();
1,178✔
4261
    m_ssl_stream.reset();
1,178✔
4262
    m_socket.reset();
1,178✔
4263
    // Suicide
642✔
4264
    m_server.remove_sync_connection(m_id);
1,178✔
4265
}
1,178✔
4266

4267

4268
// Terminates this connection if the time since the last recorded activity
// has reached the applicable limit: `soft_close_timeout` while the
// connection is closing, `connection_reaper_timeout` otherwise.
//
// The connection may have removed itself from the server by the time this
// function returns.
void SyncConnection::terminate_if_dead(SteadyTimePoint now)
{
    const milliseconds_type idle_time = steady_duration(m_last_activity_at, now);
    const Server::Config& config = m_server.get_config();

    if (m_is_closing) {
        if (idle_time >= config.soft_close_timeout) {
            // Suicide
            terminate(Logger::Level::detail,
                      "Sync connection closed (timeout during soft close)"); // Throws
        }
        return;
    }

    if (idle_time >= config.connection_reaper_timeout) {
        // Suicide
        terminate(Logger::Level::detail,
                  "Sync connection closed (no heartbeat)"); // Throws
    }
}
90✔
4287

4288

4289
// Appends `sess` to the list of sessions waiting to send and fires the send
// trigger. Preconditions (asserted): the send trigger exists, the connection
// is not closing, and the session is not already enlisted.
void SyncConnection::enlist_to_send(Session* sess) noexcept
{
    REALM_ASSERT(m_send_trigger);
    REALM_ASSERT(!m_is_closing);
    REALM_ASSERT(!sess->is_enlisted_to_send());

    auto& enlisted = m_sessions_enlisted_to_send;
    enlisted.push_back(sess);
    m_send_trigger->trigger();
}
105,256✔
4297

4298

4299
// Logs `status` as an error and raises the matching connection-level
// protocol error:
//   SyncProtocolInvariantFailed -> bad_syntax
//   LimitExceeded               -> limits_exceeded
//   anything else               -> other_error
void SyncConnection::handle_protocol_error(Status status)
{
    logger.error("%1", status);

    ProtocolError error = ProtocolError::other_error;
    switch (status.code()) {
        case ErrorCodes::SyncProtocolInvariantFailed:
            error = ProtocolError::bad_syntax;
            break;
        case ErrorCodes::LimitExceeded:
            error = ProtocolError::limits_exceeded;
            break;
        default:
            break;
    }
    protocol_error(error); // Throws
}
×
4314

4315
void SyncConnection::receive_bind_message(session_ident_type session_ident, std::string path,
4316
                                          std::string signed_user_token, bool need_client_file_ident,
4317
                                          bool is_subserver)
4318
{
5,598✔
4319
    auto p = m_sessions.emplace(session_ident, nullptr); // Throws
5,598✔
4320
    bool was_inserted = p.second;
5,598✔
4321
    if (REALM_UNLIKELY(!was_inserted)) {
5,598✔
4322
        logger.error("Overlapping reuse of session identifier %1 in BIND message",
×
4323
                     session_ident);                           // Throws
×
4324
        protocol_error(ProtocolError::reuse_of_session_ident); // Throws
×
4325
        return;
×
4326
    }
×
4327
    try {
5,598✔
4328
        p.first->second.reset(new Session(*this, session_ident)); // Throws
5,598✔
4329
    }
5,598✔
4330
    catch (...) {
2,734✔
4331
        m_sessions.erase(p.first);
×
4332
        throw;
×
4333
    }
×
4334

2,734✔
4335
    Session& sess = *p.first->second;
5,598✔
4336
    sess.initiate(); // Throws
5,598✔
4337
    ProtocolError error;
5,598✔
4338
    bool success =
5,598✔
4339
        sess.receive_bind_message(std::move(path), std::move(signed_user_token), need_client_file_ident, is_subserver,
5,598✔
4340
                                  error); // Throws
5,598✔
4341
    if (REALM_UNLIKELY(!success))         // Throws
5,598✔
4342
        protocol_error(error, &sess);     // Throws
2,748✔
4343
}
5,598✔
4344

4345

4346
void SyncConnection::receive_ident_message(session_ident_type session_ident, file_ident_type client_file_ident,
4347
                                           salt_type client_file_ident_salt, version_type scan_server_version,
4348
                                           version_type scan_client_version, version_type latest_server_version,
4349
                                           salt_type latest_server_version_salt)
4350
{
4,170✔
4351
    auto i = m_sessions.find(session_ident);
4,170✔
4352
    if (REALM_UNLIKELY(i == m_sessions.end())) {
4,170✔
4353
        bad_session_ident("IDENT", session_ident); // Throws
×
4354
        return;
×
4355
    }
×
4356
    Session& sess = *i->second;
4,170✔
4357
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
4,170✔
4358
        message_after_unbind("IDENT", session_ident); // Throws
×
4359
        return;
×
4360
    }
×
4361
    if (REALM_UNLIKELY(sess.error_occurred())) {
4,170✔
4362
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
8✔
4363
        // messages, other than UNBIND, must be ignored.
8✔
4364
        return;
16✔
4365
    }
16✔
4366
    if (REALM_UNLIKELY(sess.must_send_ident_message())) {
4,154✔
4367
        logger.error("Received IDENT message before IDENT message was sent"); // Throws
×
4368
        protocol_error(ProtocolError::bad_message_order);                     // Throws
×
4369
        return;
×
4370
    }
×
4371
    if (REALM_UNLIKELY(sess.ident_message_received())) {
4,154✔
4372
        logger.error("Received second IDENT message for session"); // Throws
×
4373
        protocol_error(ProtocolError::bad_message_order);          // Throws
×
4374
        return;
×
4375
    }
×
4376

2,108✔
4377
    ProtocolError error = {};
4,154✔
4378
    bool success = sess.receive_ident_message(client_file_ident, client_file_ident_salt, scan_server_version,
4,154✔
4379
                                              scan_client_version, latest_server_version, latest_server_version_salt,
4,154✔
4380
                                              error); // Throws
4,154✔
4381
    if (REALM_UNLIKELY(!success))                     // Throws
4,154✔
4382
        protocol_error(error, &sess);                 // Throws
2,124✔
4383
}
4,154✔
4384

4385
void SyncConnection::receive_upload_message(session_ident_type session_ident, version_type progress_client_version,
4386
                                            version_type progress_server_version, version_type locked_server_version,
4387
                                            const UploadChangesets& upload_changesets)
4388
{
44,942✔
4389
    auto i = m_sessions.find(session_ident);
44,942✔
4390
    if (REALM_UNLIKELY(i == m_sessions.end())) {
44,942✔
4391
        bad_session_ident("UPLOAD", session_ident); // Throws
×
4392
        return;
×
4393
    }
×
4394
    Session& sess = *i->second;
44,942✔
4395
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
44,942✔
4396
        message_after_unbind("UPLOAD", session_ident); // Throws
×
4397
        return;
×
4398
    }
×
4399
    if (REALM_UNLIKELY(sess.error_occurred())) {
44,942✔
4400
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4401
        // messages, other than UNBIND, must be ignored.
4402
        return;
×
4403
    }
×
4404
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
44,942✔
4405
        message_before_ident("UPLOAD", session_ident); // Throws
×
4406
        return;
×
4407
    }
×
4408

22,860✔
4409
    ProtocolError error = {};
44,942✔
4410
    bool success = sess.receive_upload_message(progress_client_version, progress_server_version,
44,942✔
4411
                                               locked_server_version, upload_changesets, error); // Throws
44,942✔
4412
    if (REALM_UNLIKELY(!success))                                                                // Throws
44,942✔
4413
        protocol_error(error, &sess);                                                            // Throws
22,860✔
4414
}
44,942✔
4415

4416

4417
// Handles a MARK message for the session identified by `session_ident`.
//
// Protocol errors are raised for: an unknown session, a message arriving
// after UNBIND, and a message arriving before IDENT. The message is silently
// dropped if the session is in an error state. Otherwise it is passed on to
// the session, and a failure reported by the session becomes a
// session-specific protocol error.
void SyncConnection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    auto i = m_sessions.find(session_ident);
    if (REALM_UNLIKELY(i == m_sessions.end())) {
        bad_session_ident("MARK", session_ident); // Throws
        return;
    }
    Session& sess = *i->second;
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
        message_after_unbind("MARK", session_ident); // Throws
        return;
    }
    if (REALM_UNLIKELY(sess.error_occurred())) {
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
        // messages, other than UNBIND, must be ignored.
        return;
    }
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
        message_before_ident("MARK", session_ident); // Throws
        return;
    }

    // Value-initialized, as in the IDENT and UPLOAD handlers, so that an
    // indeterminate value is never passed to protocol_error().
    ProtocolError error = {};
    bool success = sess.receive_mark_message(request_ident, error); // Throws
    if (REALM_UNLIKELY(!success))                                   // Throws
        protocol_error(error, &sess);                               // Throws
}
12,078✔
4444

4445

4446
// Handles an UNBIND message for the session identified by `session_ident`.
// Protocol errors are raised for an unknown session and for a repeated
// UNBIND; otherwise the message is passed on to the session.
void SyncConnection::receive_unbind_message(session_ident_type session_ident)
{
    const auto found = m_sessions.find(session_ident); // Throws
    if (REALM_UNLIKELY(found == m_sessions.end())) {
        bad_session_ident("UNBIND", session_ident); // Throws
        return;
    }

    Session& session = *found->second;
    if (REALM_UNLIKELY(session.unbind_message_received())) {
        message_after_unbind("UNBIND", session_ident); // Throws
        return;
    }

    session.receive_unbind_message(); // Throws
    // NOTE: The session might have gotten destroyed at this time!
}
2,864✔
4462

4463

4464
// Handles a PING: logs it, remembers its timestamp, flags that a PONG must
// be sent, and starts the send machinery right away unless a send is already
// in progress.
void SyncConnection::receive_ping(milliseconds_type timestamp, milliseconds_type rtt)
{
    logger.debug("Received: PING(timestamp=%1, rtt=%2)", timestamp, rtt); // Throws

    m_send_pong = true;
    m_last_ping_timestamp = timestamp;

    if (m_is_sending)
        return;
    send_next_message();
}
148✔
4472

4473

4474
void SyncConnection::receive_error_message(session_ident_type session_ident, int error_code,
4475
                                           std::string_view error_body)
4476
{
×
4477
    logger.debug("Received: ERROR(error_code=%1, message_size=%2, session_ident=%3)", error_code, error_body.size(),
×
4478
                 session_ident); // Throws
×
4479
    auto i = m_sessions.find(session_ident);
×
4480
    if (REALM_UNLIKELY(i == m_sessions.end())) {
×
4481
        bad_session_ident("ERROR", session_ident);
×
4482
        return;
×
4483
    }
×
4484
    Session& sess = *i->second;
×
4485
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
×
4486
        message_after_unbind("ERROR", session_ident); // Throws
×
4487
        return;
×
4488
    }
×
4489

4490
    sess.receive_error_message(session_ident, error_code, error_body); // Throws
×
4491
}
×
4492

4493
void SyncConnection::send_log_message(util::Logger::Level level, const std::string&& message,
4494
                                      session_ident_type sess_ident, std::optional<std::string> co_id)
4495
{
6,082✔
4496
    if (get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
6,082✔
4497
        return logger.log(level, message.c_str());
×
4498
    }
×
4499

3,072✔
4500
    LogMessage log_msg{sess_ident, level, std::move(message), std::move(co_id)};
6,082✔
4501
    {
6,082✔
4502
        std::lock_guard lock(m_log_mutex);
6,082✔
4503
        m_log_messages.push(std::move(log_msg));
6,082✔
4504
    }
6,082✔
4505
    m_send_trigger->trigger();
6,082✔
4506
}
6,082✔
4507

4508

4509
void SyncConnection::bad_session_ident(const char* message_type, session_ident_type session_ident)
4510
{
×
4511
    logger.error("Bad session identifier in %1 message, session_ident = %2", message_type,
×
4512
                 session_ident);                      // Throws
×
4513
    protocol_error(ProtocolError::bad_session_ident); // Throws
×
4514
}
×
4515

4516

4517
void SyncConnection::message_after_unbind(const char* message_type, session_ident_type session_ident)
4518
{
×
4519
    logger.error("Received %1 message after UNBIND message, session_ident = %2", message_type,
×
4520
                 session_ident);                      // Throws
×
4521
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4522
}
×
4523

4524

4525
void SyncConnection::message_before_ident(const char* message_type, session_ident_type session_ident)
4526
{
×
4527
    logger.error("Received %1 message before IDENT message, session_ident = %2", message_type,
×
4528
                 session_ident);                      // Throws
×
4529
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4530
}
×
4531

4532

4533
void SyncConnection::handle_message_received(const char* data, size_t size)
4534
{
69,850✔
4535
    // parse_message_received() parses the message and calls the
35,048✔
4536
    // proper handler on the SyncConnection object (this).
35,048✔
4537
    get_server_protocol().parse_message_received<SyncConnection>(*this, std::string_view(data, size));
69,850✔
4538
    return;
69,850✔
4539
}
69,850✔
4540

4541

4542
void SyncConnection::handle_ping_received(const char* data, size_t size)
4543
{
×
4544
    // parse_message_received() parses the message and calls the
4545
    // proper handler on the SyncConnection object (this).
4546
    get_server_protocol().parse_ping_received<SyncConnection>(*this, std::string_view(data, size));
×
4547
    return;
×
4548
}
×
4549

4550

4551
void SyncConnection::send_next_message()
4552
{
102,272✔
4553
    REALM_ASSERT(!m_is_sending);
102,272✔
4554
    REALM_ASSERT(!m_sending_pong);
102,272✔
4555
    if (m_send_pong) {
102,272✔
4556
        send_pong(m_last_ping_timestamp);
148✔
4557
        if (m_sending_pong)
148✔
4558
            return;
148✔
4559
    }
102,124✔
4560
    for (;;) {
151,730✔
4561
        Session* sess = m_sessions_enlisted_to_send.pop_front();
151,730✔
4562
        if (!sess) {
151,730✔
4563
            // No sessions were enlisted to send
23,648✔
4564
            if (REALM_LIKELY(!m_is_closing))
46,916✔
4565
                break; // Check to see if there are any log messages to go out
46,906✔
4566
            // Send a connection level ERROR
10✔
4567
            REALM_ASSERT(!is_session_level_error(m_error_code));
20✔
4568
            initiate_write_error(m_error_code, m_error_session_ident); // Throws
20✔
4569
            return;
20✔
4570
        }
20✔
4571
        sess->send_message(); // Throws
104,814✔
4572
        // NOTE: The session might have gotten destroyed at this time!
54,862✔
4573

54,862✔
4574
        // At this point, `m_is_sending` is true if, and only if the session
54,862✔
4575
        // chose to send a message. If it chose to not send a message, we must
54,862✔
4576
        // loop back and give the next session in `m_sessions_enlisted_to_send`
54,862✔
4577
        // a chance.
54,862✔
4578
        if (m_is_sending)
104,814✔
4579
            return;
55,220✔
4580
    }
104,814✔
4581
    {
75,474✔
4582
        std::lock_guard lock(m_log_mutex);
46,884✔
4583
        if (!m_log_messages.empty()) {
46,884✔
4584
            send_log_message(m_log_messages.front());
6,042✔
4585
            m_log_messages.pop();
6,042✔
4586
        }
6,042✔
4587
    }
46,884✔
4588
    // Otherwise, nothing to do
23,628✔
4589
}
46,884✔
4590

4591

4592
void SyncConnection::initiate_write_output_buffer()
4593
{
61,260✔
4594
    auto handler = [this](std::error_code ec, size_t) {
61,220✔
4595
        if (!ec) {
61,218✔
4596
            handle_write_output_buffer();
61,070✔
4597
        }
61,070✔
4598
    };
61,218✔
4599

31,636✔
4600
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
61,260✔
4601
                                   std::move(handler)); // Throws
61,260✔
4602
    m_is_sending = true;
61,260✔
4603
}
61,260✔
4604

4605

4606
void SyncConnection::initiate_pong_output_buffer()
4607
{
148✔
4608
    auto handler = [this](std::error_code ec, size_t) {
148✔
4609
        if (!ec) {
148✔
4610
            handle_pong_output_buffer();
148✔
4611
        }
148✔
4612
    };
148✔
4613

46✔
4614
    REALM_ASSERT(!m_is_sending);
148✔
4615
    REALM_ASSERT(!m_sending_pong);
148✔
4616
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
148✔
4617
                                   std::move(handler)); // Throws
148✔
4618

46✔
4619
    m_is_sending = true;
148✔
4620
    m_sending_pong = true;
148✔
4621
}
148✔
4622

4623

4624
// Clears the pending-PONG flag, builds a PONG message carrying `timestamp`
// in the output buffer, and starts writing it. Requires that a PONG is
// pending and none is currently being sent (asserted).
void SyncConnection::send_pong(milliseconds_type timestamp)
{
    REALM_ASSERT(m_send_pong);
    REALM_ASSERT(!m_sending_pong);
    m_send_pong = false;
    logger.debug("Sending: PONG(timestamp=%1)", timestamp); // Throws

    OutputBuffer& buffer = get_output_buffer();
    get_server_protocol().make_pong(buffer, timestamp); // Throws

    initiate_pong_output_buffer(); // Throws
}
148✔
4636

4637
void SyncConnection::send_log_message(const LogMessage& log_msg)
4638
{
6,042✔
4639
    OutputBuffer& out = get_output_buffer();
6,042✔
4640
    get_server_protocol().make_log_message(out, log_msg.level, log_msg.message, log_msg.sess_ident,
6,042✔
4641
                                           log_msg.co_id); // Throws
6,042✔
4642

3,060✔
4643
    initiate_write_output_buffer(); // Throws
6,042✔
4644
}
6,042✔
4645

4646

4647
void SyncConnection::handle_write_output_buffer()
4648
{
61,072✔
4649
    release_output_buffer();
61,072✔
4650
    m_is_sending = false;
61,072✔
4651
    send_next_message(); // Throws
61,072✔
4652
}
61,072✔
4653

4654

4655
void SyncConnection::handle_pong_output_buffer()
4656
{
148✔
4657
    release_output_buffer();
148✔
4658
    REALM_ASSERT(m_is_sending);
148✔
4659
    REALM_ASSERT(m_sending_pong);
148✔
4660
    m_is_sending = false;
148✔
4661
    m_sending_pong = false;
148✔
4662
    send_next_message(); // Throws
148✔
4663
}
148✔
4664

4665

4666
// Serializes an ERROR message for `error_code` / `session_ident` into the
// connection's output buffer and starts an asynchronous write of it.
//
// The message text is looked up with get_protocol_error_message() and the
// `try_again` flag with determine_try_again(). When the write completes,
// handle_write_error() is invoked with the resulting error code.
//
// `m_is_sending` is set once the write has been initiated.
void SyncConnection::initiate_write_error(ProtocolError error_code, session_ident_type session_ident)
{
    const char* message = get_protocol_error_message(int(error_code));
    // std::strlen() has undefined behavior for a null pointer, so make sure
    // the error code maps to a message before measuring it.
    REALM_ASSERT(message);
    std::size_t message_size = std::strlen(message);
    bool try_again = determine_try_again(error_code);

    logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3, session_ident=%4)", int(error_code),
                  message_size, try_again, session_ident); // Throws

    OutputBuffer& out = get_output_buffer();
    int protocol_version = get_client_protocol_version();
    get_server_protocol().make_error_message(protocol_version, out, error_code, message, message_size, try_again,
                                             session_ident); // Throws

    // The number of bytes written is not needed by the continuation.
    auto handler = [this](std::error_code ec, size_t) {
        handle_write_error(ec); // Throws
    };
    m_websocket.async_write_binary(out.data(), out.size(), std::move(handler));
    m_is_sending = true;
}
20✔
4686

4687

4688
void SyncConnection::handle_write_error(std::error_code ec)
4689
{
20✔
4690
    m_is_sending = false;
20✔
4691
    REALM_ASSERT(m_is_closing);
20✔
4692
    if (!m_ssl_stream) {
20✔
4693
        m_socket->shutdown(network::Socket::shutdown_send, ec);
20✔
4694
        if (ec && ec != make_basic_system_error_code(ENOTCONN))
20!
4695
            throw std::system_error(ec);
×
4696
    }
20✔
4697
}
20✔
4698

4699

4700
// For connection level errors, `sess` is ignored. For session level errors, a
4701
// session must be specified.
4702
//
4703
// If a session is specified, that session object will have been detached from
4704
// the ServerFile object (and possibly destroyed) upon return from
4705
// protocol_error().
4706
//
4707
// If a session is specified for a protocol level error, that session object
4708
// will have been destroyed upon return from protocol_error(). For session level
4709
// errors, the specified session will have been destroyed upon return from
4710
// protocol_error() if, and only if the negotiated protocol version is less than
4711
// 23.
4712
// Reacts to a protocol error detected on this connection.
//
// Session level errors (as classified by is_session_level_error()) require a
// non-null `sess` and are handled by initiating deactivation of that session.
// All other errors start a soft close of the whole connection with a session
// identifier of zero.
//
// Must not be called once the connection is closing. If `sess` is given, it
// must be registered in `m_sessions`.
void SyncConnection::protocol_error(ProtocolError error_code, Session* sess)
{
    REALM_ASSERT(!m_is_closing);
    const bool session_level = is_session_level_error(error_code);
    REALM_ASSERT(!session_level || sess);
    REALM_ASSERT(!sess || m_sessions.count(sess->get_session_ident()) == 1);

    if (logger.would_log(util::Logger::Level::debug)) {
        // Session level errors are reported through the session's own logger.
        const char* message = get_protocol_error_message(int(error_code));
        Logger& logger_2 = (session_level ? sess->logger : logger);
        logger_2.debug("Protocol error: %1 (error_code=%2)", message, int(error_code)); // Throws
    }

    if (session_level) {
        sess->initiate_deactivation(error_code); // Throws
        return;
    }

    // Only connection level errors reach this point, and those are never tied
    // to a particular session, so the session identifier is always zero.
    const session_ident_type session_ident = 0;
    do_initiate_soft_close(error_code, session_ident); // Throws
}
×
4730

4731

4732
// Puts the connection into the "closing" state because of `error_code`.
//
// The error code and session identifier are recorded in `m_error_code` and
// `m_error_session_ident`, pending outgoing work is dropped, all sessions are
// terminated, and finally the send trigger is fired.
//
// Asserted preconditions: `error_code` maps to an error message, the send
// trigger exists, and the connection is not already closing.
void SyncConnection::do_initiate_soft_close(ProtocolError error_code, session_ident_type session_ident)
{
    REALM_ASSERT(get_protocol_error_message(int(error_code)));

    // A session identifier accompanies session level errors only. The second
    // assertion further restricts this function to connection level errors,
    // so taken together `session_ident` has to be zero here.
    REALM_ASSERT(is_session_level_error(error_code) == (session_ident != 0));
    REALM_ASSERT(!is_session_level_error(error_code));

    REALM_ASSERT(m_send_trigger);
    REALM_ASSERT(!m_is_closing);

    // Remember what to report, and flag the connection as closing.
    m_error_code = error_code;
    m_error_session_ident = session_ident;
    m_is_closing = true;

    // No incoming message is attributed to a session any more.
    m_receiving_session = nullptr;

    // Nothing except the error itself is worth sending from now on.
    m_send_pong = false;
    m_sessions_enlisted_to_send.clear();

    terminate_sessions(); // Throws

    // NOTE(review): presumably this schedules the send path that emits the
    // recorded error -- confirm against the trigger's handler.
    m_send_trigger->trigger();
}
20✔
4761

4762

4763
void SyncConnection::close_due_to_close_by_client(std::error_code ec)
4764
{
696✔
4765
    auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
596✔
4766
    // Suicide
378✔
4767
    terminate(log_level, "Sync connection closed by client: %1", ec.message()); // Throws
696✔
4768
}
696✔
4769

4770

4771
void SyncConnection::close_due_to_error(std::error_code ec)
4772
{
478✔
4773
    // Suicide
262✔
4774
    terminate(Logger::Level::error, "Sync connection closed due to error: %1",
478✔
4775
              ec.message()); // Throws
478✔
4776
}
478✔
4777

4778

4779
void SyncConnection::terminate_sessions()
4780
{
1,198✔
4781
    for (auto& entry : m_sessions) {
1,850✔
4782
        Session& sess = *entry.second;
1,850✔
4783
        sess.terminate(); // Throws
1,850✔
4784
    }
1,850✔
4785
    m_sessions_enlisted_to_send.clear();
1,198✔
4786
    m_sessions.clear();
1,198✔
4787
}
1,198✔
4788

4789

4790
void SyncConnection::initiate_soft_close()
4791
{
20✔
4792
    if (!m_is_closing) {
20✔
4793
        session_ident_type session_ident = 0;                                    // Not session specific
20✔
4794
        do_initiate_soft_close(ProtocolError::connection_closed, session_ident); // Throws
20✔
4795
    }
20✔
4796
}
20✔
4797

4798

4799
// Removes the session registered under `session_ident` from this connection's
// session registry.
void SyncConnection::discard_session(session_ident_type session_ident) noexcept
{
    auto& registry = m_sessions;
    registry.erase(session_ident);
}
2,638✔
4803

4804
} // anonymous namespace
4805

4806

4807
// ============================ sync::Server implementation ============================
4808

4809
// Concrete implementation type behind `Server`. It adds nothing to
// `ServerImpl`; it only forwards the constructor arguments to it.
class Server::Implementation : public ServerImpl {
public:
    // Forwards `root_dir`, `pkey` and `config` to the `ServerImpl` constructor.
    Implementation(const std::string& root_dir, util::Optional<PKey> pkey, Server::Config config)
        : ServerImpl{root_dir, std::move(pkey), std::move(config)} // Throws
    {
    }

    virtual ~Implementation() = default;
};
4817

4818

4819
Server::Server(const std::string& root_dir, util::Optional<sync::PKey> pkey, Config config)
4820
    : m_impl{new Implementation{root_dir, std::move(pkey), std::move(config)}} // Throws
4821
{
7,964✔
4822
}
7,964✔
4823

4824

4825
Server::Server(Server&& serv) noexcept
4826
    : m_impl{std::move(serv.m_impl)}
4827
{
×
4828
}
×
4829

4830

4831
// Defined in this translation unit, where `Implementation` is a complete type.
Server::~Server() noexcept = default;
7,964✔
4832

4833

4834
void Server::start()
4835
{
7,296✔
4836
    m_impl->start(); // Throws
7,296✔
4837
}
7,296✔
4838

4839

4840
void Server::start(const std::string& listen_address, const std::string& listen_port, bool reuse_address)
4841
{
668✔
4842
    m_impl->start(listen_address, listen_port, reuse_address); // Throws
668✔
4843
}
668✔
4844

4845

4846
// Returns the listen endpoint reported by the implementation object.
network::Endpoint Server::listen_endpoint() const
{
    auto& impl = *m_impl;
    return impl.listen_endpoint(); // Throws
}
7,970✔
4850

4851

4852
void Server::run()
4853
{
7,912✔
4854
    m_impl->run(); // Throws
7,912✔
4855
}
7,912✔
4856

4857

4858
void Server::stop() noexcept
4859
{
8,802✔
4860
    m_impl->stop();
8,802✔
4861
}
8,802✔
4862

4863

4864
uint_fast64_t Server::errors_seen() const noexcept
4865
{
668✔
4866
    return m_impl->errors_seen;
668✔
4867
}
668✔
4868

4869

4870
void Server::stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
4871
                                                      milliseconds_type timeout)
4872
{
×
4873
    m_impl->stop_sync_and_wait_for_backup_completion(std::move(completion_handler), timeout); // Throws
×
4874
}
×
4875

4876

4877
// Passes the connection reaper timeout on to the implementation object.
void Server::set_connection_reaper_timeout(milliseconds_type timeout)
{
    auto& impl = *m_impl;
    impl.set_connection_reaper_timeout(timeout);
}
4✔
4881

4882

4883
void Server::close_connections()
4884
{
20✔
4885
    m_impl->close_connections();
20✔
4886
}
20✔
4887

4888

4889
bool Server::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4890
{
72✔
4891
    return m_impl->map_virtual_to_real_path(virt_path, real_path); // Throws
72✔
4892
}
72✔
4893

4894

4895
void Server::recognize_external_change(const std::string& virt_path)
4896
{
4,800✔
4897
    m_impl->recognize_external_change(virt_path); // Throws
4,800✔
4898
}
4,800✔
4899

4900

4901
// Forwards to the implementation object's get_workunit_timers(), passing both
// reference arguments through.
void Server::get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
{
    auto& impl = *m_impl;
    impl.get_workunit_timers(parallel_section, sequential_section);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc