• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1816

04 Nov 2023 12:29AM UTC coverage: 91.648% (-0.01%) from 91.66%
1816

push

Evergreen

web-flow
Use a single write transaction for DiscardLocal client resets on FLX realms (#7110)

Updating the subscription store in a separate write transaction from the
recovery means that we temporarily commit an invalid state. If the application
crashes between committing the client reset diff and updating the subscription
store, the next launch of the application would try to use the now-invalid
pending subscriptions that should have been discarded.

92128 of 168844 branches covered (0.0%)

141 of 146 new or added lines in 7 files covered. (96.58%)

84 existing lines in 15 files now uncovered.

230681 of 251702 relevant lines covered (91.65%)

6383138.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.26
/src/realm/sync/noinst/server/server.cpp
1
#include <realm/sync/noinst/server/server.hpp>
2

3
#include <realm/binary_data.hpp>
4
#include <realm/impl/simulated_failure.hpp>
5
#include <realm/object_id.hpp>
6
#include <realm/string_data.hpp>
7
#include <realm/sync/changeset.hpp>
8
#include <realm/sync/trigger.hpp>
9
#include <realm/sync/impl/clamped_hex_dump.hpp>
10
#include <realm/sync/impl/clock.hpp>
11
#include <realm/sync/network/http.hpp>
12
#include <realm/sync/network/network_ssl.hpp>
13
#include <realm/sync/network/websocket.hpp>
14
#include <realm/sync/noinst/client_history_impl.hpp>
15
#include <realm/sync/noinst/protocol_codec.hpp>
16
#include <realm/sync/noinst/server/access_control.hpp>
17
#include <realm/sync/noinst/server/server_dir.hpp>
18
#include <realm/sync/noinst/server/server_file_access_cache.hpp>
19
#include <realm/sync/noinst/server/server_impl_base.hpp>
20
#include <realm/sync/transform.hpp>
21
#include <realm/util/base64.hpp>
22
#include <realm/util/bind_ptr.hpp>
23
#include <realm/util/buffer_stream.hpp>
24
#include <realm/util/circular_buffer.hpp>
25
#include <realm/util/compression.hpp>
26
#include <realm/util/file.hpp>
27
#include <realm/util/json_parser.hpp>
28
#include <realm/util/load_file.hpp>
29
#include <realm/util/memory_stream.hpp>
30
#include <realm/util/optional.hpp>
31
#include <realm/util/platform_info.hpp>
32
#include <realm/util/random.hpp>
33
#include <realm/util/safe_int_ops.hpp>
34
#include <realm/util/scope_exit.hpp>
35
#include <realm/util/scratch_allocator.hpp>
36
#include <realm/util/thread.hpp>
37
#include <realm/util/thread_exec_guard.hpp>
38
#include <realm/util/value_reset_guard.hpp>
39
#include <realm/version.hpp>
40

41
#include <algorithm>
42
#include <atomic>
43
#include <cctype>
44
#include <chrono>
45
#include <cmath>
46
#include <condition_variable>
47
#include <cstdint>
48
#include <cstdio>
49
#include <cstring>
50
#include <functional>
51
#include <locale>
52
#include <map>
53
#include <memory>
54
#include <queue>
55
#include <sstream>
56
#include <stdexcept>
57
#include <thread>
58
#include <vector>
59

60
// NOTE: The protocol specification is in `/doc/protocol.md`
61

62

63
// FIXME: Verify that session identifier spoofing cannot be used to get access
64
// to sessions belonging to other network conections in any way.
65
// FIXME: Seems that server must close connection with zero sessions after a
66
// certain timeout.
67

68

69
using namespace realm;
70
using namespace realm::sync;
71
using namespace realm::util;
72

73
// clang-format off
74
using ServerHistory         = _impl::ServerHistory;
75
using ServerProtocol        = _impl::ServerProtocol;
76
using ServerFileAccessCache = _impl::ServerFileAccessCache;
77
using ServerImplBase = _impl::ServerImplBase;
78

79
using IntegratableChangeset = ServerHistory::IntegratableChangeset;
80
using IntegratableChangesetList = ServerHistory::IntegratableChangesetList;
81
using IntegratableChangesets = ServerHistory::IntegratableChangesets;
82
using IntegrationResult = ServerHistory::IntegrationResult;
83
using BootstrapError = ServerHistory::BootstrapError;
84
using ExtendedIntegrationError = ServerHistory::ExtendedIntegrationError;
85
using ClientType = ServerHistory::ClientType;
86
using FileIdentAllocSlot = ServerHistory::FileIdentAllocSlot;
87
using FileIdentAllocSlots = ServerHistory::FileIdentAllocSlots;
88

89
using UploadChangeset = ServerProtocol::UploadChangeset;
90
// clang-format on
91

92

93
using UploadChangesets = std::vector<UploadChangeset>;
94

95
using EventLoopMetricsHandler = network::Service::EventLoopMetricsHandler;
96

97

98
static_assert(std::numeric_limits<session_ident_type>::digits >= 63, "Bad session identifier type");
99
static_assert(std::numeric_limits<file_ident_type>::digits >= 63, "Bad file identifier type");
100
static_assert(std::numeric_limits<version_type>::digits >= 63, "Bad version type");
101
static_assert(std::numeric_limits<timestamp_type>::digits >= 63, "Bad timestamp type");
102

103

104
namespace {
105

106
enum class SchedStatus { done = 0, pending, in_progress };
107

108
// Only used by the Sync Server to support older pbs sync clients (prior to protocol v8)
109
constexpr std::string_view get_old_pbs_websocket_protocol_prefix() noexcept
110
{
×
111
    return "com.mongodb.realm-sync/";
×
112
}
×
113

114
std::string short_token_fmt(const std::string& str, size_t cutoff = 30)
115
{
574✔
116
    if (str.size() > cutoff) {
574✔
117
        return "..." + str.substr(str.size() - cutoff);
×
118
    }
×
119
    else {
574✔
120
        return str;
574✔
121
    }
574✔
122
}
574✔
123

124

125
class HttpListHeaderValueParser {
126
public:
127
    HttpListHeaderValueParser(std::string_view string) noexcept
128
        : m_string{string}
129
    {
1,928✔
130
    }
1,928✔
131
    bool next(std::string_view& elem) noexcept
132
    {
19,280✔
133
        while (m_pos < m_string.size()) {
19,280✔
134
            size_type i = m_pos;
17,352✔
135
            size_type j = m_string.find(',', i);
17,352✔
136
            if (j != std::string_view::npos) {
17,352✔
137
                m_pos = j + 1;
15,424✔
138
            }
15,424✔
139
            else {
1,928✔
140
                j = m_string.size();
1,928✔
141
                m_pos = j;
1,928✔
142
            }
1,928✔
143

8,460✔
144
            // Exclude leading and trailing white space
8,460✔
145
            while (i < j && is_http_lws(m_string[i]))
32,776✔
146
                ++i;
15,424✔
147
            while (j > i && is_http_lws(m_string[j - 1]))
17,352✔
148
                --j;
×
149

8,460✔
150
            if (i != j) {
17,352✔
151
                elem = m_string.substr(i, j - i);
17,352✔
152
                return true;
17,352✔
153
            }
17,352✔
154
        }
17,352✔
155
        return false;
10,388✔
156
    }
19,280✔
157

158
private:
159
    using size_type = std::string_view::size_type;
160
    const std::string_view m_string;
161
    size_type m_pos = 0;
162
    static bool is_http_lws(char ch) noexcept
163
    {
50,128✔
164
        return (ch == '\t' || ch == '\n' || ch == '\r' || ch == ' ');
50,128✔
165
    }
50,128✔
166
};
167

168

169
using SteadyClock = std::conditional<std::chrono::high_resolution_clock::is_steady,
170
                                     std::chrono::high_resolution_clock, std::chrono::steady_clock>::type;
171
using SteadyTimePoint = SteadyClock::time_point;
172

173
SteadyTimePoint steady_clock_now() noexcept
174
{
151,358✔
175
    return SteadyClock::now();
151,358✔
176
}
151,358✔
177

178
milliseconds_type steady_duration(SteadyTimePoint start_time, SteadyTimePoint end_time = steady_clock_now()) noexcept
179
{
38,122✔
180
    auto duration = end_time - start_time;
38,122✔
181
    auto millis_duration = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
38,122✔
182
    return milliseconds_type(millis_duration);
38,122✔
183
}
38,122✔
184

185

186
bool determine_try_again(ProtocolError error_code) noexcept
187
{
108✔
188
    return (error_code == ProtocolError::connection_closed);
108✔
189
}
108✔
190

191

192
class ServerFile;
193
class ServerImpl;
194
class HTTPConnection;
195
class SyncConnection;
196
class Session;
197

198

199
using Formatter = util::ResettableExpandableBufferOutputStream;
200
using OutputBuffer = util::ResettableExpandableBufferOutputStream;
201

202
using ProtocolVersionRange = std::pair<int, int>;
203

204
class MiscBuffers {
205
public:
206
    Formatter formatter;
207
    OutputBuffer download_message;
208

209
    using ProtocolVersionRanges = std::vector<ProtocolVersionRange>;
210
    ProtocolVersionRanges protocol_version_ranges;
211

212
    std::vector<char> compress;
213

214
    MiscBuffers()
215
    {
7,968✔
216
        formatter.imbue(std::locale::classic());
7,968✔
217
        download_message.imbue(std::locale::classic());
7,968✔
218
    }
7,968✔
219
};
220

221

222
struct DownloadCache {
223
    std::unique_ptr<char[]> body;
224
    std::size_t uncompressed_body_size;
225
    std::size_t compressed_body_size;
226
    bool body_is_compressed;
227
    version_type end_version;
228
    DownloadCursor download_progress;
229
    std::uint_fast64_t downloadable_bytes;
230
    std::size_t num_changesets;
231
    std::size_t accum_original_size;
232
    std::size_t accum_compacted_size;
233
};
234

235

236
// An unblocked work unit is comprised of one Work object for each of the files
237
// that contribute work to the work unit, generally one reference file and a
238
// number of partial files.
239
class Work {
240
public:
241
    // In general, primary work is all forms of modifying work, including file
242
    // deletion.
243
    bool has_primary_work = false;
244

245
    // Only for reference files
246
    bool might_produce_new_sync_version = false;
247

248
    bool produced_new_realm_version = false;
249
    bool produced_new_sync_version = false;
250
    bool expired_reference_version = false;
251

252
    // True if, and only if changesets_from_downstream contains at least one
253
    // changeset.
254
    bool have_changesets_from_downstream = false;
255

256
    FileIdentAllocSlots file_ident_alloc_slots;
257
    std::vector<std::unique_ptr<char[]>> changeset_buffers;
258
    IntegratableChangesets changesets_from_downstream;
259

260
    VersionInfo version_info;
261

262
    // Result of integration of changesets from downstream clients
263
    IntegrationResult integration_result;
264

265
    void reset() noexcept
266
    {
38,034✔
267
        has_primary_work = false;
38,034✔
268

19,250✔
269
        might_produce_new_sync_version = false;
38,034✔
270

19,250✔
271
        produced_new_realm_version = false;
38,034✔
272
        produced_new_sync_version = false;
38,034✔
273
        expired_reference_version = false;
38,034✔
274
        have_changesets_from_downstream = false;
38,034✔
275

19,250✔
276
        file_ident_alloc_slots.clear();
38,034✔
277
        changeset_buffers.clear();
38,034✔
278
        changesets_from_downstream.clear();
38,034✔
279

19,250✔
280
        version_info = {};
38,034✔
281
        integration_result = {};
38,034✔
282
    }
38,034✔
283
};
284

285

286
class WorkerState {
287
public:
288
    FileIdentAllocSlots file_ident_alloc_slots;
289
    util::ScratchMemory scratch_memory;
290
    bool use_file_cache = true;
291
    std::unique_ptr<ServerHistory> reference_hist;
292
    DBRef reference_sg;
293
};
294

295

296
// ============================ SessionQueue ============================
297

298
class SessionQueue {
299
public:
300
    void push_back(Session*) noexcept;
301
    Session* pop_front() noexcept;
302
    void clear() noexcept;
303

304
private:
305
    Session* m_back = nullptr;
306
};
307

308

309
// ============================ FileIdentReceiver ============================
310

311
class FileIdentReceiver {
312
public:
313
    virtual void receive_file_ident(SaltedFileIdent) = 0;
314

315
protected:
316
    ~FileIdentReceiver() {}
5,666✔
317
};
318

319

320
// ============================ WorkerBox =============================
321

322
class WorkerBox {
323
public:
324
    using JobType = util::UniqueFunction<void(WorkerState&)>;
325
    void add_work(WorkerState& state, JobType job)
326
    {
×
327
        std::unique_lock<std::mutex> lock(m_mutex);
×
328
        if (m_jobs.size() >= m_queue_limit) {
×
329
            // Once we have many queued jobs, it is better to use this thread to run a new job
×
330
            // than to queue it.
×
331
            run_a_job(lock, state, job);
×
332
        }
×
333
        else {
×
334
            // Create worker threads on demand (if all existing threads are active):
×
335
            if (m_threads.size() < m_max_num_threads && m_active >= m_threads.size()) {
×
336
                m_threads.emplace_back([this]() {
×
337
                    WorkerState state;
×
338
                    state.use_file_cache = false;
×
339
                    JobType the_job;
×
340
                    std::unique_lock<std::mutex> lock(m_mutex);
×
341
                    for (;;) {
×
342
                        while (m_jobs.empty() && !m_finish_up)
×
343
                            m_changes.wait(lock);
×
344
                        if (m_finish_up)
×
345
                            break; // terminate thread
×
346
                        the_job = std::move(m_jobs.back());
×
347
                        m_jobs.pop_back();
×
348
                        run_a_job(lock, state, the_job);
×
349
                        m_changes.notify_all();
×
350
                    }
×
351
                });
×
352
            }
×
353

×
354
            // Submit the job for execution:
×
355
            m_jobs.emplace_back(std::move(job));
×
356
            m_changes.notify_all();
×
357
        }
×
358
    }
×
359

360
    // You should call wait_completion() before trying to destroy a WorkerBox to get proper
361
    // propagation of exceptions.
362
    void wait_completion(WorkerState& state)
363
    {
×
364
        std::unique_lock<std::mutex> lock(m_mutex);
×
365
        while (!m_jobs.empty() || m_active > 0) {
×
366
            if (!m_jobs.empty()) { // if possible, make this thread participate in running m_jobs
×
367
                JobType the_job = std::move(m_jobs.back());
×
368
                m_jobs.pop_back();
×
369
                run_a_job(lock, state, the_job);
×
370
            }
×
371
            else {
×
372
                m_changes.wait(lock);
×
373
            }
×
374
        }
×
375
        if (m_epr) {
×
376
            std::rethrow_exception(m_epr);
×
377
        }
×
378
    }
×
379

380
    WorkerBox(unsigned int num_threads)
381
    {
×
382
        m_queue_limit = num_threads * 10; // fudge factor for job size variation
×
383
        m_max_num_threads = num_threads;
×
384
    }
×
385

386
    ~WorkerBox()
387
    {
×
388
        {
×
389
            std::unique_lock<std::mutex> lock(m_mutex);
×
390
            m_finish_up = true;
×
391
            m_changes.notify_all();
×
392
        }
×
393
        for (auto& e : m_threads)
×
394
            e.join();
×
395
    }
×
396

397
private:
398
    std::mutex m_mutex;
399
    std::condition_variable m_changes;
400
    std::vector<std::thread> m_threads;
401
    std::vector<JobType> m_jobs;
402
    unsigned int m_active = 0;
403
    bool m_finish_up = false;
404
    unsigned int m_queue_limit = 0;
405
    unsigned int m_max_num_threads = 0;
406
    std::exception_ptr m_epr;
407

408
    void run_a_job(std::unique_lock<std::mutex>& lock, WorkerState& state, JobType& job)
409
    {
×
410
        ++m_active;
×
411
        lock.unlock();
×
412
        try {
×
413
            job(state);
×
414
            lock.lock();
×
415
        }
×
416
        catch (...) {
×
417
            lock.lock();
×
418
            if (!m_epr)
×
419
                m_epr = std::current_exception();
×
420
        }
×
421
        --m_active;
×
422
    }
×
423
};
424

425

426
// ============================ ServerFile ============================
427

428
class ServerFile : public util::RefCountBase {
429
public:
430
    util::PrefixLogger logger;
431

432
    // Logger to be used by the worker thread
433
    util::PrefixLogger wlogger;
434

435
    ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path, std::string real_path,
436
               bool disable_sync_to_disk);
437
    ~ServerFile() noexcept;
438

439
    void initialize();
440
    void activate();
441

442
    ServerImpl& get_server() noexcept
443
    {
39,838✔
444
        return m_server;
39,838✔
445
    }
39,838✔
446

447
    const std::string& get_real_path() const noexcept
448
    {
×
449
        return m_file.realm_path;
×
450
    }
×
451

452
    const std::string& get_virt_path() const noexcept
453
    {
×
454
        return m_file.virt_path;
×
455
    }
×
456

457
    ServerFileAccessCache::File& access()
458
    {
50,280✔
459
        return m_file.access(); // Throws
50,280✔
460
    }
50,280✔
461

462
    ServerFileAccessCache::File& worker_access()
463
    {
38,000✔
464
        return m_worker_file.access(); // Throws
38,000✔
465
    }
38,000✔
466

467
    version_type get_realm_version() const noexcept
468
    {
×
469
        return m_version_info.realm_version;
×
470
    }
×
471

472
    version_type get_sync_version() const noexcept
473
    {
×
474
        return m_version_info.sync_version.version;
×
475
    }
×
476

477
    SaltedVersion get_salted_sync_version() const noexcept
478
    {
103,162✔
479
        return m_version_info.sync_version;
103,162✔
480
    }
103,162✔
481

482
    DownloadCache& get_download_cache() noexcept;
483

484
    void register_client_access(file_ident_type client_file_ident);
485

486
    using file_ident_request_type = std::int_fast64_t;
487

488
    // Initiate a request for a new client file identifier.
489
    //
490
    // Unless the request is cancelled, the identifier will be delivered to the
491
    // receiver by way of an invocation of
492
    // FileIdentReceiver::receive_file_ident().
493
    //
494
    // FileIdentReceiver::receive_file_ident() is guaranteed to not be called
495
    // until after request_file_ident() has returned (no callback reentrance).
496
    //
497
    // New client file identifiers will be delivered to receivers in the order
498
    // that they were requested.
499
    //
500
    // The returned value is a nonzero integer that can be used to cancel the
501
    // request before the file identifier is delivered using
502
    // cancel_file_ident_request().
503
    auto request_file_ident(FileIdentReceiver&, file_ident_type proxy_file, ClientType) -> file_ident_request_type;
504

505
    // Cancel the specified file identifier request.
506
    //
507
    // It is an error to call this function after the identifier has been
508
    // delivered.
509
    void cancel_file_ident_request(file_ident_request_type) noexcept;
510

511
    void add_unidentified_session(Session*);
512
    void identify_session(Session*, file_ident_type client_file_ident);
513

514
    void remove_unidentified_session(Session*) noexcept;
515
    void remove_identified_session(file_ident_type client_file_ident) noexcept;
516

517
    Session* get_identified_session(file_ident_type client_file_ident) noexcept;
518

519
    bool can_add_changesets_from_downstream() const noexcept;
520
    void add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
521
                                        version_type locked_server_version, const UploadChangeset*,
522
                                        std::size_t num_changesets);
523

524
    // bootstrap_client_session calls the function of same name in server_history
525
    // but corrects the upload_progress with information from pending
526
    // integratable changesets. A situation can occur where a client terminates
527
    // a session and starts a new session and re-uploads changesets that are known
528
    // by the ServerFile object but not by the ServerHistory.
529
    BootstrapError bootstrap_client_session(SaltedFileIdent client_file_ident, DownloadCursor download_progress,
530
                                            SaltedVersion server_version, ClientType client_type,
531
                                            UploadCursor& upload_progress, version_type& locked_server_version,
532
                                            Logger&);
533

534
    // NOTE: This function is executed by the worker thread
535
    void worker_process_work_unit(WorkerState&);
536

537
    void recognize_external_change();
538

539
private:
540
    ServerImpl& m_server;
541
    ServerFileAccessCache::Slot m_file;
542

543
    // In general, `m_version_info` refers to the last snapshot of the Realm
544
    // file that is supposed to be visible to remote peers engaging in regular
545
    // Realm file synchronization.
546
    VersionInfo m_version_info;
547

548
    file_ident_request_type m_last_file_ident_request = 0;
549

550
    // The set of sessions whose client file identifier is not yet known, i.e.,
551
    // those for which an IDENT message has not yet been received,
552
    std::set<Session*> m_unidentified_sessions;
553

554
    // A map of the sessions whose client file identifier is known, i.e, those
555
    // for which an IDENT message has been received.
556
    std::map<file_ident_type, Session*> m_identified_sessions;
557

558
    // Used when a file used as partial view wants to allocate a client file
559
    // identifier from the reference Realm.
560
    file_ident_request_type m_file_ident_request = 0;
561

562
    struct FileIdentRequestInfo {
563
        FileIdentReceiver* receiver;
564
        file_ident_type proxy_file;
565
        ClientType client_type;
566
    };
567

568
    // When nonempty, it counts towards outstanding blocked work (see
569
    // `m_has_blocked_work`).
570
    std::map<file_ident_request_type, FileIdentRequestInfo> m_file_ident_requests;
571

572
    // Changesets received from the downstream clients, and waiting to be
573
    // integrated, as well as information about the clients progress in terms of
574
    // integrating changesets received from the server. When nonempty, it counts
575
    // towards outstanding blocked work (see `m_has_blocked_work`).
576
    //
577
    // At any given time, the set of changesets from a particular client-side
578
    // file may be comprised of changesets received via distinct sessions.
579
    //
580
    // See also `m_num_changesets_from_downstream`.
581
    IntegratableChangesets m_changesets_from_downstream;
582

583
    // Keeps track of the number of changesets in `m_changesets_from_downstream`.
584
    //
585
    // Its purpose is also to initialize
586
    // `Work::have_changesets_from_downstream`.
587
    std::size_t m_num_changesets_from_downstream = 0;
588

589
    // The total size, in bytes, of the changesets that were received from
590
    // clients, are targeting this file, and are currently part of the blocked
591
    // work unit.
592
    //
593
    // Together with `m_unblocked_changesets_from_downstream_byte_size`, its
594
    // purpose is to allow the server to keep track of the accumulated size of
595
    // changesets being processed, or waiting to be processed (metric
596
    // `upload.pending.bytes`) (see
597
    // ServerImpl::inc_byte_size_for_pending_downstream_changesets()).
598
    //
599
    // Its purpose is also to enable the "very poor man's" backpressure solution
600
    // (see can_add_changesets_from_downstream()).
601
    std::size_t m_blocked_changesets_from_downstream_byte_size = 0;
602

603
    // Same as `m_blocked_changesets_from_downstream_byte_size` but for the
604
    // currently unblocked work unit.
605
    std::size_t m_unblocked_changesets_from_downstream_byte_size = 0;
606

607
    // When nonempty, it counts towards outstanding blocked work (see
608
    // `m_has_blocked_work`).
609
    std::vector<std::string> m_permission_changes;
610

611
    // True iff this file, or any of its associated partial files (when
612
    // applicable), has a nonzero amount of outstanding work that is currently
613
    // held back from being passed to the worker thread because a previously
614
    // accumulated chunk of work related to this file is currently in progress.
615
    bool m_has_blocked_work = false;
616

617
    // A file, that is not a partial file, is considered *exposed to the worker
618
    // thread* from the point in time where it is submitted to the worker
619
    // (Worker::enqueue()) and up until the point in time where
620
    // group_postprocess_stage_1() starts to execute. A partial file is
621
    // considered *exposed to the worker thread* precisely when the associated
622
    // reference file is exposed to the worker thread, but only if it was in
623
    // `m_reference_file->m_work.partial_files` at the point in time where the
624
    // reference file was passed to the worker.
625
    //
626
    // While this file is exposed to the worker thread, all members of `m_work`
627
    // other than `changesets_from_downstream` may be accessed and modified by
628
    // the worker thread only.
629
    //
630
    // While this file is exposed to the worker thread,
631
    // `m_work.changesets_from_downstream` may be accessed by all threads, but
632
    // must not be modified by any thread. This special status of
633
    // `m_work.changesets_from_downstream` is required to allow
634
    // ServerFile::bootstrap_client_session() to read from it at any time.
635
    Work m_work;
636

637
    // For reference files, set to true when work is unblocked, and reset back
638
    // to false when the work finalization process completes
639
    // (group_postprocess_stage_3()). Always zero for partial files.
640
    bool m_has_work_in_progress = 0;
641

642
    // This one must only be accessed by the worker thread.
643
    //
644
    // More specifically, `m_worker_file.access()` must only be called by the
645
    // worker thread, and if it was ever called, it must be closed by the worker
646
    // thread before the ServerFile object is destroyed, if destruction happens
647
    // before the destruction of the server object itself.
648
    ServerFileAccessCache::Slot m_worker_file;
649

650
    std::vector<std::int_fast64_t> m_deleting_connections;
651

652
    DownloadCache m_download_cache;
653

654
    void on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes);
655
    void on_work_added();
656
    void group_unblock_work();
657
    void unblock_work();
658

659
    /// Resume history scanning in all sessions bound to this file. To be called
660
    /// after a successfull integration of a changeset.
661
    void resume_download() noexcept;
662

663
    // NOTE: These functions are executed by the worker thread
664
    void worker_allocate_file_identifiers();
665
    bool worker_integrate_changes_from_downstream(WorkerState&);
666
    ServerHistory& get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
667
                                           DBRef& sg_ptr);
668
    ServerHistory& get_reference_file_history(WorkerState& state);
669
    void group_postprocess_stage_1();
670
    void group_postprocess_stage_2();
671
    void group_postprocess_stage_3();
672
    void group_finalize_work_stage_1();
673
    void group_finalize_work_stage_2();
674
    void finalize_work_stage_1();
675
    void finalize_work_stage_2();
676
};
677

678

679
inline DownloadCache& ServerFile::get_download_cache() noexcept
680
{
40,094✔
681
    return m_download_cache;
40,094✔
682
}
40,094✔
683

684
inline void ServerFile::group_finalize_work_stage_1()
685
{
37,632✔
686
    finalize_work_stage_1(); // Throws
37,632✔
687
}
37,632✔
688

689
inline void ServerFile::group_finalize_work_stage_2()
690
{
37,634✔
691
    finalize_work_stage_2(); // Throws
37,634✔
692
}
37,634✔
693

694

695
// ============================ Worker ============================
696

697
// All write transaction on server-side Realm files performed on behalf of the
698
// server, must be performed by the worker thread, not the network event loop
699
// thread. This is to ensure that the network event loop thread never gets
700
// blocked waiting for the worker thread to end a long running write
701
// transaction.
702
//
703
// FIXME: Currently, the event loop thread does perform a number of write
704
// transactions, but only on subtier nodes of a star topology server cluster.
705
class Worker : public ServerHistory::Context {
706
public:
707
    util::PrefixLogger logger;
708

709
    explicit Worker(ServerImpl&);
710

711
    ServerFileAccessCache& get_file_access_cache() noexcept;
712

713
    void enqueue(ServerFile*);
714

715
    // Overriding members of ServerHistory::Context
716
    std::mt19937_64& server_history_get_random() noexcept override final;
717

718
private:
719
    ServerImpl& m_server;
720
    std::mt19937_64 m_random;
721
    ServerFileAccessCache m_file_access_cache;
722

723
    util::Mutex m_mutex;
724
    util::CondVar m_cond; // Protected by `m_mutex`
725

726
    bool m_stop = false; // Protected by `m_mutex`
727

728
    util::CircularBuffer<ServerFile*> m_queue; // Protected by `m_mutex`
729

730
    WorkerState m_state;
731

732
    void run();
733
    void stop() noexcept;
734

735
    friend class util::ThreadExecGuardWithParent<Worker, ServerImpl>;
736
};
737

738

739
inline ServerFileAccessCache& Worker::get_file_access_cache() noexcept
740
{
1,082✔
741
    return m_file_access_cache;
1,082✔
742
}
1,082✔
743

744

745
// ============================ ServerImpl ============================
746

747
class ServerImpl : public ServerImplBase, public ServerHistory::Context {
748
public:
749
    std::uint_fast64_t errors_seen = 0;
750

751
    std::atomic<milliseconds_type> m_par_time;
752
    std::atomic<milliseconds_type> m_seq_time;
753

754
    util::Mutex last_client_accesses_mutex;
755

756
    const std::shared_ptr<util::Logger> logger_ptr;
757
    util::Logger& logger;
758

759
    network::Service& get_service() noexcept
760
    {
78,706✔
761
        return m_service;
78,706✔
762
    }
78,706✔
763

764
    const network::Service& get_service() const noexcept
765
    {
×
766
        return m_service;
×
767
    }
×
768

769
    std::mt19937_64& get_random() noexcept
770
    {
64,262✔
771
        return m_random;
64,262✔
772
    }
64,262✔
773

774
    const Server::Config& get_config() const noexcept
775
    {
123,522✔
776
        return m_config;
123,522✔
777
    }
123,522✔
778

779
    std::size_t get_max_upload_backlog() const noexcept
780
    {
43,862✔
781
        return m_max_upload_backlog;
43,862✔
782
    }
43,862✔
783

784
    const std::string& get_root_dir() const noexcept
785
    {
5,666✔
786
        return m_root_dir;
5,666✔
787
    }
5,666✔
788

789
    network::ssl::Context& get_ssl_context() noexcept
790
    {
48✔
791
        return *m_ssl_context;
48✔
792
    }
48✔
793

794
    const AccessControl& get_access_control() const noexcept
795
    {
×
796
        return m_access_control;
×
797
    }
×
798

799
    ProtocolVersionRange get_protocol_version_range() const noexcept
800
    {
1,928✔
801
        return m_protocol_version_range;
1,928✔
802
    }
1,928✔
803

804
    ServerProtocol& get_server_protocol() noexcept
805
    {
132,614✔
806
        return m_server_protocol;
132,614✔
807
    }
132,614✔
808

809
    compression::CompressMemoryArena& get_compress_memory_arena() noexcept
810
    {
4,226✔
811
        return m_compress_memory_arena;
4,226✔
812
    }
4,226✔
813

814
    MiscBuffers& get_misc_buffers() noexcept
815
    {
46,246✔
816
        return m_misc_buffers;
46,246✔
817
    }
46,246✔
818

819
    int_fast64_t get_current_server_session_ident() const noexcept
820
    {
×
821
        return m_current_server_session_ident;
×
822
    }
×
823

824
    util::ScratchMemory& get_scratch_memory() noexcept
825
    {
×
826
        return m_scratch_memory;
×
827
    }
×
828

829
    Worker& get_worker() noexcept
830
    {
40,172✔
831
        return m_worker;
40,172✔
832
    }
40,172✔
833

834
    void get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
835
    {
×
836
        parallel_section = m_par_time;
×
837
        sequential_section = m_seq_time;
×
838
    }
×
839

840
    ServerImpl(const std::string& root_dir, util::Optional<sync::PKey>, Server::Config);
841
    ~ServerImpl() noexcept;
842

843
    void start();
844

845
    void start(std::string listen_address, std::string listen_port, bool reuse_address)
846
    {
672✔
847
        m_config.listen_address = listen_address;
672✔
848
        m_config.listen_port = listen_port;
672✔
849
        m_config.reuse_address = reuse_address;
672✔
850

338✔
851
        start(); // Throws
672✔
852
    }
672✔
853

854
    network::Endpoint listen_endpoint() const
855
    {
7,974✔
856
        return m_acceptor.local_endpoint();
7,974✔
857
    }
7,974✔
858

859
    void run();
860
    void stop() noexcept;
861

862
    void remove_http_connection(std::int_fast64_t conn_id) noexcept;
863

864
    void add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn);
865
    void remove_sync_connection(int_fast64_t connection_id);
866

867
    size_t get_number_of_http_connections()
868
    {
×
869
        return m_http_connections.size();
×
870
    }
×
871

872
    size_t get_number_of_sync_connections()
873
    {
×
874
        return m_sync_connections.size();
×
875
    }
×
876

877
    bool is_sync_stopped()
878
    {
39,964✔
879
        return m_sync_stopped;
39,964✔
880
    }
39,964✔
881

882
    const std::set<std::string>& get_realm_names() const noexcept
883
    {
×
884
        return m_realm_names;
×
885
    }
×
886

887
    // virt_path must be valid when get_or_create_file() is called.
888
    util::bind_ptr<ServerFile> get_or_create_file(const std::string& virt_path)
889
    {
5,636✔
890
        util::bind_ptr<ServerFile> file = get_file(virt_path);
5,636✔
891
        if (REALM_LIKELY(file))
5,636✔
892
            return file;
4,992✔
893

436✔
894
        _impl::VirtualPathComponents virt_path_components =
1,080✔
895
            _impl::parse_virtual_path(m_root_dir, virt_path); // Throws
1,080✔
896
        REALM_ASSERT(virt_path_components.is_valid);
1,080✔
897

436✔
898
        _impl::make_dirs(m_root_dir, virt_path); // Throws
1,080✔
899
        m_realm_names.insert(virt_path);         // Throws
1,080✔
900
        {
1,080✔
901
            bool disable_sync_to_disk = m_config.disable_sync_to_disk;
1,080✔
902
            file.reset(new ServerFile(*this, m_file_access_cache, virt_path, virt_path_components.real_realm_path,
1,080✔
903
                                      disable_sync_to_disk)); // Throws
1,080✔
904
        }
1,080✔
905

436✔
906
        file->initialize();
1,080✔
907
        m_files[virt_path] = file; // Throws
1,080✔
908
        file->activate();          // Throws
1,080✔
909
        return file;
1,080✔
910
    }
1,080✔
911

912
    std::unique_ptr<ServerHistory> make_history_for_path()
913
    {
×
914
        return std::make_unique<ServerHistory>(*this);
×
915
    }
×
916

917
    util::bind_ptr<ServerFile> get_file(const std::string& virt_path) noexcept
918
    {
5,636✔
919
        auto i = m_files.find(virt_path);
5,636✔
920
        if (REALM_LIKELY(i != m_files.end()))
5,636✔
921
            return i->second;
4,992✔
922
        return {};
1,080✔
923
    }
1,080✔
924

925
    // Returns the number of seconds since the Epoch of
926
    // std::chrono::system_clock.
927
    std::chrono::system_clock::time_point token_expiration_clock_now() const noexcept
928
    {
×
929
        if (REALM_UNLIKELY(m_config.token_expiration_clock))
×
930
            return m_config.token_expiration_clock->now();
×
931
        return std::chrono::system_clock::now();
×
932
    }
×
933

934
    void set_connection_reaper_timeout(milliseconds_type);
935

936
    void close_connections();
937
    bool map_virtual_to_real_path(const std::string& virt_path, std::string& real_path);
938

939
    void recognize_external_change(const std::string& virt_path);
940

941
    void stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
942
                                                  milliseconds_type timeout);
943

944
    // Server global outputbuffers that can be reused.
945
    // The server is single threaded, so there are no
946
    // synchronization issues.
947
    // output_buffers_count is equal to the
948
    // maximum number of buffers needed at any point.
949
    static constexpr int output_buffers_count = 1;
950
    OutputBuffer output_buffers[output_buffers_count];
951

952
    bool is_load_balancing_allowed() const
953
    {
×
954
        return m_allow_load_balancing;
×
955
    }
×
956

957
    // inc_byte_size_for_pending_downstream_changesets() must be called by
958
    // ServerFile objects when changesets from downstream clients have been
959
    // received.
960
    //
961
    // dec_byte_size_for_pending_downstream_changesets() must be called by
962
    // ServerFile objects when changesets from downstream clients have been
963
    // processed or discarded.
964
    //
965
    // ServerImpl uses this information to keep a running tally (metric
966
    // `upload.pending.bytes`) of the total byte size of pending changesets from
967
    // downstream clients.
968
    //
969
    // These functions must be called on the network thread.
970
    void inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
971
    void dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
972

973
    // Overriding member functions in _impl::ServerHistory::Context
974
    std::mt19937_64& server_history_get_random() noexcept override final;
975

976
private:
977
    Server::Config m_config;
978
    network::Service m_service;
979
    std::mt19937_64 m_random;
980
    const std::size_t m_max_upload_backlog;
981
    const std::string m_root_dir;
982
    const AccessControl m_access_control;
983
    const ProtocolVersionRange m_protocol_version_range;
984

985
    // The reserved files will be closed in situations where the server
986
    // runs out of file descriptors.
987
    std::unique_ptr<File> m_reserved_files[5];
988

989
    // The set of all Realm files known to this server, represented by their
990
    // virtual path.
991
    //
992
    // INVARIANT: If a Realm file is in the servers directory (i.e., it would be
993
    // reported by an invocation of _impl::get_realm_names()), then the
994
    // corresponding virtual path is in `m_realm_names`, assuming no external
995
    // file-system level intervention.
996
    std::set<std::string> m_realm_names;
997

998
    std::unique_ptr<network::ssl::Context> m_ssl_context;
999
    ServerFileAccessCache m_file_access_cache;
1000
    Worker m_worker;
1001
    std::map<std::string, util::bind_ptr<ServerFile>> m_files; // Key is virtual path
1002
    network::Acceptor m_acceptor;
1003
    std::int_fast64_t m_next_conn_id = 0;
1004
    std::unique_ptr<HTTPConnection> m_next_http_conn;
1005
    network::Endpoint m_next_http_conn_endpoint;
1006
    std::map<std::int_fast64_t, std::unique_ptr<HTTPConnection>> m_http_connections;
1007
    std::map<std::int_fast64_t, std::unique_ptr<SyncConnection>> m_sync_connections;
1008
    ServerProtocol m_server_protocol;
1009
    compression::CompressMemoryArena m_compress_memory_arena;
1010
    MiscBuffers m_misc_buffers;
1011
    int_fast64_t m_current_server_session_ident;
1012
    Optional<network::DeadlineTimer> m_connection_reaper_timer;
1013
    bool m_allow_load_balancing = false;
1014

1015
    util::Mutex m_mutex;
1016

1017
    bool m_stopped = false; // Protected by `m_mutex`
1018

1019
    // m_sync_stopped is used by stop_sync_and_wait_for_backup_completion().
1020
    // When m_sync_stopped is true, the server does not perform any sync.
1021
    bool m_sync_stopped = false;
1022

1023
    std::atomic<bool> m_running{false}; // Debugging facility
1024

1025
    std::size_t m_pending_changesets_from_downstream_byte_size = 0;
1026

1027
    util::CondVar m_wait_or_service_stopped_cond; // Protected by `m_mutex`
1028

1029
    util::ScratchMemory m_scratch_memory;
1030

1031
    void listen();
1032
    void initiate_accept();
1033
    void handle_accept(std::error_code);
1034

1035
    void reap_connections();
1036
    void initiate_connection_reaper_timer(milliseconds_type timeout);
1037
    void do_close_connections();
1038

1039
    static std::size_t determine_max_upload_backlog(Server::Config& config) noexcept
1040
    {
7,968✔
1041
        if (config.max_upload_backlog == 0)
7,968✔
1042
            return 4294967295; // 4GiB - 1 (largest allowable number on a 32-bit platform)
7,968✔
1043
        return config.max_upload_backlog;
×
1044
    }
×
1045

1046
    static ProtocolVersionRange determine_protocol_version_range(Server::Config& config)
1047
    {
7,968✔
1048
        const int actual_min = ServerImplBase::get_oldest_supported_protocol_version();
7,968✔
1049
        const int actual_max = get_current_protocol_version();
7,968✔
1050
        static_assert(actual_min <= actual_max, "");
7,968✔
1051
        int min = actual_min;
7,968✔
1052
        int max = actual_max;
7,968✔
1053
        if (config.max_protocol_version != 0 && config.max_protocol_version < max) {
7,968!
1054
            if (config.max_protocol_version < min)
×
1055
                throw Server::NoSupportedProtocolVersions();
×
1056
            max = config.max_protocol_version;
×
1057
        }
×
1058
        return {min, max};
7,968✔
1059
    }
7,968✔
1060

1061
    void do_recognize_external_change(const std::string& virt_path);
1062

1063
    void do_stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_complete)> completion_handler,
1064
                                                     milliseconds_type timeout);
1065
};
1066

1067
// ============================ SyncConnection ============================
1068

1069
class SyncConnection : public websocket::Config {
1070
public:
1071
    const std::shared_ptr<util::Logger> logger_ptr;
1072
    util::Logger& logger;
1073

1074
    // Clients with sync protocol version 8 or greater support pbs->flx migration
1075
    static constexpr int PBS_FLX_MIGRATION_PROTOCOL_VERSION = 8;
1076
    // Clients with sync protocol version less than 10 do not support log messages
1077
    static constexpr int SERVER_LOG_PROTOCOL_VERSION = 10;
1078

1079
    SyncConnection(ServerImpl& serv, std::int_fast64_t id, std::unique_ptr<network::Socket>&& socket,
1080
                   std::unique_ptr<network::ssl::Stream>&& ssl_stream,
1081
                   std::unique_ptr<network::ReadAheadBuffer>&& read_ahead_buffer, int client_protocol_version,
1082
                   std::string client_user_agent, std::string remote_endpoint, std::string appservices_request_id)
1083
        : logger_ptr{std::make_shared<util::PrefixLogger>(make_logger_prefix(id), serv.logger_ptr)} // Throws
1084
        , logger{*logger_ptr}
1085
        , m_server{serv}
1086
        , m_id{id}
1087
        , m_socket{std::move(socket)}
1088
        , m_ssl_stream{std::move(ssl_stream)}
1089
        , m_read_ahead_buffer{std::move(read_ahead_buffer)}
1090
        , m_websocket{*this}
1091
        , m_client_protocol_version{client_protocol_version}
1092
        , m_client_user_agent{std::move(client_user_agent)}
1093
        , m_remote_endpoint{std::move(remote_endpoint)}
1094
        , m_appservices_request_id{std::move(appservices_request_id)}
1095
    {
1,928✔
1096
        // Make the output buffer stream throw std::bad_alloc if it fails to
940✔
1097
        // expand the buffer
940✔
1098
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
1,928✔
1099

940✔
1100
        network::Service& service = m_server.get_service();
1,928✔
1101
        auto handler = [this](Status status) {
96,014✔
1102
            if (!status.is_ok())
96,014✔
1103
                return;
×
1104
            if (!m_is_sending)
96,014✔
1105
                send_next_message(); // Throws
41,882✔
1106
        };
96,014✔
1107
        m_send_trigger = std::make_unique<Trigger<network::Service>>(&service, std::move(handler)); // Throws
1,928✔
1108
    }
1,928✔
1109

1110
    ~SyncConnection() noexcept;
1111

1112
    ServerImpl& get_server() noexcept
1113
    {
113,126✔
1114
        return m_server;
113,126✔
1115
    }
113,126✔
1116

1117
    ServerProtocol& get_server_protocol() noexcept
1118
    {
132,614✔
1119
        return m_server.get_server_protocol();
132,614✔
1120
    }
132,614✔
1121

1122
    int get_client_protocol_version()
1123
    {
101,544✔
1124
        return m_client_protocol_version;
101,544✔
1125
    }
101,544✔
1126

1127
    const std::string& get_client_user_agent() const noexcept
1128
    {
5,638✔
1129
        return m_client_user_agent;
5,638✔
1130
    }
5,638✔
1131

1132
    const std::string& get_remote_endpoint() const noexcept
1133
    {
5,636✔
1134
        return m_remote_endpoint;
5,636✔
1135
    }
5,636✔
1136

1137
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept final
1138
    {
1,928✔
1139
        return logger_ptr;
1,928✔
1140
    }
1,928✔
1141

1142
    std::mt19937_64& websocket_get_random() noexcept final override
1143
    {
63,176✔
1144
        return m_server.get_random();
63,176✔
1145
    }
63,176✔
1146

1147
    bool websocket_binary_message_received(const char* data, size_t size) final override
1148
    {
69,886✔
1149
        using sf = _impl::SimulatedFailure;
69,886✔
1150
        if (sf::check_trigger(sf::sync_server__read_head)) {
69,886✔
1151
            // Suicide
228✔
1152
            read_error(sf::sync_server__read_head);
430✔
1153
            return false;
430✔
1154
        }
430✔
1155
        // After a connection level error has occurred, all incoming messages
34,382✔
1156
        // will be ignored. By continuing to read until end of input, the server
34,382✔
1157
        // is able to know when the client closes the connection, which in
34,382✔
1158
        // general means that is has received the ERROR message.
34,382✔
1159
        if (REALM_LIKELY(!m_is_closing)) {
69,456✔
1160
            m_last_activity_at = steady_clock_now();
69,422✔
1161
            handle_message_received(data, size);
69,422✔
1162
        }
69,422✔
1163
        return true;
69,456✔
1164
    }
69,456✔
1165

1166
    bool websocket_ping_message_received(const char* data, size_t size) final override
1167
    {
×
1168
        if (REALM_LIKELY(!m_is_closing)) {
×
1169
            m_last_activity_at = steady_clock_now();
×
1170
            handle_ping_received(data, size);
×
1171
        }
×
1172
        return true;
×
1173
    }
×
1174

1175
    void async_write(const char* data, size_t size, websocket::WriteCompletionHandler handler) final override
1176
    {
63,182✔
1177
        if (m_ssl_stream) {
63,182✔
1178
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
70✔
1179
        }
70✔
1180
        else {
63,112✔
1181
            m_socket->async_write(data, size, std::move(handler)); // Throws
63,112✔
1182
        }
63,112✔
1183
    }
63,182✔
1184

1185
    void async_read(char* buffer, size_t size, websocket::ReadCompletionHandler handler) final override
1186
    {
211,140✔
1187
        if (m_ssl_stream) {
211,140✔
1188
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
164✔
1189
        }
164✔
1190
        else {
210,976✔
1191
            m_socket->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
210,976✔
1192
        }
210,976✔
1193
    }
211,140✔
1194

1195
    void async_read_until(char* buffer, size_t size, char delim,
1196
                          websocket::ReadCompletionHandler handler) final override
1197
    {
×
1198
        if (m_ssl_stream) {
×
1199
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1200
                                           std::move(handler)); // Throws
×
1201
        }
×
1202
        else {
×
1203
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1204
                                       std::move(handler)); // Throws
×
1205
        }
×
1206
    }
×
1207

1208
    void websocket_read_error_handler(std::error_code ec) final override
1209
    {
692✔
1210
        read_error(ec);
692✔
1211
    }
692✔
1212

1213
    void websocket_write_error_handler(std::error_code ec) final override
1214
    {
×
1215
        write_error(ec);
×
1216
    }
×
1217

1218
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*,
1219
                                           const std::string_view*) final override
1220
    {
×
1221
        // WebSocket class has already logged a message for this error
1222
        close_due_to_error(ec); // Throws
×
1223
    }
×
1224

1225
    void websocket_protocol_error_handler(std::error_code ec) final override
1226
    {
×
1227
        logger.error("WebSocket protocol error (%1): %2", ec, ec.message()); // Throws
×
1228
        close_due_to_error(ec);                                              // Throws
×
1229
    }
×
1230

1231
    void websocket_handshake_completion_handler(const HTTPHeaders&) final override
1232
    {
×
1233
        // This is not called since we handle HTTP request in handle_request_for_sync()
1234
        REALM_TERMINATE("websocket_handshake_completion_handler should not have been called");
1235
    }
×
1236

1237
    int_fast64_t get_id() const noexcept
1238
    {
×
1239
        return m_id;
×
1240
    }
×
1241

1242
    network::Socket& get_socket() noexcept
1243
    {
×
1244
        return *m_socket;
×
1245
    }
×
1246

1247
    void initiate();
1248

1249
    // Commits suicide
1250
    template <class... Params>
1251
    void terminate(Logger::Level, const char* log_message, Params... log_params);
1252

1253
    // Commits suicide
1254
    void terminate_if_dead(SteadyTimePoint now);
1255

1256
    void enlist_to_send(Session*) noexcept;
1257

1258
    // Sessions should get the output_buffer and insert a message, after which
1259
    // they call initiate_write_output_buffer().
1260
    OutputBuffer& get_output_buffer()
1261
    {
63,176✔
1262
        m_output_buffer.reset();
63,176✔
1263
        return m_output_buffer;
63,176✔
1264
    }
63,176✔
1265

1266
    // More advanced memory strategies can be implemented if needed.
1267
    void release_output_buffer() {}
62,988✔
1268

1269
    // When this function is called, the connection will initiate a write with
1270
    // its output_buffer. Sessions use this method.
1271
    void initiate_write_output_buffer();
1272

1273
    void initiate_pong_output_buffer();
1274

1275
    void handle_protocol_error(Status status);
1276

1277
    void receive_bind_message(session_ident_type, std::string path, std::string signed_user_token,
1278
                              bool need_client_file_ident, bool is_subserver);
1279

1280
    void receive_ident_message(session_ident_type, file_ident_type client_file_ident,
1281
                               salt_type client_file_ident_salt, version_type scan_server_version,
1282
                               version_type scan_client_version, version_type latest_server_version,
1283
                               salt_type latest_server_version_salt);
1284

1285
    void receive_upload_message(session_ident_type, version_type progress_client_version,
1286
                                version_type progress_server_version, version_type locked_server_version,
1287
                                const UploadChangesets&);
1288

1289
    void receive_mark_message(session_ident_type, request_ident_type);
1290

1291
    void receive_unbind_message(session_ident_type);
1292

1293
    void receive_ping(milliseconds_type timestamp, milliseconds_type rtt);
1294

1295
    void receive_error_message(session_ident_type, int error_code, std::string_view error_body);
1296

1297
    void protocol_error(ProtocolError, Session* = nullptr);
1298

1299
    void initiate_soft_close();
1300

1301
    void discard_session(session_ident_type) noexcept;
1302

1303
    void send_log_message(util::Logger::Level level, const std::string&& message, session_ident_type sess_ident = 0,
1304
                          std::optional<std::string> co_id = std::nullopt);
1305

1306
private:
1307
    ServerImpl& m_server;
1308
    const int_fast64_t m_id;
1309
    std::unique_ptr<network::Socket> m_socket;
1310
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1311
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1312

1313
    websocket::Socket m_websocket;
1314
    std::unique_ptr<char[]> m_input_body_buffer;
1315
    OutputBuffer m_output_buffer;
1316
    std::map<session_ident_type, std::unique_ptr<Session>> m_sessions;
1317

1318
    // The protocol version in use by the connected client.
1319
    const int m_client_protocol_version;
1320

1321
    // The user agent description passed by the client.
1322
    const std::string m_client_user_agent;
1323

1324
    const std::string m_remote_endpoint;
1325

1326
    const std::string m_appservices_request_id;
1327

1328
    // A queue of sessions that have enlisted for an opportunity to send a
1329
    // message. Sessions will be served in the order that they enlist. A session
1330
    // can only occur once in this queue (linked list). If the queue is not
1331
    // empty, and no message is currently being written to the socket, the first
1332
    // session is taken out of the queue, and then granted an opportunity to
1333
    // send a message.
1334
    //
1335
    // Sessions will never be destroyed while in this queue. This is ensured
1336
    // because the connection owns the sessions that are associated with it, and
1337
    // the connection only removes a session from m_sessions at points in time
1338
    // where that session is guaranteed to not be in m_sessions_enlisted_to_send
1339
    // (Connection::send_next_message() and Connection::~Connection()).
1340
    SessionQueue m_sessions_enlisted_to_send;
1341

1342
    Session* m_receiving_session = nullptr;
1343

1344
    bool m_is_sending = false;
1345
    bool m_is_closing = false;
1346

1347
    bool m_send_pong = false;
1348
    bool m_sending_pong = false;
1349

1350
    std::unique_ptr<Trigger<network::Service>> m_send_trigger;
1351

1352
    milliseconds_type m_last_ping_timestamp = 0;
1353

1354
    // If `m_is_closing` is true, this is the time at which `m_is_closing` was
1355
    // set to true (initiation of soft close). Otherwise, if no messages have
1356
    // been received from the client, this is the time at which the connection
1357
    // object was initiated (completion of WebSocket handshake). Otherwise this
1358
    // is the time at which the last message was received from the client.
1359
    SteadyTimePoint m_last_activity_at;
1360

1361
    // These are initialized by do_initiate_soft_close().
1362
    //
1363
    // With recent versions of the protocol (when the version is greater than,
1364
    // or equal to 23), `m_error_session_ident` is always zero.
1365
    ProtocolError m_error_code = {};
1366
    session_ident_type m_error_session_ident = 0;
1367

1368
    struct LogMessage {
1369
        session_ident_type sess_ident;
1370
        util::Logger::Level level;
1371
        std::string message;
1372
        std::optional<std::string> co_id;
1373
    };
1374

1375
    std::mutex m_log_mutex;
1376
    std::queue<LogMessage> m_log_messages;
1377

1378
    static std::string make_logger_prefix(int_fast64_t id)
1379
    {
1,928✔
1380
        std::ostringstream out;
1,928✔
1381
        out.imbue(std::locale::classic());
1,928✔
1382
        out << "Sync Connection[" << id << "]: "; // Throws
1,928✔
1383
        return out.str();                         // Throws
1,928✔
1384
    }
1,928✔
1385

1386
    // The return value of handle_message_received() designates whether
1387
    // message processing should continue. If the connection object is
1388
    // destroyed during execution of handle_message_received(), the return
1389
    // value must be false.
1390
    void handle_message_received(const char* data, size_t size);
1391

1392
    void handle_ping_received(const char* data, size_t size);
1393

1394
    void send_next_message();
1395
    void send_pong(milliseconds_type timestamp);
1396
    void send_log_message(const LogMessage& log_msg);
1397

1398
    void handle_write_output_buffer();
1399
    void handle_pong_output_buffer();
1400

1401
    void initiate_write_error(ProtocolError, session_ident_type);
1402
    void handle_write_error(std::error_code ec);
1403

1404
    void do_initiate_soft_close(ProtocolError, session_ident_type);
1405
    void read_error(std::error_code);
1406
    void write_error(std::error_code);
1407

1408
    void close_due_to_close_by_client(std::error_code);
1409
    void close_due_to_error(std::error_code);
1410

1411
    void terminate_sessions();
1412

1413
    void bad_session_ident(const char* message_type, session_ident_type);
1414
    void message_after_unbind(const char* message_type, session_ident_type);
1415
    void message_before_ident(const char* message_type, session_ident_type);
1416
};
1417

1418

1419
inline void SyncConnection::read_error(std::error_code ec)
1420
{
1,122✔
1421
    REALM_ASSERT(ec != util::error::operation_aborted);
1,122✔
1422
    if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
1,122✔
1423
        // Suicide
360✔
1424
        close_due_to_close_by_client(ec); // Throws
692✔
1425
        return;
692✔
1426
    }
692✔
1427
    if (ec == util::MiscExtErrors::delim_not_found) {
430✔
1428
        logger.error("Input message head delimited not found"); // Throws
×
1429
        protocol_error(ProtocolError::limits_exceeded);         // Throws
×
1430
        return;
×
1431
    }
×
1432

228✔
1433
    logger.error("Reading failed: %1", ec.message()); // Throws
430✔
1434

228✔
1435
    // Suicide
228✔
1436
    close_due_to_error(ec); // Throws
430✔
1437
}
430✔
1438

1439
inline void SyncConnection::write_error(std::error_code ec)
1440
{
×
1441
    REALM_ASSERT(ec != util::error::operation_aborted);
×
1442
    if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1443
        // Suicide
1444
        close_due_to_close_by_client(ec); // Throws
×
1445
        return;
×
1446
    }
×
1447
    logger.error("Writing failed: %1", ec.message()); // Throws
×
1448

1449
    // Suicide
1450
    close_due_to_error(ec); // Throws
×
1451
}
×
1452

1453

1454
// ============================ HTTPConnection ============================
1455

1456
std::string g_user_agent = "User-Agent";
1457

1458
class HTTPConnection {
1459
public:
1460
    const std::shared_ptr<Logger> logger_ptr;
1461
    util::Logger& logger;
1462

1463
    HTTPConnection(ServerImpl& serv, int_fast64_t id, bool is_ssl)
1464
        : logger_ptr{std::make_shared<PrefixLogger>(make_logger_prefix(id), serv.logger_ptr)} // Throws
1465
        , logger{*logger_ptr}
1466
        , m_server{serv}
1467
        , m_id{id}
1468
        , m_socket{new network::Socket{serv.get_service()}} // Throws
1469
        , m_read_ahead_buffer{new network::ReadAheadBuffer} // Throws
1470
        , m_http_server{*this, logger_ptr}
1471
    {
9,926✔
1472
        // Make the output buffer stream throw std::bad_alloc if it fails to
4,888✔
1473
        // expand the buffer
4,888✔
1474
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
9,926✔
1475

4,888✔
1476
        if (is_ssl) {
9,926✔
1477
            using namespace network::ssl;
48✔
1478
            Context& ssl_context = serv.get_ssl_context();
48✔
1479
            m_ssl_stream = std::make_unique<Stream>(*m_socket, ssl_context,
48✔
1480
                                                    Stream::server); // Throws
48✔
1481
        }
48✔
1482
    }
9,926✔
1483

1484
    ServerImpl& get_server() noexcept
1485
    {
×
1486
        return m_server;
×
1487
    }
×
1488

1489
    int_fast64_t get_id() const noexcept
1490
    {
1,958✔
1491
        return m_id;
1,958✔
1492
    }
1,958✔
1493

1494
    network::Socket& get_socket() noexcept
1495
    {
11,568✔
1496
        return *m_socket;
11,568✔
1497
    }
11,568✔
1498

1499
    template <class H>
1500
    void async_write(const char* data, size_t size, H handler)
1501
    {
1,948✔
1502
        if (m_ssl_stream) {
1,948✔
1503
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
14✔
1504
        }
14✔
1505
        else {
1,934✔
1506
            m_socket->async_write(data, size, std::move(handler)); // Throws
1,934✔
1507
        }
1,934✔
1508
    }
1,948✔
1509

1510
    template <class H>
1511
    void async_read(char* buffer, size_t size, H handler)
1512
    {
8✔
1513
        if (m_ssl_stream) {
8✔
1514
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer,
×
1515
                                     std::move(handler)); // Throws
×
1516
        }
×
1517
        else {
8✔
1518
            m_socket->async_read(buffer, size, *m_read_ahead_buffer,
8✔
1519
                                 std::move(handler)); // Throws
8✔
1520
        }
8✔
1521
    }
8✔
1522

1523
    template <class H>
1524
    void async_read_until(char* buffer, size_t size, char delim, H handler)
1525
    {
17,412✔
1526
        if (m_ssl_stream) {
17,412✔
1527
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
126✔
1528
                                           std::move(handler)); // Throws
126✔
1529
        }
126✔
1530
        else {
17,286✔
1531
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
17,286✔
1532
                                       std::move(handler)); // Throws
17,286✔
1533
        }
17,286✔
1534
    }
17,412✔
1535

1536
    void initiate(std::string remote_endpoint)
1537
    {
1,958✔
1538
        m_last_activity_at = steady_clock_now();
1,958✔
1539
        m_remote_endpoint = std::move(remote_endpoint);
1,958✔
1540

956✔
1541
        logger.detail("Connection from %1", m_remote_endpoint); // Throws
1,958✔
1542

956✔
1543
        if (m_ssl_stream) {
1,958✔
1544
            initiate_ssl_handshake(); // Throws
24✔
1545
        }
24✔
1546
        else {
1,934✔
1547
            initiate_http(); // Throws
1,934✔
1548
        }
1,934✔
1549
    }
1,958✔
1550

1551
    void respond_200_ok()
1552
    {
×
1553
        handle_text_response(HTTPStatus::Ok, "OK"); // Throws
×
1554
    }
×
1555

1556
    void respond_404_not_found()
1557
    {
×
1558
        handle_text_response(HTTPStatus::NotFound, "Not found"); // Throws
×
1559
    }
×
1560

1561
    void respond_503_service_unavailable()
1562
    {
×
1563
        handle_text_response(HTTPStatus::ServiceUnavailable, "Service unavailable"); // Throws
×
1564
    }
×
1565

1566
    // Commits suicide
1567
    template <class... Params>
1568
    void terminate(Logger::Level log_level, const char* log_message, Params... log_params)
1569
    {
30✔
1570
        logger.log(log_level, log_message, log_params...); // Throws
30✔
1571
        m_ssl_stream.reset();
30✔
1572
        m_socket.reset();
30✔
1573
        m_server.remove_http_connection(m_id); // Suicide
30✔
1574
    }
30✔
1575

1576
    // Commits suicide
1577
    void terminate_if_dead(SteadyTimePoint now)
1578
    {
2✔
1579
        milliseconds_type time = steady_duration(m_last_activity_at, now);
2✔
1580
        const Server::Config& config = m_server.get_config();
2✔
1581
        if (m_is_sending) {
2✔
UNCOV
1582
            if (time >= config.http_response_timeout) {
×
1583
                // Suicide
1584
                terminate(Logger::Level::detail,
×
1585
                          "HTTP connection closed (request timeout)"); // Throws
×
1586
            }
×
UNCOV
1587
        }
×
1588
        else {
2✔
1589
            if (time >= config.http_request_timeout) {
2✔
1590
                // Suicide
1591
                terminate(Logger::Level::detail,
×
1592
                          "HTTP connection closed (response timeout)"); // Throws
×
1593
            }
×
1594
        }
2✔
1595
    }
2✔
1596

1597
    std::string get_appservices_request_id() const
1598
    {
1,948✔
1599
        return m_appservices_request_id.to_string();
1,948✔
1600
    }
1,948✔
1601

1602
private:
1603
    ServerImpl& m_server;
1604
    const int_fast64_t m_id;
1605
    const ObjectId m_appservices_request_id = ObjectId::gen();
1606
    std::unique_ptr<network::Socket> m_socket;
1607
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1608
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1609
    HTTPServer<HTTPConnection> m_http_server;
1610
    OutputBuffer m_output_buffer;
1611
    bool m_is_sending = false;
1612
    SteadyTimePoint m_last_activity_at;
1613
    std::string m_remote_endpoint;
1614
    int m_negotiated_protocol_version = 0;
1615

1616
    void initiate_ssl_handshake()
1617
    {
24✔
1618
        auto handler = [this](std::error_code ec) {
24✔
1619
            if (ec != util::error::operation_aborted)
24✔
1620
                handle_ssl_handshake(ec); // Throws
24✔
1621
        };
24✔
1622
        m_ssl_stream->async_handshake(std::move(handler)); // Throws
24✔
1623
    }
24✔
1624

1625
    void handle_ssl_handshake(std::error_code ec)
1626
    {
24✔
1627
        if (ec) {
24✔
1628
            logger.error("SSL handshake error (%1): %2", ec, ec.message()); // Throws
10✔
1629
            close_due_to_error(ec);                                         // Throws
10✔
1630
            return;
10✔
1631
        }
10✔
1632
        initiate_http(); // Throws
14✔
1633
    }
14✔
1634

1635
    void initiate_http()
1636
    {
1,948✔
1637
        logger.debug("Connection initiates HTTP receipt");
1,948✔
1638

950✔
1639
        auto handler = [this](HTTPRequest request, std::error_code ec) {
1,948✔
1640
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
1,948✔
1641
                return;
950✔
1642
            if (REALM_UNLIKELY(ec == HTTPParserError::MalformedRequest)) {
1,948✔
1643
                logger.error("Malformed HTTP request");
×
1644
                close_due_to_error(ec); // Throws
×
1645
                return;
×
1646
            }
×
1647
            if (REALM_UNLIKELY(ec == HTTPParserError::BadRequest)) {
1,948✔
1648
                logger.error("Bad HTTP request");
8✔
1649
                const char* body = "The HTTP request was corrupted";
8✔
1650
                handle_400_bad_request(body); // Throws
8✔
1651
                return;
8✔
1652
            }
8✔
1653
            if (REALM_UNLIKELY(ec)) {
1,940✔
1654
                read_error(ec); // Throws
×
1655
                return;
×
1656
            }
×
1657
            handle_http_request(std::move(request)); // Throws
1,940✔
1658
        };
1,940✔
1659
        m_http_server.async_receive_request(std::move(handler)); // Throws
1,948✔
1660
    }
1,948✔
1661

1662
    void handle_http_request(const HTTPRequest& request)
1663
    {
1,940✔
1664
        StringData path = request.path;
1,940✔
1665

946✔
1666
        logger.debug("HTTP request received, request = %1", request);
1,940✔
1667

946✔
1668
        m_is_sending = true;
1,940✔
1669
        m_last_activity_at = steady_clock_now();
1,940✔
1670

946✔
1671
        // FIXME: When thinking of this function as a switching device, it seem
946✔
1672
        // wrong that it requires a `%2F` after `/realm-sync/`. If `%2F` is
946✔
1673
        // supposed to be mandatory, then that check ought to be delegated to
946✔
1674
        // handle_request_for_sync(), as that will yield a sharper separation of
946✔
1675
        // concerns.
946✔
1676
        if (path == "/realm-sync" || path.begins_with("/realm-sync?") || path.begins_with("/realm-sync/%2F")) {
1,940✔
1677
            handle_request_for_sync(request); // Throws
1,928✔
1678
        }
1,928✔
1679
        else {
12✔
1680
            handle_404_not_found(request); // Throws
12✔
1681
        }
12✔
1682
    }
1,940✔
1683

1684
    void handle_request_for_sync(const HTTPRequest& request)
1685
    {
1,928✔
1686
        if (m_server.is_sync_stopped()) {
1,928✔
1687
            logger.debug("Attempt to create a sync connection to a server that has been "
×
1688
                         "stopped"); // Throws
×
1689
            handle_503_service_unavailable(request, "The server does not accept sync "
×
1690
                                                    "connections"); // Throws
×
1691
            return;
×
1692
        }
×
1693

940✔
1694
        util::Optional<std::string> sec_websocket_protocol = websocket::read_sec_websocket_protocol(request);
1,928✔
1695

940✔
1696
        // Figure out whether there are any protocol versions supported by both
940✔
1697
        // the client and the server, and if so, choose the newest one of them.
940✔
1698
        MiscBuffers& misc_buffers = m_server.get_misc_buffers();
1,928✔
1699
        using ProtocolVersionRanges = MiscBuffers::ProtocolVersionRanges;
1,928✔
1700
        ProtocolVersionRanges& protocol_version_ranges = misc_buffers.protocol_version_ranges;
1,928✔
1701
        {
1,928✔
1702
            protocol_version_ranges.clear();
1,928✔
1703
            util::MemoryInputStream in;
1,928✔
1704
            in.imbue(std::locale::classic());
1,928✔
1705
            in.unsetf(std::ios_base::skipws);
1,928✔
1706
            std::string_view value;
1,928✔
1707
            if (sec_websocket_protocol)
1,928✔
1708
                value = *sec_websocket_protocol;
1,928✔
1709
            HttpListHeaderValueParser parser{value};
1,928✔
1710
            std::string_view elem;
1,928✔
1711
            while (parser.next(elem)) {
19,280✔
1712
                // FIXME: Use std::string_view::begins_with() in C++20.
8,460✔
1713
                const StringData protocol{elem};
17,352✔
1714
                std::string_view prefix;
17,352✔
1715
                if (protocol.begins_with(get_pbs_websocket_protocol_prefix()))
17,352✔
1716
                    prefix = get_pbs_websocket_protocol_prefix();
17,352✔
1717
                else if (protocol.begins_with(get_old_pbs_websocket_protocol_prefix()))
×
1718
                    prefix = get_old_pbs_websocket_protocol_prefix();
×
1719
                if (!prefix.empty()) {
17,352✔
1720
                    auto parse_version = [&](std::string_view str) {
17,352✔
1721
                        in.set_buffer(str.data(), str.data() + str.size());
17,352✔
1722
                        int version = 0;
17,352✔
1723
                        in >> version;
17,352✔
1724
                        if (REALM_LIKELY(in && in.eof() && version >= 0))
17,352✔
1725
                            return version;
17,352✔
1726
                        return -1;
×
1727
                    };
×
1728
                    int min, max;
17,352✔
1729
                    std::string_view range = elem.substr(prefix.size());
17,352✔
1730
                    auto i = range.find('-');
17,352✔
1731
                    if (i != std::string_view::npos) {
17,352✔
1732
                        min = parse_version(range.substr(0, i));
×
1733
                        max = parse_version(range.substr(i + 1));
×
1734
                    }
×
1735
                    else {
17,352✔
1736
                        min = parse_version(range);
17,352✔
1737
                        max = min;
17,352✔
1738
                    }
17,352✔
1739
                    if (REALM_LIKELY(min >= 0 && max >= 0 && min <= max)) {
17,352✔
1740
                        protocol_version_ranges.emplace_back(min, max); // Throws
17,352✔
1741
                        continue;
17,352✔
1742
                    }
17,352✔
1743
                    logger.error("Protocol version negotiation failed: Client sent malformed "
×
1744
                                 "specification of supported protocol versions: '%1'",
×
1745
                                 elem); // Throws
×
1746
                    handle_400_bad_request("Protocol version negotiation failed: Malformed "
×
1747
                                           "specification of supported protocol "
×
1748
                                           "versions\n"); // Throws
×
1749
                    return;
×
1750
                }
×
1751
                logger.warn("Unrecognized protocol token in HTTP response header "
×
1752
                            "Sec-WebSocket-Protocol: '%1'",
×
1753
                            elem); // Throws
×
1754
            }
×
1755
            if (protocol_version_ranges.empty()) {
1,928✔
1756
                logger.error("Protocol version negotiation failed: Client did not send a "
×
1757
                             "specification of supported protocol versions"); // Throws
×
1758
                handle_400_bad_request("Protocol version negotiation failed: Missing specification "
×
1759
                                       "of supported protocol versions\n"); // Throws
×
1760
                return;
×
1761
            }
×
1762
        }
1,928✔
1763
        {
1,928✔
1764
            ProtocolVersionRange server_range = m_server.get_protocol_version_range();
1,928✔
1765
            int server_min = server_range.first;
1,928✔
1766
            int server_max = server_range.second;
1,928✔
1767
            int best_match = 0;
1,928✔
1768
            int overall_client_min = std::numeric_limits<int>::max();
1,928✔
1769
            int overall_client_max = std::numeric_limits<int>::min();
1,928✔
1770
            for (const auto& range : protocol_version_ranges) {
17,352✔
1771
                int client_min = range.first;
17,352✔
1772
                int client_max = range.second;
17,352✔
1773
                if (client_max >= server_min && client_min <= server_max) {
17,352✔
1774
                    // Overlap
8,460✔
1775
                    int version = std::min(client_max, server_max);
17,352✔
1776
                    if (version > best_match) {
17,352✔
1777
                        best_match = version;
1,928✔
1778
                    }
1,928✔
1779
                }
17,352✔
1780
                if (client_min < overall_client_min)
17,352✔
1781
                    overall_client_min = client_min;
17,352✔
1782
                if (client_max > overall_client_max)
17,352✔
1783
                    overall_client_max = client_max;
1,928✔
1784
            }
17,352✔
1785
            Formatter& formatter = misc_buffers.formatter;
1,928✔
1786
            if (REALM_UNLIKELY(best_match == 0)) {
1,928✔
1787
                const char* elaboration = "No version supported by both client and server";
×
1788
                const char* identifier_hint = nullptr;
×
1789
                if (overall_client_max < server_min) {
×
1790
                    // Client is too old
1791
                    elaboration = "Client is too old for server";
×
1792
                    identifier_hint = "CLIENT_TOO_OLD";
×
1793
                }
×
1794
                else if (overall_client_min > server_max) {
×
1795
                    // Client is too new
1796
                    elaboration = "Client is too new for server";
×
1797
                    identifier_hint = "CLIENT_TOO_NEW";
×
1798
                }
×
1799
                auto format_ranges = [&](const auto& list) {
×
1800
                    bool nonfirst = false;
×
1801
                    for (auto range : list) {
×
1802
                        if (nonfirst)
×
1803
                            formatter << ", "; // Throws
×
1804
                        int min = range.first, max = range.second;
×
1805
                        REALM_ASSERT(min <= max);
×
1806
                        formatter << min;
×
1807
                        if (max != min)
×
1808
                            formatter << "-" << max;
×
1809
                        nonfirst = true;
×
1810
                    }
×
1811
                };
×
1812
                using Range = ProtocolVersionRange;
×
1813
                formatter.reset();
×
1814
                format_ranges(protocol_version_ranges); // Throws
×
1815
                logger.error("Protocol version negotiation failed: %1 "
×
1816
                             "(client supports: %2)",
×
1817
                             elaboration, std::string_view(formatter.data(), formatter.size())); // Throws
×
1818
                formatter.reset();
×
1819
                formatter << "Protocol version negotiation failed: "
×
1820
                             ""
×
1821
                          << elaboration << ".\n\n";                                   // Throws
×
1822
                formatter << "Server supports: ";                                      // Throws
×
1823
                format_ranges(std::initializer_list<Range>{{server_min, server_max}}); // Throws
×
1824
                formatter << "\n";                                                     // Throws
×
1825
                formatter << "Client supports: ";                                      // Throws
×
1826
                format_ranges(protocol_version_ranges);                                // Throws
×
1827
                formatter << "\n\n";                                                   // Throws
×
1828
                formatter << "REALM_SYNC_PROTOCOL_MISMATCH";                           // Throws
×
1829
                if (identifier_hint)
×
1830
                    formatter << ":" << identifier_hint;                      // Throws
×
1831
                formatter << "\n";                                            // Throws
×
1832
                handle_400_bad_request({formatter.data(), formatter.size()}); // Throws
×
1833
                return;
×
1834
            }
×
1835
            m_negotiated_protocol_version = best_match;
1,928✔
1836
            logger.debug("Received: Sync HTTP request (negotiated_protocol_version=%1)",
1,928✔
1837
                         m_negotiated_protocol_version); // Throws
1,928✔
1838
            formatter.reset();
1,928✔
1839
        }
1,928✔
1840

940✔
1841
        std::string sec_websocket_protocol_2;
1,928✔
1842
        {
1,928✔
1843
            std::string_view prefix =
1,928✔
1844
                m_negotiated_protocol_version < SyncConnection::PBS_FLX_MIGRATION_PROTOCOL_VERSION
1,928✔
1845
                    ? get_old_pbs_websocket_protocol_prefix()
940✔
1846
                    : get_pbs_websocket_protocol_prefix();
1,928✔
1847
            std::ostringstream out;
1,928✔
1848
            out.imbue(std::locale::classic());
1,928✔
1849
            out << prefix << m_negotiated_protocol_version; // Throws
1,928✔
1850
            sec_websocket_protocol_2 = std::move(out).str();
1,928✔
1851
        }
1,928✔
1852

940✔
1853
        std::error_code ec;
1,928✔
1854
        util::Optional<HTTPResponse> response =
1,928✔
1855
            websocket::make_http_response(request, sec_websocket_protocol_2, ec); // Throws
1,928✔
1856

940✔
1857
        if (ec) {
1,928✔
1858
            if (ec == websocket::HttpError::bad_request_header_upgrade) {
×
1859
                logger.error("There must be a header of the form 'Upgrade: websocket'");
×
1860
            }
×
1861
            else if (ec == websocket::HttpError::bad_request_header_connection) {
×
1862
                logger.error("There must be a header of the form 'Connection: Upgrade'");
×
1863
            }
×
1864
            else if (ec == websocket::HttpError::bad_request_header_websocket_version) {
×
1865
                logger.error("There must be a header of the form 'Sec-WebSocket-Version: 13'");
×
1866
            }
×
1867
            else if (ec == websocket::HttpError::bad_request_header_websocket_key) {
×
1868
                logger.error("The header Sec-WebSocket-Key is missing");
×
1869
            }
×
1870

1871
            logger.error("The HTTP request with the error is:\n%1", request);
×
1872
            logger.error("Check the proxy configuration and make sure that the "
×
1873
                         "HTTP request is a valid Websocket request.");
×
1874
            close_due_to_error(ec);
×
1875
            return;
×
1876
        }
×
1877
        REALM_ASSERT(response);
1,928✔
1878
        add_common_http_response_headers(*response);
1,928✔
1879

940✔
1880
        std::string user_agent;
1,928✔
1881
        {
1,928✔
1882
            auto i = request.headers.find(g_user_agent);
1,928✔
1883
            if (i != request.headers.end())
1,928✔
1884
                user_agent = i->second; // Throws (copy)
1,928✔
1885
        }
1,928✔
1886

940✔
1887
        auto handler = [protocol_version = m_negotiated_protocol_version, user_agent = std::move(user_agent),
1,928✔
1888
                        this](std::error_code ec) {
1,928✔
1889
            // If the operation is aborted, the socket object may have been destroyed.
940✔
1890
            if (ec != util::error::operation_aborted) {
1,928✔
1891
                if (ec) {
1,928✔
1892
                    write_error(ec);
×
1893
                    return;
×
1894
                }
×
1895

940✔
1896
                std::unique_ptr<SyncConnection> sync_conn = std::make_unique<SyncConnection>(
1,928✔
1897
                    m_server, m_id, std::move(m_socket), std::move(m_ssl_stream), std::move(m_read_ahead_buffer),
1,928✔
1898
                    protocol_version, std::move(user_agent), std::move(m_remote_endpoint),
1,928✔
1899
                    get_appservices_request_id()); // Throws
1,928✔
1900
                SyncConnection& sync_conn_ref = *sync_conn;
1,928✔
1901
                m_server.add_sync_connection(m_id, std::move(sync_conn));
1,928✔
1902
                m_server.remove_http_connection(m_id);
1,928✔
1903
                sync_conn_ref.initiate();
1,928✔
1904
            }
1,928✔
1905
        };
1,928✔
1906
        m_http_server.async_send_response(*response, std::move(handler));
1,928✔
1907
    }
1,928✔
1908

1909
    void handle_text_response(HTTPStatus http_status, std::string_view body)
1910
    {
20✔
1911
        std::string body_2 = std::string(body); // Throws
20✔
1912

10✔
1913
        HTTPResponse response;
20✔
1914
        response.status = http_status;
20✔
1915
        add_common_http_response_headers(response);
20✔
1916
        response.headers["Connection"] = "close";
20✔
1917

10✔
1918
        if (!body_2.empty()) {
20✔
1919
            response.headers["Content-Length"] = util::to_string(body_2.size());
20✔
1920
            response.body = std::move(body_2);
20✔
1921
        }
20✔
1922

10✔
1923
        auto handler = [this](std::error_code ec) {
20✔
1924
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
20✔
1925
                return;
10✔
1926
            if (REALM_UNLIKELY(ec)) {
20✔
1927
                write_error(ec);
×
1928
                return;
×
1929
            }
×
1930
            terminate(Logger::Level::detail, "HTTP connection closed"); // Throws
20✔
1931
        };
20✔
1932
        m_http_server.async_send_response(response, std::move(handler));
20✔
1933
    }
20✔
1934

1935
    void handle_400_bad_request(std::string_view body)
1936
    {
8✔
1937
        logger.detail("400 Bad Request");
8✔
1938
        handle_text_response(HTTPStatus::BadRequest, body); // Throws
8✔
1939
    }
8✔
1940

1941
    void handle_404_not_found(const HTTPRequest&)
1942
    {
12✔
1943
        logger.detail("404 Not Found"); // Throws
12✔
1944
        handle_text_response(HTTPStatus::NotFound,
12✔
1945
                             "Realm sync server\n\nPage not found\n"); // Throws
12✔
1946
    }
12✔
1947

1948
    void handle_503_service_unavailable(const HTTPRequest&, std::string_view message)
1949
    {
×
1950
        logger.debug("503 Service Unavailable");                       // Throws
×
1951
        handle_text_response(HTTPStatus::ServiceUnavailable, message); // Throws
×
1952
    }
×
1953

1954
    void add_common_http_response_headers(HTTPResponse& response)
1955
    {
1,948✔
1956
        response.headers["Server"] = "RealmSync/" REALM_VERSION_STRING; // Throws
1,948✔
1957
        if (m_negotiated_protocol_version < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
1,948✔
1958
            // This isn't a real X-Appservices-Request-Id, but it should be enough to test with
10✔
1959
            response.headers["X-Appservices-Request-Id"] = get_appservices_request_id();
20✔
1960
        }
20✔
1961
    }
1,948✔
1962

1963
    void read_error(std::error_code ec)
1964
    {
×
1965
        REALM_ASSERT(ec != util::error::operation_aborted);
×
1966
        if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
×
1967
            // Suicide
1968
            close_due_to_close_by_client(ec); // Throws
×
1969
            return;
×
1970
        }
×
1971
        if (ec == util::MiscExtErrors::delim_not_found) {
×
1972
            logger.error("Input message head delimited not found"); // Throws
×
1973
            close_due_to_error(ec);                                 // Throws
×
1974
            return;
×
1975
        }
×
1976

1977
        logger.error("Reading failed: %1", ec.message()); // Throws
×
1978

1979
        // Suicide
1980
        close_due_to_error(ec); // Throws
×
1981
    }
×
1982

1983
    void write_error(std::error_code ec)
1984
    {
×
1985
        REALM_ASSERT(ec != util::error::operation_aborted);
×
1986
        if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1987
            // Suicide
1988
            close_due_to_close_by_client(ec); // Throws
×
1989
            return;
×
1990
        }
×
1991
        logger.error("Writing failed: %1", ec.message()); // Throws
×
1992

1993
        // Suicide
1994
        close_due_to_error(ec); // Throws
×
1995
    }
×
1996

1997
    void close_due_to_close_by_client(std::error_code ec)
1998
    {
×
1999
        auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
×
2000
        // Suicide
2001
        terminate(log_level, "HTTP connection closed by client: %1", ec.message()); // Throws
×
2002
    }
×
2003

2004
    void close_due_to_error(std::error_code ec)
2005
    {
10✔
2006
        // Suicide
6✔
2007
        terminate(Logger::Level::error, "HTTP connection closed due to error: %1",
10✔
2008
                  ec.message()); // Throws
10✔
2009
    }
10✔
2010

2011
    static std::string make_logger_prefix(int_fast64_t id)
2012
    {
9,924✔
2013
        std::ostringstream out;
9,924✔
2014
        out.imbue(std::locale::classic());
9,924✔
2015
        out << "HTTP Connection[" << id << "]: "; // Throws
9,924✔
2016
        return out.str();                         // Throws
9,924✔
2017
    }
9,924✔
2018
};
2019

2020

2021
class DownloadHistoryEntryHandler : public ServerHistory::HistoryEntryHandler {
2022
public:
2023
    std::size_t num_changesets = 0;
2024
    std::size_t accum_original_size = 0;
2025
    std::size_t accum_compacted_size = 0;
2026

2027
    DownloadHistoryEntryHandler(ServerProtocol& protocol, OutputBuffer& buffer, util::Logger& logger) noexcept
2028
        : m_protocol{protocol}
2029
        , m_buffer{buffer}
2030
        , m_logger{logger}
2031
    {
40,092✔
2032
    }
40,092✔
2033

2034
    void handle(version_type server_version, const HistoryEntry& entry, size_t original_size) override
2035
    {
42,170✔
2036
        version_type client_version = entry.remote_version;
42,170✔
2037
        ServerProtocol::ChangesetInfo info{server_version, client_version, entry, original_size};
42,170✔
2038
        m_protocol.insert_single_changeset_download_message(m_buffer, info, m_logger); // Throws
42,170✔
2039
        ++num_changesets;
42,170✔
2040
        accum_original_size += original_size;
42,170✔
2041
        accum_compacted_size += entry.changeset.size();
42,170✔
2042
    }
42,170✔
2043

2044
private:
2045
    ServerProtocol& m_protocol;
2046
    OutputBuffer& m_buffer;
2047
    util::Logger& m_logger;
2048
};
2049

2050

2051
// ============================ Session ============================
2052

2053
//                        Need cli-   Send     IDENT     UNBIND              ERROR
2054
//   Protocol             ent file    IDENT    message   message   Error     message
2055
//   state                identifier  message  received  received  occurred  sent
2056
// ---------------------------------------------------------------------------------
2057
//   AllocatingIdent      yes         yes      no        no        no        no
2058
//   SendIdent            no          yes      no        no        no        no
2059
//   WaitForIdent         no          no       no        no        no        no
2060
//   WaitForUnbind        maybe       no       yes       no        no        no
2061
//   SendError            maybe       maybe    maybe     no        yes       no
2062
//   WaitForUnbindErr     maybe       maybe    maybe     no        yes       yes
2063
//   SendUnbound          maybe       maybe    maybe     yes       maybe     no
2064
//
2065
//
2066
//   Condition                      Expression
2067
// ----------------------------------------------------------
2068
//   Need client file identifier    need_client_file_ident()
2069
//   Send IDENT message             must_send_ident_message()
2070
//   IDENT message received         ident_message_received()
2071
//   UNBIND message received        unbind_message_received()
2072
//   Error occurred                 error_occurred()
2073
//   ERROR message sent             m_error_message_sent
2074
//
2075
//
2076
//   Protocol
2077
//   state                Will send              Can receive
2078
// -----------------------------------------------------------------------
2079
//   AllocatingIdent      none                   UNBIND
2080
//   SendIdent            IDENT                  UNBIND
2081
//   WaitForIdent         none                   IDENT, UNBIND
2082
//   WaitForUnbind        DOWNLOAD, TRANSACT,    UPLOAD, TRANSACT, MARK,
2083
//                        MARK, ALLOC            ALLOC, UNBIND
2084
//   SendError            ERROR                  any
2085
//   WaitForUnbindErr     none                   any
2086
//   SendUnbound          UNBOUND                none
2087
//
2088
class Session final : private FileIdentReceiver {
2089
public:
2090
    util::PrefixLogger logger;
2091

2092
    Session(SyncConnection& conn, session_ident_type session_ident)
2093
        : logger{make_logger_prefix(session_ident), conn.logger_ptr} // Throws
2094
        , m_connection{conn}
2095
        , m_session_ident{session_ident}
2096
    {
5,666✔
2097
    }
5,666✔
2098

2099
    ~Session() noexcept
2100
    {
5,666✔
2101
        REALM_ASSERT(!is_enlisted_to_send());
5,666✔
2102
        detach_from_server_file();
5,666✔
2103
    }
5,666✔
2104

2105
    SyncConnection& get_connection() noexcept
2106
    {
40,120✔
2107
        return m_connection;
40,120✔
2108
    }
40,120✔
2109

2110
    const Optional<std::array<char, 64>>& get_encryption_key()
2111
    {
×
2112
        return m_connection.get_server().get_config().encryption_key;
×
2113
    }
×
2114

2115
    session_ident_type get_session_ident() const noexcept
2116
    {
176✔
2117
        return m_session_ident;
176✔
2118
    }
176✔
2119

2120
    ServerProtocol& get_server_protocol() noexcept
2121
    {
56,830✔
2122
        return m_connection.get_server_protocol();
56,830✔
2123
    }
56,830✔
2124

2125
    bool need_client_file_ident() const noexcept
2126
    {
6,988✔
2127
        return (m_file_ident_request != 0);
6,988✔
2128
    }
6,988✔
2129

2130
    bool must_send_ident_message() const noexcept
2131
    {
4,332✔
2132
        return m_send_ident_message;
4,332✔
2133
    }
4,332✔
2134

2135
    bool ident_message_received() const noexcept
2136
    {
337,050✔
2137
        return m_client_file_ident != 0;
337,050✔
2138
    }
337,050✔
2139

2140
    bool unbind_message_received() const noexcept
2141
    {
340,634✔
2142
        return m_unbind_message_received;
340,634✔
2143
    }
340,634✔
2144

2145
    bool error_occurred() const noexcept
2146
    {
331,724✔
2147
        return int(m_error_code) != 0;
331,724✔
2148
    }
331,724✔
2149

2150
    bool relayed_alloc_request_in_progress() const noexcept
2151
    {
×
2152
        return (need_client_file_ident() || m_allocated_file_ident.ident != 0);
×
2153
    }
×
2154

2155
    // Returns the file identifier (always a nonzero value) of the client side
2156
    // file if ident_message_received() returns true. Otherwise it returns zero.
2157
    file_ident_type get_client_file_ident() const noexcept
2158
    {
×
2159
        return m_client_file_ident;
×
2160
    }
×
2161

2162
    void initiate()
2163
    {
5,666✔
2164
        logger.detail("Session initiated", m_session_ident); // Throws
5,666✔
2165
    }
5,666✔
2166

2167
    void terminate()
2168
    {
4,746✔
2169
        logger.detail("Session terminated", m_session_ident); // Throws
4,746✔
2170
    }
4,746✔
2171

2172
    // Initiate the deactivation process, if it has not been initiated already
2173
    // by the client.
2174
    //
2175
    // IMPORTANT: This function must not be called with protocol versions
2176
    // earlier than 23.
2177
    //
2178
    // The deactivation process will eventually lead to termination of the
2179
    // session.
2180
    //
2181
    // The session will detach itself from the server file when the deactivation
2182
    // process is initiated, regardless of whether it is initiated by the
2183
    // client, or by calling this function.
2184
    void initiate_deactivation(ProtocolError error_code)
2185
    {
88✔
2186
        REALM_ASSERT(is_session_level_error(error_code));
88✔
2187
        REALM_ASSERT(!error_occurred()); // Must only be called once
88✔
2188

44✔
2189
        // If the UNBIND message has been received, then the client has
44✔
2190
        // initiated the deactivation process already.
44✔
2191
        if (REALM_LIKELY(!unbind_message_received())) {
88✔
2192
            detach_from_server_file();
88✔
2193
            m_error_code = error_code;
88✔
2194
            // Protocol state is now SendError
44✔
2195
            ensure_enlisted_to_send();
88✔
2196
            return;
88✔
2197
        }
88✔
2198
        // Protocol state was SendUnbound, and remains unchanged
44✔
2199
    }
88✔
2200

2201
    bool is_enlisted_to_send() const noexcept
2202
    {
269,150✔
2203
        return m_next != nullptr;
269,150✔
2204
    }
269,150✔
2205

2206
    void ensure_enlisted_to_send() noexcept
2207
    {
52,620✔
2208
        if (!is_enlisted_to_send())
52,620✔
2209
            enlist_to_send();
50,964✔
2210
    }
52,620✔
2211

2212
    void enlist_to_send() noexcept
2213
    {
107,718✔
2214
        m_connection.enlist_to_send(this);
107,718✔
2215
    }
107,718✔
2216

2217
    // Overriding memeber function in FileIdentReceiver
2218
    void receive_file_ident(SaltedFileIdent file_ident) override final
2219
    {
1,328✔
2220
        // Protocol state must be AllocatingIdent or WaitForUnbind
562✔
2221
        if (!ident_message_received()) {
1,328✔
2222
            REALM_ASSERT(need_client_file_ident());
1,328✔
2223
            REALM_ASSERT(m_send_ident_message);
1,328✔
2224
        }
1,328✔
2225
        else {
×
2226
            REALM_ASSERT(!m_send_ident_message);
×
2227
        }
×
2228
        REALM_ASSERT(!unbind_message_received());
1,328✔
2229
        REALM_ASSERT(!error_occurred());
1,328✔
2230
        REALM_ASSERT(!m_error_message_sent);
1,328✔
2231

562✔
2232
        m_file_ident_request = 0;
1,328✔
2233
        m_allocated_file_ident = file_ident;
1,328✔
2234

562✔
2235
        // If the protocol state was AllocatingIdent, it is now SendIdent,
562✔
2236
        // otherwise it continues to be WaitForUnbind.
562✔
2237

562✔
2238
        logger.debug("Acquired outbound salted file identifier (%1, %2)", file_ident.ident,
1,328✔
2239
                     file_ident.salt); // Throws
1,328✔
2240

562✔
2241
        ensure_enlisted_to_send();
1,328✔
2242
    }
1,328✔
2243

2244
    // Called by the associated connection object when this session is granted
2245
    // an opportunity to initiate the sending of a message.
2246
    //
2247
    // This function may lead to the destruction of the session object
2248
    // (suicide).
2249
    void send_message()
2250
    {
107,534✔
2251
        if (REALM_LIKELY(!unbind_message_received())) {
107,534✔
2252
            if (REALM_LIKELY(!error_occurred())) {
104,576✔
2253
                if (REALM_LIKELY(ident_message_received())) {
104,486✔
2254
                    // State is WaitForUnbind.
54,948✔
2255
                    bool relayed_alloc = (m_allocated_file_ident.ident != 0);
103,164✔
2256
                    if (REALM_LIKELY(!relayed_alloc)) {
103,164✔
2257
                        // Send DOWNLOAD or MARK.
54,948✔
2258
                        continue_history_scan(); // Throws
103,164✔
2259
                        // Session object may have been
54,948✔
2260
                        // destroyed at this point (suicide)
54,948✔
2261
                        return;
103,164✔
2262
                    }
103,164✔
2263
                    send_alloc_message(); // Throws
×
2264
                    return;
×
2265
                }
×
2266
                // State is SendIdent
558✔
2267
                send_ident_message(); // Throws
1,322✔
2268
                return;
1,322✔
2269
            }
1,322✔
2270
            // State is SendError
46✔
2271
            send_error_message(); // Throws
90✔
2272
            return;
90✔
2273
        }
90✔
2274
        // State is SendUnbound
1,388✔
2275
        send_unbound_message(); // Throws
2,958✔
2276
        terminate();            // Throws
2,958✔
2277
        m_connection.discard_session(m_session_ident);
2,958✔
2278
        // This session is now destroyed!
1,388✔
2279
    }
2,958✔
2280

2281
    bool receive_bind_message(std::string path, std::string signed_user_token, bool need_client_file_ident,
2282
                              bool is_subserver, ProtocolError& error)
2283
    {
5,666✔
2284
        if (logger.would_log(util::Logger::Level::info)) {
5,666✔
2285
            logger.detail("Received: BIND(server_path=%1, signed_user_token='%2', "
546✔
2286
                          "need_client_file_ident=%3, is_subserver=%4)",
546✔
2287
                          path, short_token_fmt(signed_user_token), int(need_client_file_ident),
546✔
2288
                          int(is_subserver)); // Throws
546✔
2289
        }
546✔
2290

2,782✔
2291
        ServerImpl& server = m_connection.get_server();
5,666✔
2292
        _impl::VirtualPathComponents virt_path_components =
5,666✔
2293
            _impl::parse_virtual_path(server.get_root_dir(), path); // Throws
5,666✔
2294

2,782✔
2295
        if (!virt_path_components.is_valid) {
5,666✔
2296
            logger.error("Bad virtual path (message_type='bind', path='%1', "
28✔
2297
                         "signed_user_token='%2')",
28✔
2298
                         path,
28✔
2299
                         short_token_fmt(signed_user_token)); // Throws
28✔
2300
            error = ProtocolError::illegal_realm_path;
28✔
2301
            return false;
28✔
2302
        }
28✔
2303

2,768✔
2304
        // The user has proper permissions at this stage.
2,768✔
2305

2,768✔
2306
        m_server_file = server.get_or_create_file(path); // Throws
5,638✔
2307

2,768✔
2308
        m_server_file->add_unidentified_session(this); // Throws
5,638✔
2309

2,768✔
2310
        logger.info("Client info: (path='%1', from=%2, protocol=%3) %4", path, m_connection.get_remote_endpoint(),
5,638✔
2311
                    m_connection.get_client_protocol_version(),
5,638✔
2312
                    m_connection.get_client_user_agent()); // Throws
5,638✔
2313

2,768✔
2314
        m_is_subserver = is_subserver;
5,638✔
2315
        if (REALM_LIKELY(!need_client_file_ident)) {
5,638✔
2316
            // Protocol state is now WaitForUnbind
1,536✔
2317
            return true;
3,240✔
2318
        }
3,240✔
2319

1,232✔
2320
        // FIXME: We must make a choice about client file ident for read only
1,232✔
2321
        // sessions. They should have a special read-only client file ident.
1,232✔
2322
        file_ident_type proxy_file = 0; // No proxy
2,398✔
2323
        ClientType client_type = (is_subserver ? ClientType::subserver : ClientType::regular);
2,398✔
2324
        m_file_ident_request = m_server_file->request_file_ident(*this, proxy_file, client_type); // Throws
2,398✔
2325
        m_send_ident_message = true;
2,398✔
2326
        // Protocol state is now AllocatingIdent
1,232✔
2327

1,232✔
2328
        return true;
2,398✔
2329
    }
2,398✔
2330

2331
    bool receive_ident_message(file_ident_type client_file_ident, salt_type client_file_ident_salt,
2332
                               version_type scan_server_version, version_type scan_client_version,
2333
                               version_type latest_server_version, salt_type latest_server_version_salt,
2334
                               ProtocolError& error)
2335
    {
4,332✔
2336
        // Protocol state must be WaitForIdent
2,016✔
2337
        REALM_ASSERT(!need_client_file_ident());
4,332✔
2338
        REALM_ASSERT(!m_send_ident_message);
4,332✔
2339
        REALM_ASSERT(!ident_message_received());
4,332✔
2340
        REALM_ASSERT(!unbind_message_received());
4,332✔
2341
        REALM_ASSERT(!error_occurred());
4,332✔
2342
        REALM_ASSERT(!m_error_message_sent);
4,332✔
2343

2,016✔
2344
        logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
4,332✔
2345
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
4,332✔
2346
                     "latest_server_version_salt=%6)",
4,332✔
2347
                     client_file_ident, client_file_ident_salt, scan_server_version, scan_client_version,
4,332✔
2348
                     latest_server_version, latest_server_version_salt); // Throws
4,332✔
2349

2,016✔
2350
        SaltedFileIdent client_file_ident_2 = {client_file_ident, client_file_ident_salt};
4,332✔
2351
        DownloadCursor download_progress = {scan_server_version, scan_client_version};
4,332✔
2352
        SaltedVersion server_version_2 = {latest_server_version, latest_server_version_salt};
4,332✔
2353
        ClientType client_type = (m_is_subserver ? ClientType::subserver : ClientType::regular);
4,332✔
2354
        UploadCursor upload_threshold = {0, 0};
4,332✔
2355
        version_type locked_server_version = 0;
4,332✔
2356
        BootstrapError error_2 =
4,332✔
2357
            m_server_file->bootstrap_client_session(client_file_ident_2, download_progress, server_version_2,
4,332✔
2358
                                                    client_type, upload_threshold, locked_server_version,
4,332✔
2359
                                                    logger); // Throws
4,332✔
2360
        switch (error_2) {
4,332✔
2361
            case BootstrapError::no_error:
4,300✔
2362
                break;
4,300✔
2363
            case BootstrapError::client_file_expired:
✔
2364
                logger.warn("Client (%1) expired", client_file_ident); // Throws
×
2365
                error = ProtocolError::client_file_expired;
×
2366
                return false;
×
2367
            case BootstrapError::bad_client_file_ident:
✔
2368
                logger.error("Bad client file ident (%1) in IDENT message",
×
2369
                             client_file_ident); // Throws
×
2370
                error = ProtocolError::bad_client_file_ident;
×
2371
                return false;
×
2372
            case BootstrapError::bad_client_file_ident_salt:
4✔
2373
                logger.error("Bad client file identifier salt (%1) in IDENT message",
4✔
2374
                             client_file_ident_salt); // Throws
4✔
2375
                error = ProtocolError::diverging_histories;
4✔
2376
                return false;
4✔
2377
            case BootstrapError::bad_download_server_version:
✔
2378
                logger.error("Bad download progress server version in IDENT message"); // Throws
×
2379
                error = ProtocolError::bad_server_version;
×
2380
                return false;
×
2381
            case BootstrapError::bad_download_client_version:
4✔
2382
                logger.error("Bad download progress client version in IDENT message"); // Throws
4✔
2383
                error = ProtocolError::bad_client_version;
4✔
2384
                return false;
4✔
2385
            case BootstrapError::bad_server_version:
20✔
2386
                logger.error("Bad server version (message_type='ident')"); // Throws
20✔
2387
                error = ProtocolError::bad_server_version;
20✔
2388
                return false;
20✔
2389
            case BootstrapError::bad_server_version_salt:
4✔
2390
                logger.error("Bad server version salt in IDENT message"); // Throws
4✔
2391
                error = ProtocolError::diverging_histories;
4✔
2392
                return false;
4✔
2393
            case BootstrapError::bad_client_type:
✔
2394
                logger.error("Bad client type (%1) in IDENT message", int(client_type)); // Throws
×
2395
                error = ProtocolError::bad_client_file_ident; // FIXME: Introduce new protocol-level error
×
2396
                                                              // `bad_client_type`.
2397
                return false;
×
2398
        }
4,300✔
2399

2,000✔
2400
        // Make sure there is no other session currently associcated with the
2,000✔
2401
        // same client-side file
2,000✔
2402
        if (Session* other_sess = m_server_file->get_identified_session(client_file_ident)) {
4,300✔
2403
            SyncConnection& other_conn = other_sess->get_connection();
×
2404
            // It is a protocol violation if the other session is associated
2405
            // with the same connection
2406
            if (&other_conn == &m_connection) {
×
2407
                logger.error("Client file already bound in other session associated with "
×
2408
                             "the same connection"); // Throws
×
2409
                error = ProtocolError::bound_in_other_session;
×
2410
                return false;
×
2411
            }
×
2412
            // When the other session is associated with a different connection
2413
            // (`other_conn`), the clash may be due to the server not yet having
2414
            // realized that the other connection has been closed by the
2415
            // client. If so, the other connention is a "zombie". In the
2416
            // interest of getting rid of zombie connections as fast as
2417
            // possible, we shall assume that a clash with a session in another
2418
            // connection is always due to that other connection being a
2419
            // zombie. And when such a situation is detected, we want to close
2420
            // the zombie connection immediately.
2421
            auto log_level = Logger::Level::detail;
×
2422
            other_conn.terminate(log_level,
×
2423
                                 "Sync connection closed (superseded session)"); // Throws
×
2424
        }
×
2425

2,000✔
2426
        logger.info("Bound to client file (client_file_ident=%1)", client_file_ident); // Throws
4,300✔
2427

2,000✔
2428
        send_log_message(util::Logger::Level::debug, util::format("Session %1 bound to client file ident %2",
4,300✔
2429
                                                                  m_session_ident, client_file_ident));
4,300✔
2430

2,000✔
2431
        m_server_file->identify_session(this, client_file_ident); // Throws
4,300✔
2432

2,000✔
2433
        m_client_file_ident = client_file_ident;
4,300✔
2434
        m_download_progress = download_progress;
4,300✔
2435
        m_upload_threshold = upload_threshold;
4,300✔
2436
        m_locked_server_version = locked_server_version;
4,300✔
2437

2,000✔
2438
        ServerImpl& server = m_connection.get_server();
4,300✔
2439
        const Server::Config& config = server.get_config();
4,300✔
2440
        m_disable_download = (config.disable_download_for.count(client_file_ident) != 0);
4,300✔
2441

2,000✔
2442
        if (REALM_UNLIKELY(config.session_bootstrap_callback)) {
4,300✔
2443
            config.session_bootstrap_callback(m_server_file->get_virt_path(),
×
2444
                                              client_file_ident); // Throws
×
2445
        }
×
2446

2,000✔
2447
        // Protocol  state is now WaitForUnbind
2,000✔
2448
        enlist_to_send();
4,300✔
2449
        return true;
4,300✔
2450
    }
4,300✔
2451

2452
    bool receive_upload_message(version_type progress_client_version, version_type progress_server_version,
2453
                                version_type locked_server_version, const UploadChangesets& upload_changesets,
2454
                                ProtocolError& error)
2455
    {
43,862✔
2456
        // Protocol state must be WaitForUnbind
22,112✔
2457
        REALM_ASSERT(!m_send_ident_message);
43,862✔
2458
        REALM_ASSERT(ident_message_received());
43,862✔
2459
        REALM_ASSERT(!unbind_message_received());
43,862✔
2460
        REALM_ASSERT(!error_occurred());
43,862✔
2461
        REALM_ASSERT(!m_error_message_sent);
43,862✔
2462

22,112✔
2463
        logger.detail("Received: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
43,862✔
2464
                      "locked_server_version=%3, num_changesets=%4)",
43,862✔
2465
                      progress_client_version, progress_server_version, locked_server_version,
43,862✔
2466
                      upload_changesets.size()); // Throws
43,862✔
2467

22,112✔
2468
        // We are unable to reproduce the cursor object for the upload progress
22,112✔
2469
        // when the protocol version is less than 29, because the client does
22,112✔
2470
        // not provide the required information. When the protocol version is
22,112✔
2471
        // less than 25, we can always get a consistent cursor by taking it from
22,112✔
2472
        // the changeset that was uploaded last, but in protocol versions 25,
22,112✔
2473
        // 26, 27, and 28, things are more complicated. Here, we receive new
22,112✔
2474
        // values for `last_integrated_server_version` which we cannot afford to
22,112✔
2475
        // ignore, but we do not know what client versions they correspond
22,112✔
2476
        // to. Fortunately, we can produce a cursor that works, and is mutually
22,112✔
2477
        // consistent with previous cursors, by simply bumping
22,112✔
2478
        // `upload_progress.client_version` when
22,112✔
2479
        // `upload_progress.last_intgerated_server_version` grows.
22,112✔
2480
        //
22,112✔
2481
        // To see that this scheme works, consider the last changeset, A, that
22,112✔
2482
        // will have already been uploaded and integrated at the beginning of
22,112✔
2483
        // the next session, and the first changeset, B, that follows A in the
22,112✔
2484
        // client side history, and is not upload skippable (of local origin and
22,112✔
2485
        // nonempty). We then need to show that A will be skipped, if uploaded
22,112✔
2486
        // in the next session, but B will not.
22,112✔
2487
        //
22,112✔
2488
        // Let V be the client version produced by A, and let T be the value of
22,112✔
2489
        // `upload_progress.client_version` as determined in this session, which
22,112✔
2490
        // is used as threshold in the next session. Then we know that A is
22,112✔
2491
        // skipped during the next session if V is less than, or equal to T. If
22,112✔
2492
        // the protocol version is at least 29, the protocol requires that T is
22,112✔
2493
        // greater than, or equal to V. If the protocol version is less than 25,
22,112✔
2494
        // T will be equal to V. Finally, if the protocol version is 25, 26, 27,
22,112✔
2495
        // or 28, we construct T such that it is always greater than, or equal
22,112✔
2496
        // to V, so in all cases, A will be skipped during the next session.
22,112✔
2497
        //
22,112✔
2498
        // Let W be the client version on which B is based. We then know that B
22,112✔
2499
        // will be retained if, and only if W is greater than, or equalto T. If
22,112✔
2500
        // the protocol version is at least 29, we know that T is less than, or
22,112✔
2501
        // equal to W, since B is not integrated until the next session. If the
22,112✔
2502
        // protocol version is less tahn 25, we know that T is V. Since V must
22,112✔
2503
        // be less than, or equal to W, we again know that T is less than, or
22,112✔
2504
        // equal to W. Finally, if the protocol version is 25, 26, 27, or 28, we
22,112✔
2505
        // construct T such that it is equal to V + N, where N is the number of
22,112✔
2506
        // observed increments in `last_integrated_server_version` since the
22,112✔
2507
        // client version prodiced by A. For each of these observed increments,
22,112✔
2508
        // there must have been a distinct new client version, but all these
22,112✔
2509
        // client versions must be less than, or equal to W, since B is not
22,112✔
2510
        // integrated until the next session. Therefore, we know that T = V + N
22,112✔
2511
        // is less than, or qual to W. So, in all cases, B will not skipped
22,112✔
2512
        // during the next session.
22,112✔
2513
        int protocol_version = m_connection.get_client_protocol_version();
43,862✔
2514
        static_cast<void>(protocol_version); // No protocol diversion (yet)
43,862✔
2515

22,112✔
2516
        UploadCursor upload_progress;
43,862✔
2517
        upload_progress = {progress_client_version, progress_server_version};
43,862✔
2518

22,112✔
2519
        // `upload_progress.client_version` must be nondecreasing across the
22,112✔
2520
        // session.
22,112✔
2521
        bool good_1 = (upload_progress.client_version >= m_upload_progress.client_version);
43,862✔
2522
        if (REALM_UNLIKELY(!good_1)) {
43,862✔
2523
            logger.error("Decreasing client version in upload progress (%1 < %2)", upload_progress.client_version,
×
2524
                         m_upload_progress.client_version); // Throws
×
2525
            error = ProtocolError::bad_client_version;
×
2526
            return false;
×
2527
        }
×
2528
        // `upload_progress.last_integrated_server_version` must be a version
22,112✔
2529
        // that the client can have heard about.
22,112✔
2530
        bool good_2 = (upload_progress.last_integrated_server_version <= m_download_progress.server_version);
43,862✔
2531
        if (REALM_UNLIKELY(!good_2)) {
43,862✔
2532
            logger.error("Bad last integrated server version in upload progress (%1 > %2)",
×
2533
                         upload_progress.last_integrated_server_version,
×
2534
                         m_download_progress.server_version); // Throws
×
2535
            error = ProtocolError::bad_server_version;
×
2536
            return false;
×
2537
        }
×
2538

22,112✔
2539
        // `upload_progress` must be consistent.
22,112✔
2540
        if (REALM_UNLIKELY(!is_consistent(upload_progress))) {
43,862✔
2541
            logger.error("Upload progress is inconsistent (%1, %2)", upload_progress.client_version,
×
2542
                         upload_progress.last_integrated_server_version); // Throws
×
2543
            error = ProtocolError::bad_server_version;
×
2544
            return false;
×
2545
        }
×
2546
        // `upload_progress` and `m_upload_threshold` must be mutually
22,112✔
2547
        // consistent.
22,112✔
2548
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_threshold))) {
43,862✔
2549
            logger.error("Upload progress (%1, %2) is mutually inconsistent with "
×
2550
                         "threshold (%3, %4)",
×
2551
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2552
                         m_upload_threshold.client_version,
×
2553
                         m_upload_threshold.last_integrated_server_version); // Throws
×
2554
            error = ProtocolError::bad_server_version;
×
2555
            return false;
×
2556
        }
×
2557
        // `upload_progress` and `m_upload_progress` must be mutually
22,112✔
2558
        // consistent.
22,112✔
2559
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_progress))) {
43,862✔
2560
            logger.error("Upload progress (%1, %2) is mutually inconsistent with previous "
×
2561
                         "upload progress (%3, %4)",
×
2562
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2563
                         m_upload_progress.client_version,
×
2564
                         m_upload_progress.last_integrated_server_version); // Throws
×
2565
            error = ProtocolError::bad_server_version;
×
2566
            return false;
×
2567
        }
×
2568

22,112✔
2569
        version_type locked_server_version_2 = locked_server_version;
43,862✔
2570

22,112✔
2571
        // `locked_server_version_2` must be nondecreasing over the lifetime of
22,112✔
2572
        // the client-side file.
22,112✔
2573
        if (REALM_UNLIKELY(locked_server_version_2 < m_locked_server_version)) {
43,862✔
2574
            logger.error("Decreasing locked server version (%1 < %2)", locked_server_version_2,
×
2575
                         m_locked_server_version); // Throws
×
2576
            error = ProtocolError::bad_server_version;
×
2577
            return false;
×
2578
        }
×
2579
        // `locked_server_version_2` must be a version that the client can have
22,112✔
2580
        // heard about.
22,112✔
2581
        if (REALM_UNLIKELY(locked_server_version_2 > m_download_progress.server_version)) {
43,862✔
2582
            logger.error("Bad locked server version (%1 > %2)", locked_server_version_2,
×
2583
                         m_download_progress.server_version); // Throws
×
2584
            error = ProtocolError::bad_server_version;
×
2585
            return false;
×
2586
        }
×
2587

22,112✔
2588
        std::size_t num_previously_integrated_changesets = 0;
43,862✔
2589
        if (!upload_changesets.empty()) {
43,862✔
2590
            UploadCursor up = m_upload_progress;
22,556✔
2591
            for (const ServerProtocol::UploadChangeset& uc : upload_changesets) {
36,774✔
2592
                // `uc.upload_cursor.client_version` must be increasing across
17,842✔
2593
                // all the changesets in this UPLOAD message, and all must be
17,842✔
2594
                // greater than upload_progress.client_version of previous
17,842✔
2595
                // UPLOAD message.
17,842✔
2596
                if (REALM_UNLIKELY(uc.upload_cursor.client_version <= up.client_version)) {
36,774✔
2597
                    logger.error("Nonincreasing client version in upload cursor of uploaded "
×
2598
                                 "changeset (%1 <= %2)",
×
2599
                                 uc.upload_cursor.client_version,
×
2600
                                 up.client_version); // Throws
×
2601
                    error = ProtocolError::bad_client_version;
×
2602
                    return false;
×
2603
                }
×
2604
                // `uc.upload_progress` must be consistent.
17,842✔
2605
                if (REALM_UNLIKELY(!is_consistent(uc.upload_cursor))) {
36,774✔
2606
                    logger.error("Upload cursor of uploaded changeset is inconsistent (%1, %2)",
×
2607
                                 uc.upload_cursor.client_version,
×
2608
                                 uc.upload_cursor.last_integrated_server_version); // Throws
×
2609
                    error = ProtocolError::bad_server_version;
×
2610
                    return false;
×
2611
                }
×
2612
                // `uc.upload_progress` must be mutually consistent with
17,842✔
2613
                // previous upload cursor.
17,842✔
2614
                if (REALM_UNLIKELY(!are_mutually_consistent(uc.upload_cursor, up))) {
36,774✔
2615
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2616
                                 "inconsistent with previous upload cursor (%3, %4)",
×
2617
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2618
                                 up.client_version, up.last_integrated_server_version); // Throws
×
2619
                    error = ProtocolError::bad_server_version;
×
2620
                    return false;
×
2621
                }
×
2622
                // `uc.upload_progress` must be mutually consistent with
17,842✔
2623
                // threshold, that is, for changesets that have not previously
17,842✔
2624
                // been integrated, it is important that the specified value of
17,842✔
2625
                // `last_integrated_server_version` is greater than, or equal to
17,842✔
2626
                // the reciprocal history base version.
17,842✔
2627
                bool consistent_with_threshold = are_mutually_consistent(uc.upload_cursor, m_upload_threshold);
36,774✔
2628
                if (REALM_UNLIKELY(!consistent_with_threshold)) {
36,774✔
2629
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2630
                                 "inconsistent with threshold (%3, %4)",
×
2631
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2632
                                 m_upload_threshold.client_version,
×
2633
                                 m_upload_threshold.last_integrated_server_version); // Throws
×
2634
                    error = ProtocolError::bad_server_version;
×
2635
                    return false;
×
2636
                }
×
2637
                bool previously_integrated = (uc.upload_cursor.client_version <= m_upload_threshold.client_version);
36,774✔
2638
                if (previously_integrated)
36,774✔
2639
                    ++num_previously_integrated_changesets;
2,344✔
2640
                up = uc.upload_cursor;
36,774✔
2641
            }
36,774✔
2642
            // `upload_progress.client_version` must be greater than, or equal
11,710✔
2643
            // to client versions produced by each of the changesets in this
11,710✔
2644
            // UPLOAD message.
11,710✔
2645
            if (REALM_UNLIKELY(up.client_version > upload_progress.client_version)) {
22,556✔
2646
                logger.error("Upload progress less than client version produced by uploaded "
×
2647
                             "changeset (%1 > %2)",
×
2648
                             up.client_version,
×
2649
                             upload_progress.client_version); // Throws
×
2650
                error = ProtocolError::bad_client_version;
×
2651
                return false;
×
2652
            }
×
2653
            // The upload cursor of last uploaded changeset must be mutually
11,710✔
2654
            // consistent with the reported upload progress.
11,710✔
2655
            if (REALM_UNLIKELY(!are_mutually_consistent(up, upload_progress))) {
22,556✔
2656
                logger.error("Upload cursor (%1, %2) of last uploaded changeset is mutually "
×
2657
                             "inconsistent with upload progress (%3, %4)",
×
2658
                             up.client_version, up.last_integrated_server_version, upload_progress.client_version,
×
2659
                             upload_progress.last_integrated_server_version); // Throws
×
2660
                error = ProtocolError::bad_server_version;
×
2661
                return false;
×
2662
            }
×
2663
        }
43,862✔
2664

22,112✔
2665
        // FIXME: Part of a very poor man's substitute for a proper backpressure
22,112✔
2666
        // scheme.
22,112✔
2667
        if (REALM_UNLIKELY(!m_server_file->can_add_changesets_from_downstream())) {
43,862✔
2668
            logger.debug("Terminating uploading session because buffer is full"); // Throws
×
2669
            // Using this exact error code, because it causes `try_again` flag
2670
            // to be set to true, which causes the client to wait for about 5
2671
            // minuites before trying to connect again.
2672
            error = ProtocolError::connection_closed;
×
2673
            return false;
×
2674
        }
×
2675

22,112✔
2676
        m_upload_progress = upload_progress;
43,862✔
2677

22,112✔
2678
        bool have_real_upload_progress = (upload_progress.client_version > m_upload_threshold.client_version);
43,862✔
2679
        bool bump_locked_server_version = (locked_server_version_2 > m_locked_server_version);
43,862✔
2680

22,112✔
2681
        std::size_t num_changesets_to_integrate = upload_changesets.size() - num_previously_integrated_changesets;
43,862✔
2682
        REALM_ASSERT(have_real_upload_progress || num_changesets_to_integrate == 0);
43,862✔
2683

22,112✔
2684
        bool have_anything_to_do = (have_real_upload_progress || bump_locked_server_version);
43,862✔
2685
        if (!have_anything_to_do)
43,862✔
2686
            return true;
206✔
2687

21,998✔
2688
        if (!have_real_upload_progress)
43,656✔
2689
            upload_progress = m_upload_threshold;
×
2690

21,998✔
2691
        if (num_previously_integrated_changesets > 0) {
43,656✔
2692
            logger.detail("Ignoring %1 previously integrated changesets",
712✔
2693
                          num_previously_integrated_changesets); // Throws
712✔
2694
        }
712✔
2695
        if (num_changesets_to_integrate > 0) {
43,656✔
2696
            logger.detail("Initiate integration of %1 remote changesets",
22,258✔
2697
                          num_changesets_to_integrate); // Throws
22,258✔
2698
        }
22,258✔
2699

21,998✔
2700
        REALM_ASSERT(m_server_file);
43,656✔
2701
        ServerFile& file = *m_server_file;
43,656✔
2702
        std::size_t offset = num_previously_integrated_changesets;
43,656✔
2703
        file.add_changesets_from_downstream(m_client_file_ident, upload_progress, locked_server_version_2,
43,656✔
2704
                                            upload_changesets.data() + offset, num_changesets_to_integrate); // Throws
43,656✔
2705

21,998✔
2706
        m_locked_server_version = locked_server_version_2;
43,656✔
2707
        return true;
43,656✔
2708
    }
43,656✔
2709

2710
    bool receive_mark_message(request_ident_type request_ident, ProtocolError&)
2711
    {
12,366✔
2712
        // Protocol state must be WaitForUnbind
5,974✔
2713
        REALM_ASSERT(!m_send_ident_message);
12,366✔
2714
        REALM_ASSERT(ident_message_received());
12,366✔
2715
        REALM_ASSERT(!unbind_message_received());
12,366✔
2716
        REALM_ASSERT(!error_occurred());
12,366✔
2717
        REALM_ASSERT(!m_error_message_sent);
12,366✔
2718

5,974✔
2719
        logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws
12,366✔
2720

5,974✔
2721
        m_download_completion_request = request_ident;
12,366✔
2722

5,974✔
2723
        ensure_enlisted_to_send();
12,366✔
2724
        return true;
12,366✔
2725
    }
12,366✔
2726

2727
    // Returns true if the deactivation process has been completed, at which
2728
    // point the caller (SyncConnection::receive_unbind_message()) should
2729
    // terminate the session.
2730
    //
2731
    // CAUTION: This function may commit suicide!
2732
    void receive_unbind_message()
2733
    {
2,988✔
2734
        // Protocol state may be anything but SendUnbound
1,402✔
2735
        REALM_ASSERT(!m_unbind_message_received);
2,988✔
2736

1,402✔
2737
        logger.detail("Received: UNBIND"); // Throws
2,988✔
2738

1,402✔
2739
        detach_from_server_file();
2,988✔
2740
        m_unbind_message_received = true;
2,988✔
2741

1,402✔
2742
        // Detect completion of the deactivation process
1,402✔
2743
        if (m_error_message_sent) {
2,988✔
2744
            // Deactivation process completed
14✔
2745
            terminate(); // Throws
30✔
2746
            m_connection.discard_session(m_session_ident);
30✔
2747
            // This session is now destroyed!
14✔
2748
            return;
30✔
2749
        }
30✔
2750

1,388✔
2751
        // Protocol state is now SendUnbound
1,388✔
2752
        ensure_enlisted_to_send();
2,958✔
2753
    }
2,958✔
2754

2755
    void receive_error_message(session_ident_type, int, std::string_view)
2756
    {
×
2757
        REALM_ASSERT(!m_unbind_message_received);
×
2758

2759
        logger.detail("Received: ERROR"); // Throws
×
2760
    }
×
2761

2762
private:
2763
    SyncConnection& m_connection;
2764

2765
    const session_ident_type m_session_ident;
2766

2767
    // Not null if, and only if this session is in
2768
    // m_connection.m_sessions_enlisted_to_send.
2769
    Session* m_next = nullptr;
2770

2771
    // Becomes nonnull when the BIND message is received, if no error occurs. Is
2772
    // reset to null when the deactivation process is initiated, either when the
2773
    // UNBIND message is recieved, or when initiate_deactivation() is called.
2774
    util::bind_ptr<ServerFile> m_server_file;
2775

2776
    bool m_disable_download = false;
2777
    bool m_is_subserver = false;
2778

2779
    using file_ident_request_type = ServerFile::file_ident_request_type;
2780

2781
    // When nonzero, this session has an outstanding request for a client file
2782
    // identifier.
2783
    file_ident_request_type m_file_ident_request = 0;
2784

2785
    // Payload for next outgoing ALLOC message.
2786
    SaltedFileIdent m_allocated_file_ident = {0, 0};
2787

2788
    // Zero until the session receives an IDENT message from the client.
2789
    file_ident_type m_client_file_ident = 0;
2790

2791
    // Zero until initiate_deactivation() is called.
2792
    ProtocolError m_error_code = {};
2793

2794
    // The current point of progression of the download process. Set to (<server
2795
    // version>, <client version>) of the IDENT message when the IDENT message
2796
    // is received. At the time of return from continue_history_scan(), it
2797
    // points to the latest server version such that all preceding changesets in
2798
    // the server-side history have been downloaded, are currently being
2799
    // downloaded, or are *download excluded*.
2800
    DownloadCursor m_download_progress = {0, 0};
2801

2802
    request_ident_type m_download_completion_request = 0;
2803

2804
    // Records the progress of the upload process. Used to check that the client
2805
    // uploads changesets in order. Also, when m_upload_progress >
2806
    // m_upload_threshold, m_upload_progress works as a cache of the persisted
2807
    // version of the upload progress.
2808
    UploadCursor m_upload_progress = {0, 0};
2809

2810
    // Initialized on reception of the IDENT message. Specifies the actual
2811
    // upload progress (as recorded on the server-side) at the beginning of the
2812
    // session, and it remains fixed throughout the session.
2813
    //
2814
    // m_upload_threshold includes the progress resulting from the received
2815
    // changesets that have not yet been integrated (only relevant for
2816
    // synchronous backup).
2817
    UploadCursor m_upload_threshold = {0, 0};
2818

2819
    // Works partially as a cache of the persisted value, and partially as a way
2820
    // of checking that the client respects that it can never decrease.
2821
    version_type m_locked_server_version = 0;
2822

2823
    bool m_send_ident_message = false;
2824
    bool m_unbind_message_received = false;
2825
    bool m_error_message_sent = false;
2826

2827
    /// m_one_download_message_sent denotes whether at least one DOWNLOAD message
2828
    /// has been sent in the current session. The variable is used to ensure
2829
    /// that a DOWNLOAD message is always sent in a session. The received
2830
    /// DOWNLOAD message is needed by the client to ensure that its current
2831
    /// download progress is up to date.
2832
    bool m_one_download_message_sent = false;
2833

2834
    static std::string make_logger_prefix(session_ident_type session_ident)
2835
    {
5,666✔
2836
        std::ostringstream out;
5,666✔
2837
        out.imbue(std::locale::classic());
5,666✔
2838
        out << "Session[" << session_ident << "]: "; // Throws
5,666✔
2839
        return out.str();                            // Throws
5,666✔
2840
    }
5,666✔
2841

2842
    // Scan the history for changesets to be downloaded.
2843
    // If the history is longer than the end point of the previous scan,
2844
    // a DOWNLOAD message will be sent.
2845
    // A MARK message is sent if no DOWNLOAD message is sent, and the client has
2846
    // requested to be notified about download completion.
2847
    // In case neither a DOWNLOAD nor a MARK is sent, no message is sent.
2848
    //
2849
    // This function may lead to the destruction of the session object
2850
    // (suicide).
2851
    void continue_history_scan()
2852
    {
103,158✔
2853
        // Protocol state must be WaitForUnbind
54,946✔
2854
        REALM_ASSERT(!m_send_ident_message);
103,158✔
2855
        REALM_ASSERT(ident_message_received());
103,158✔
2856
        REALM_ASSERT(!unbind_message_received());
103,158✔
2857
        REALM_ASSERT(!error_occurred());
103,158✔
2858
        REALM_ASSERT(!m_error_message_sent);
103,158✔
2859
        REALM_ASSERT(!is_enlisted_to_send());
103,158✔
2860

54,946✔
2861
        SaltedVersion last_server_version = m_server_file->get_salted_sync_version();
103,158✔
2862
        REALM_ASSERT(last_server_version.version >= m_download_progress.server_version);
103,158✔
2863

54,946✔
2864
        ServerImpl& server = m_connection.get_server();
103,158✔
2865
        const Server::Config& config = server.get_config();
103,158✔
2866
        if (REALM_UNLIKELY(m_disable_download))
103,158✔
2867
            return;
54,946✔
2868

54,946✔
2869
        bool have_more_to_scan =
103,158✔
2870
            (last_server_version.version > m_download_progress.server_version || !m_one_download_message_sent);
103,158✔
2871
        if (have_more_to_scan) {
103,158✔
2872
            m_server_file->register_client_access(m_client_file_ident);     // Throws
40,090✔
2873
            const ServerHistory& history = m_server_file->access().history; // Throws
40,090✔
2874
            const char* body;
40,090✔
2875
            std::size_t uncompressed_body_size;
40,090✔
2876
            std::size_t compressed_body_size = 0;
40,090✔
2877
            bool body_is_compressed = false;
40,090✔
2878
            version_type end_version = last_server_version.version;
40,090✔
2879
            DownloadCursor download_progress;
40,090✔
2880
            UploadCursor upload_progress = {0, 0};
40,090✔
2881
            std::uint_fast64_t downloadable_bytes = 0;
40,090✔
2882
            std::size_t num_changesets;
40,090✔
2883
            std::size_t accum_original_size;
40,090✔
2884
            std::size_t accum_compacted_size;
40,090✔
2885
            ServerProtocol& protocol = get_server_protocol();
40,090✔
2886
            bool disable_download_compaction = config.disable_download_compaction;
40,090✔
2887
            bool enable_cache = (config.enable_download_bootstrap_cache && m_download_progress.server_version == 0 &&
40,090!
2888
                                 m_upload_progress.client_version == 0 && m_upload_threshold.client_version == 0);
21,818!
2889
            DownloadCache& cache = m_server_file->get_download_cache();
40,090✔
2890
            bool fetch_from_cache = (enable_cache && cache.body && end_version == cache.end_version);
40,090!
2891
            if (fetch_from_cache) {
40,090✔
2892
                body = cache.body.get();
×
2893
                uncompressed_body_size = cache.uncompressed_body_size;
×
2894
                compressed_body_size = cache.compressed_body_size;
×
2895
                body_is_compressed = cache.body_is_compressed;
×
2896
                download_progress = cache.download_progress;
×
2897
                downloadable_bytes = cache.downloadable_bytes;
×
2898
                num_changesets = cache.num_changesets;
×
2899
                accum_original_size = cache.accum_original_size;
×
2900
                accum_compacted_size = cache.accum_compacted_size;
×
2901
            }
×
2902
            else {
40,090✔
2903
                // Discard the old cached DOWNLOAD body before generating a new
21,818✔
2904
                // one to be cached. This can make a big difference because the
21,818✔
2905
                // size of that body can be very large (10GiB has been seen in a
21,818✔
2906
                // real-world case).
21,818✔
2907
                if (enable_cache)
40,090✔
2908
                    cache.body = {};
×
2909

21,818✔
2910
                OutputBuffer& out = server.get_misc_buffers().download_message;
40,090✔
2911
                out.reset();
40,090✔
2912
                download_progress = m_download_progress;
40,090✔
2913
                auto fetch_and_compress = [&](std::size_t max_download_size) {
40,094✔
2914
                    DownloadHistoryEntryHandler handler{protocol, out, logger};
40,092✔
2915
                    std::uint_fast64_t cumulative_byte_size_current;
40,092✔
2916
                    std::uint_fast64_t cumulative_byte_size_total;
40,092✔
2917
                    bool not_expired = history.fetch_download_info(
40,092✔
2918
                        m_client_file_ident, download_progress, end_version, upload_progress, handler,
40,092✔
2919
                        cumulative_byte_size_current, cumulative_byte_size_total, disable_download_compaction,
40,092✔
2920
                        max_download_size); // Throws
40,092✔
2921
                    REALM_ASSERT(upload_progress.client_version >= download_progress.last_integrated_client_version);
40,092✔
2922
                    SyncConnection& conn = get_connection();
40,092✔
2923
                    if (REALM_UNLIKELY(!not_expired)) {
40,092✔
2924
                        logger.debug("History scanning failed: Client file entry "
×
2925
                                     "expired during session"); // Throws
×
2926
                        conn.protocol_error(ProtocolError::client_file_expired, this);
×
2927
                        // Session object may have been destroyed at this point
2928
                        // (suicide).
2929
                        return false;
×
2930
                    }
×
2931

21,816✔
2932
                    downloadable_bytes = cumulative_byte_size_total - cumulative_byte_size_current;
40,092✔
2933
                    uncompressed_body_size = out.size();
40,092✔
2934
                    BinaryData uncompressed = {out.data(), uncompressed_body_size};
40,092✔
2935
                    body = uncompressed.data();
40,092✔
2936
                    std::size_t max_uncompressed = 1024;
40,092✔
2937
                    if (uncompressed.size() > max_uncompressed) {
40,092✔
2938
                        compression::CompressMemoryArena& arena = server.get_compress_memory_arena();
4,226✔
2939
                        std::vector<char>& buffer = server.get_misc_buffers().compress;
4,226✔
2940
                        compression::allocate_and_compress(arena, uncompressed, buffer); // Throws
4,226✔
2941
                        if (buffer.size() < uncompressed.size()) {
4,226✔
2942
                            body = buffer.data();
4,226✔
2943
                            compressed_body_size = buffer.size();
4,226✔
2944
                            body_is_compressed = true;
4,226✔
2945
                        }
4,226✔
2946
                    }
4,226✔
2947
                    num_changesets = handler.num_changesets;
40,092✔
2948
                    accum_original_size = handler.accum_original_size;
40,092✔
2949
                    accum_compacted_size = handler.accum_compacted_size;
40,092✔
2950
                    return true;
40,092✔
2951
                };
40,092✔
2952
                if (enable_cache) {
40,090✔
2953
                    std::size_t max_download_size = std::numeric_limits<size_t>::max();
×
2954
                    if (!fetch_and_compress(max_download_size)) { // Throws
×
2955
                        // Session object may have been destroyed at this point
2956
                        // (suicide).
2957
                        return;
×
2958
                    }
×
2959
                    REALM_ASSERT(upload_progress.client_version == 0);
×
2960
                    std::size_t body_size = (body_is_compressed ? compressed_body_size : uncompressed_body_size);
×
2961
                    cache.body = std::make_unique<char[]>(body_size); // Throws
×
2962
                    std::copy(body, body + body_size, cache.body.get());
×
2963
                    cache.uncompressed_body_size = uncompressed_body_size;
×
2964
                    cache.compressed_body_size = compressed_body_size;
×
2965
                    cache.body_is_compressed = body_is_compressed;
×
2966
                    cache.end_version = end_version;
×
2967
                    cache.download_progress = download_progress;
×
2968
                    cache.downloadable_bytes = downloadable_bytes;
×
2969
                    cache.num_changesets = num_changesets;
×
2970
                    cache.accum_original_size = accum_original_size;
×
2971
                    cache.accum_compacted_size = accum_compacted_size;
×
2972
                }
×
2973
                else {
40,090✔
2974
                    std::size_t max_download_size = config.max_download_size;
40,090✔
2975
                    if (!fetch_and_compress(max_download_size)) { // Throws
40,090✔
2976
                        // Session object may have been destroyed at this point
2977
                        // (suicide).
2978
                        return;
×
2979
                    }
×
2980
                }
40,090✔
2981
            }
40,090✔
2982

21,818✔
2983
            OutputBuffer& out = m_connection.get_output_buffer();
40,090✔
2984
            protocol.make_download_message(
40,090✔
2985
                m_connection.get_client_protocol_version(), out, m_session_ident, download_progress.server_version,
40,090✔
2986
                download_progress.last_integrated_client_version, last_server_version.version,
40,090✔
2987
                last_server_version.salt, upload_progress.client_version,
40,090✔
2988
                upload_progress.last_integrated_server_version, downloadable_bytes, num_changesets, body,
40,090✔
2989
                uncompressed_body_size, compressed_body_size, body_is_compressed, logger); // Throws
40,090✔
2990

21,818✔
2991
            if (!disable_download_compaction) {
40,092✔
2992
                std::size_t saved = accum_original_size - accum_compacted_size;
40,092✔
2993
                double saved_2 = (accum_original_size == 0 ? 0 : std::round(saved * 100.0 / accum_original_size));
31,162✔
2994
                logger.detail("Download compaction: Saved %1 bytes (%2%%)", saved, saved_2); // Throws
40,092✔
2995
            }
40,092✔
2996

21,818✔
2997
            m_download_progress = download_progress;
40,090✔
2998
            logger.debug("Setting of m_download_progress.server_version = %1",
40,090✔
2999
                         m_download_progress.server_version); // Throws
40,090✔
3000
            send_download_message();
40,090✔
3001
            m_one_download_message_sent = true;
40,090✔
3002

21,818✔
3003
            enlist_to_send();
40,090✔
3004
        }
40,090✔
3005
        else if (m_download_completion_request) {
63,068✔
3006
            // Send a MARK message
5,974✔
3007
            request_ident_type request_ident = m_download_completion_request;
12,362✔
3008
            send_mark_message(request_ident);  // Throws
12,362✔
3009
            m_download_completion_request = 0; // Request handled
12,362✔
3010
            enlist_to_send();
12,362✔
3011
        }
12,362✔
3012
    }
103,158✔
3013

3014
    void send_ident_message()
3015
    {
1,328✔
3016
        // Protocol state must be SendIdent
562✔
3017
        REALM_ASSERT(!need_client_file_ident());
1,328✔
3018
        REALM_ASSERT(m_send_ident_message);
1,328✔
3019
        REALM_ASSERT(!ident_message_received());
1,328✔
3020
        REALM_ASSERT(!unbind_message_received());
1,328✔
3021
        REALM_ASSERT(!error_occurred());
1,328✔
3022
        REALM_ASSERT(!m_error_message_sent);
1,328✔
3023

562✔
3024
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
1,328✔
3025

562✔
3026
        file_ident_type client_file_ident = m_allocated_file_ident.ident;
1,328✔
3027
        salt_type client_file_ident_salt = m_allocated_file_ident.salt;
1,328✔
3028

562✔
3029
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident,
1,328✔
3030
                     client_file_ident_salt); // Throws
1,328✔
3031

562✔
3032
        ServerProtocol& protocol = get_server_protocol();
1,328✔
3033
        OutputBuffer& out = m_connection.get_output_buffer();
1,328✔
3034
        int protocol_version = m_connection.get_client_protocol_version();
1,328✔
3035
        protocol.make_ident_message(protocol_version, out, m_session_ident, client_file_ident,
1,328✔
3036
                                    client_file_ident_salt); // Throws
1,328✔
3037
        m_connection.initiate_write_output_buffer();         // Throws
1,328✔
3038

562✔
3039
        m_allocated_file_ident.ident = 0; // Consumed
1,328✔
3040
        m_send_ident_message = false;
1,328✔
3041
        // Protocol state is now WaitForStateRequest or WaitForIdent
562✔
3042
    }
1,328✔
3043

3044
    void send_download_message()
3045
    {
40,094✔
3046
        m_connection.initiate_write_output_buffer(); // Throws
40,094✔
3047
    }
40,094✔
3048

3049
    void send_mark_message(request_ident_type request_ident)
3050
    {
12,362✔
3051
        logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
12,362✔
3052

5,974✔
3053
        ServerProtocol& protocol = get_server_protocol();
12,362✔
3054
        OutputBuffer& out = m_connection.get_output_buffer();
12,362✔
3055
        protocol.make_mark_message(out, m_session_ident, request_ident); // Throws
12,362✔
3056
        m_connection.initiate_write_output_buffer();                     // Throws
12,362✔
3057
    }
12,362✔
3058

3059
    void send_alloc_message()
3060
    {
×
3061
        // Protocol state must be WaitForUnbind
3062
        REALM_ASSERT(!m_send_ident_message);
×
3063
        REALM_ASSERT(ident_message_received());
×
3064
        REALM_ASSERT(!unbind_message_received());
×
3065
        REALM_ASSERT(!error_occurred());
×
3066
        REALM_ASSERT(!m_error_message_sent);
×
3067

3068
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
×
3069

3070
        // Relayed allocations are only allowed from protocol version 23 (old protocol).
3071
        REALM_ASSERT(false);
×
3072

3073
        file_ident_type file_ident = m_allocated_file_ident.ident;
×
3074

3075
        logger.debug("Sending: ALLOC(file_ident=%1)", file_ident); // Throws
×
3076

3077
        ServerProtocol& protocol = get_server_protocol();
×
3078
        OutputBuffer& out = m_connection.get_output_buffer();
×
3079
        protocol.make_alloc_message(out, m_session_ident, file_ident); // Throws
×
3080
        m_connection.initiate_write_output_buffer();                   // Throws
×
3081

3082
        m_allocated_file_ident.ident = 0; // Consumed
×
3083

3084
        // Other messages may be waiting to be sent.
3085
        enlist_to_send();
×
3086
    }
×
3087

3088
    void send_unbound_message()
3089
    {
2,958✔
3090
        // Protocol state must be SendUnbound
1,388✔
3091
        REALM_ASSERT(unbind_message_received());
2,958✔
3092
        REALM_ASSERT(!m_error_message_sent);
2,958✔
3093

1,388✔
3094
        logger.debug("Sending: UNBOUND"); // Throws
2,958✔
3095

1,388✔
3096
        ServerProtocol& protocol = get_server_protocol();
2,958✔
3097
        OutputBuffer& out = m_connection.get_output_buffer();
2,958✔
3098
        protocol.make_unbound_message(out, m_session_ident); // Throws
2,958✔
3099
        m_connection.initiate_write_output_buffer();         // Throws
2,958✔
3100
    }
2,958✔
3101

3102
    void send_error_message()
3103
    {
88✔
3104
        // Protocol state must be SendError
44✔
3105
        REALM_ASSERT(!unbind_message_received());
88✔
3106
        REALM_ASSERT(error_occurred());
88✔
3107
        REALM_ASSERT(!m_error_message_sent);
88✔
3108

44✔
3109
        REALM_ASSERT(is_session_level_error(m_error_code));
88✔
3110

44✔
3111
        ProtocolError error_code = m_error_code;
88✔
3112
        const char* message = get_protocol_error_message(int(error_code));
88✔
3113
        std::size_t message_size = std::strlen(message);
88✔
3114
        bool try_again = determine_try_again(error_code);
88✔
3115

44✔
3116
        logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3)", int(error_code), message_size,
88✔
3117
                      try_again); // Throws
88✔
3118

44✔
3119
        ServerProtocol& protocol = get_server_protocol();
88✔
3120
        OutputBuffer& out = m_connection.get_output_buffer();
88✔
3121
        int protocol_version = m_connection.get_client_protocol_version();
88✔
3122
        protocol.make_error_message(protocol_version, out, error_code, message, message_size, try_again,
88✔
3123
                                    m_session_ident); // Throws
88✔
3124
        m_connection.initiate_write_output_buffer();  // Throws
88✔
3125

44✔
3126
        m_error_message_sent = true;
88✔
3127
        // Protocol state is now WaitForUnbindErr
44✔
3128
    }
88✔
3129

3130
    void send_log_message(util::Logger::Level level, const std::string&& message)
3131
    {
4,300✔
3132
        if (m_connection.get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
4,300✔
3133
            return logger.log(level, message.c_str());
×
3134
        }
×
3135

2,000✔
3136
        m_connection.send_log_message(level, std::move(message), m_session_ident);
4,300✔
3137
    }
4,300✔
3138

3139
    // Idempotent
3140
    void detach_from_server_file() noexcept
3141
    {
8,742✔
3142
        if (!m_server_file)
8,742✔
3143
            return;
3,104✔
3144
        ServerFile& file = *m_server_file;
5,638✔
3145
        if (ident_message_received()) {
5,638✔
3146
            file.remove_identified_session(m_client_file_ident);
4,300✔
3147
        }
4,300✔
3148
        else {
1,338✔
3149
            file.remove_unidentified_session(this);
1,338✔
3150
        }
1,338✔
3151
        if (m_file_ident_request != 0)
5,638✔
3152
            file.cancel_file_ident_request(m_file_ident_request);
1,070✔
3153
        m_server_file.reset();
5,638✔
3154
    }
5,638✔
3155

3156
    friend class SessionQueue;
3157
};
3158

3159

3160
// ============================ SessionQueue implementation ============================
3161

3162
void SessionQueue::push_back(Session* sess) noexcept
3163
{
107,710✔
3164
    REALM_ASSERT(!sess->m_next);
107,710✔
3165
    if (m_back) {
107,710✔
3166
        sess->m_next = m_back->m_next;
38,442✔
3167
        m_back->m_next = sess;
38,442✔
3168
    }
38,442✔
3169
    else {
69,268✔
3170
        sess->m_next = sess;
69,268✔
3171
    }
69,268✔
3172
    m_back = sess;
107,710✔
3173
}
107,710✔
3174

3175

3176
Session* SessionQueue::pop_front() noexcept
3177
{
155,578✔
3178
    Session* sess = nullptr;
155,578✔
3179
    if (m_back) {
155,578✔
3180
        sess = m_back->m_next;
107,538✔
3181
        if (sess != m_back) {
107,538✔
3182
            m_back->m_next = sess->m_next;
38,374✔
3183
        }
38,374✔
3184
        else {
69,164✔
3185
            m_back = nullptr;
69,164✔
3186
        }
69,164✔
3187
        sess->m_next = nullptr;
107,538✔
3188
    }
107,538✔
3189
    return sess;
155,578✔
3190
}
155,578✔
3191

3192

3193
void SessionQueue::clear() noexcept
3194
{
3,094✔
3195
    if (m_back) {
3,094✔
3196
        Session* sess = m_back;
106✔
3197
        for (;;) {
176✔
3198
            Session* next = sess->m_next;
176✔
3199
            sess->m_next = nullptr;
176✔
3200
            if (next == m_back)
176✔
3201
                break;
106✔
3202
            sess = next;
70✔
3203
        }
70✔
3204
        m_back = nullptr;
106✔
3205
    }
106✔
3206
}
3,094✔
3207

3208

3209
// ============================ ServerFile implementation ============================
3210

3211
ServerFile::ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path,
3212
                       std::string real_path, bool disable_sync_to_disk)
3213
    : logger{"ServerFile[" + virt_path + "]: ", server.logger_ptr}           // Throws
3214
    , wlogger{"ServerFile[" + virt_path + "]: ", server.get_worker().logger} // Throws
3215
    , m_server{server}
3216
    , m_file{cache, real_path, virt_path, false, disable_sync_to_disk} // Throws
3217
    , m_worker_file{server.get_worker().get_file_access_cache(), real_path, virt_path, true, disable_sync_to_disk}
3218
{
1,082✔
3219
}
1,082✔
3220

3221

3222
ServerFile::~ServerFile() noexcept
3223
{
1,082✔
3224
    REALM_ASSERT(m_unidentified_sessions.empty());
1,082✔
3225
    REALM_ASSERT(m_identified_sessions.empty());
1,082✔
3226
    REALM_ASSERT(m_file_ident_request == 0);
1,082✔
3227
}
1,082✔
3228

3229

3230
void ServerFile::initialize()
3231
{
1,082✔
3232
    const ServerHistory& history = access().history; // Throws
1,082✔
3233
    file_ident_type partial_file_ident = 0;
1,082✔
3234
    version_type partial_progress_reference_version = 0;
1,082✔
3235
    bool has_upstream_sync_status;
1,082✔
3236
    history.get_status(m_version_info, has_upstream_sync_status, partial_file_ident,
1,082✔
3237
                       partial_progress_reference_version); // Throws
1,082✔
3238
    REALM_ASSERT(!has_upstream_sync_status);
1,082✔
3239
    REALM_ASSERT(partial_file_ident == 0);
1,082✔
3240
}
1,082✔
3241

3242

3243
void ServerFile::activate() {}
1,082✔
3244

3245

3246
// This function must be called only after a completed invocation of
3247
// initialize(). Both functinos must only ever be called by the network event
3248
// loop thread.
3249
void ServerFile::register_client_access(file_ident_type) {}
88,044✔
3250

3251

3252
auto ServerFile::request_file_ident(FileIdentReceiver& receiver, file_ident_type proxy_file, ClientType client_type)
3253
    -> file_ident_request_type
3254
{
2,398✔
3255
    auto request = ++m_last_file_ident_request;
2,398✔
3256
    m_file_ident_requests[request] = {&receiver, proxy_file, client_type}; // Throws
2,398✔
3257

1,232✔
3258
    on_work_added(); // Throws
2,398✔
3259
    return request;
2,398✔
3260
}
2,398✔
3261

3262

3263
void ServerFile::cancel_file_ident_request(file_ident_request_type request) noexcept
3264
{
1,070✔
3265
    auto i = m_file_ident_requests.find(request);
1,070✔
3266
    REALM_ASSERT(i != m_file_ident_requests.end());
1,070✔
3267
    FileIdentRequestInfo& info = i->second;
1,070✔
3268
    REALM_ASSERT(info.receiver);
1,070✔
3269
    info.receiver = nullptr;
1,070✔
3270
}
1,070✔
3271

3272

3273
void ServerFile::add_unidentified_session(Session* sess)
3274
{
5,638✔
3275
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 0);
5,638✔
3276
    m_unidentified_sessions.insert(sess); // Throws
5,638✔
3277
}
5,638✔
3278

3279

3280
void ServerFile::identify_session(Session* sess, file_ident_type client_file_ident)
3281
{
4,300✔
3282
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);
4,300✔
3283
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 0);
4,300✔
3284

2,000✔
3285
    m_identified_sessions[client_file_ident] = sess; // Throws
4,300✔
3286
    m_unidentified_sessions.erase(sess);
4,300✔
3287
}
4,300✔
3288

3289

3290
void ServerFile::remove_unidentified_session(Session* sess) noexcept
3291
{
1,338✔
3292
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);
1,338✔
3293
    m_unidentified_sessions.erase(sess);
1,338✔
3294
}
1,338✔
3295

3296

3297
void ServerFile::remove_identified_session(file_ident_type client_file_ident) noexcept
3298
{
4,300✔
3299
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 1);
4,300✔
3300
    m_identified_sessions.erase(client_file_ident);
4,300✔
3301
}
4,300✔
3302

3303

3304
Session* ServerFile::get_identified_session(file_ident_type client_file_ident) noexcept
3305
{
4,300✔
3306
    auto i = m_identified_sessions.find(client_file_ident);
4,300✔
3307
    if (i == m_identified_sessions.end())
4,300✔
3308
        return nullptr;
4,298✔
3309
    return i->second;
2✔
3310
}
2✔
3311

3312
bool ServerFile::can_add_changesets_from_downstream() const noexcept
3313
{
43,864✔
3314
    return (m_blocked_changesets_from_downstream_byte_size < m_server.get_max_upload_backlog());
43,864✔
3315
}
43,864✔
3316

3317

3318
void ServerFile::add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
3319
                                                version_type locked_server_version, const UploadChangeset* changesets,
3320
                                                std::size_t num_changesets)
3321
{
43,654✔
3322
    register_client_access(client_file_ident); // Throws
43,654✔
3323

21,996✔
3324
    bool dirty = false;
43,654✔
3325

21,996✔
3326
    IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident]; // Throws
43,654✔
3327
    std::size_t num_bytes = 0;
43,654✔
3328
    for (std::size_t i = 0; i < num_changesets; ++i) {
78,082✔
3329
        const UploadChangeset& uc = changesets[i];
34,428✔
3330
        auto& changesets = list.changesets;
34,428✔
3331
        changesets.emplace_back(client_file_ident, uc.origin_timestamp, uc.origin_file_ident, uc.upload_cursor,
34,428✔
3332
                                uc.changeset); // Throws
34,428✔
3333
        num_bytes += uc.changeset.size();
34,428✔
3334
        dirty = true;
34,428✔
3335
    }
34,428✔
3336

21,996✔
3337
    REALM_ASSERT(upload_progress.client_version >= list.upload_progress.client_version);
43,654✔
3338
    REALM_ASSERT(are_mutually_consistent(upload_progress, list.upload_progress));
43,654✔
3339
    if (upload_progress.client_version > list.upload_progress.client_version) {
43,660✔
3340
        list.upload_progress = upload_progress;
43,658✔
3341
        dirty = true;
43,658✔
3342
    }
43,658✔
3343

21,996✔
3344
    REALM_ASSERT(locked_server_version >= list.locked_server_version);
43,654✔
3345
    if (locked_server_version > list.locked_server_version) {
43,654✔
3346
        list.locked_server_version = locked_server_version;
38,446✔
3347
        dirty = true;
38,446✔
3348
    }
38,446✔
3349

21,996✔
3350
    if (REALM_LIKELY(dirty)) {
43,660✔
3351
        if (num_changesets > 0) {
43,658✔
3352
            on_changesets_from_downstream_added(num_changesets, num_bytes); // Throws
22,260✔
3353
        }
22,260✔
3354
        else {
21,398✔
3355
            on_work_added(); // Throws
21,398✔
3356
        }
21,398✔
3357
    }
43,658✔
3358
}
43,654✔
3359

3360

3361
BootstrapError ServerFile::bootstrap_client_session(SaltedFileIdent client_file_ident,
3362
                                                    DownloadCursor download_progress, SaltedVersion server_version,
3363
                                                    ClientType client_type, UploadCursor& upload_progress,
3364
                                                    version_type& locked_server_version, Logger& logger)
3365
{
4,332✔
3366
    // The Realm file may contain a later snapshot than the one reflected by
2,016✔
3367
    // `m_sync_version`, but if so, the client cannot "legally" know about it.
2,016✔
3368
    if (server_version.version > m_version_info.sync_version.version)
4,332✔
3369
        return BootstrapError::bad_server_version;
20✔
3370

2,006✔
3371
    const ServerHistory& hist = access().history; // Throws
4,312✔
3372
    BootstrapError error = hist.bootstrap_client_session(client_file_ident, download_progress, server_version,
4,312✔
3373
                                                         client_type, upload_progress, locked_server_version,
4,312✔
3374
                                                         logger); // Throws
4,312✔
3375

2,006✔
3376
    // FIXME: Rather than taking previously buffered changesets from the same
2,006✔
3377
    // client file into account when determining the upload progress, and then
2,006✔
3378
    // allowing for an error during the integration of those changesets to be
2,006✔
3379
    // reported to, and terminate the new session, consider to instead postpone
2,006✔
3380
    // the bootstrapping of the new session until all previously buffered
2,006✔
3381
    // changesets from same client file have been fully processed.
2,006✔
3382

2,006✔
3383
    if (error == BootstrapError::no_error) {
4,312✔
3384
        register_client_access(client_file_ident.ident); // Throws
4,300✔
3385

2,000✔
3386
        // If upload, or releaseing of server versions progressed further during
2,000✔
3387
        // previous sessions than the persisted points, take that into account
2,000✔
3388
        auto i = m_work.changesets_from_downstream.find(client_file_ident.ident);
4,300✔
3389
        if (i != m_work.changesets_from_downstream.end()) {
4,300✔
3390
            const IntegratableChangesetList& list = i->second;
1,590✔
3391
            REALM_ASSERT(list.upload_progress.client_version >= upload_progress.client_version);
1,590✔
3392
            upload_progress = list.upload_progress;
1,590✔
3393
            REALM_ASSERT(list.locked_server_version >= locked_server_version);
1,590✔
3394
            locked_server_version = list.locked_server_version;
1,590✔
3395
        }
1,590✔
3396
        auto j = m_changesets_from_downstream.find(client_file_ident.ident);
4,300✔
3397
        if (j != m_changesets_from_downstream.end()) {
4,300✔
3398
            const IntegratableChangesetList& list = j->second;
138✔
3399
            REALM_ASSERT(list.upload_progress.client_version >= upload_progress.client_version);
138✔
3400
            upload_progress = list.upload_progress;
138✔
3401
            REALM_ASSERT(list.locked_server_version >= locked_server_version);
138✔
3402
            locked_server_version = list.locked_server_version;
138✔
3403
        }
138✔
3404
    }
4,300✔
3405

2,006✔
3406
    return error;
4,312✔
3407
}
4,312✔
3408

3409
// NOTE: This function is executed by the worker thread
3410
void ServerFile::worker_process_work_unit(WorkerState& state)
3411
{
37,998✔
3412
    SteadyTimePoint start_time = steady_clock_now();
37,998✔
3413
    milliseconds_type parallel_time = 0;
37,998✔
3414

19,246✔
3415
    Work& work = m_work;
37,998✔
3416
    wlogger.debug("Work unit execution started"); // Throws
37,998✔
3417

19,246✔
3418
    if (work.has_primary_work) {
37,998✔
3419
        if (REALM_UNLIKELY(!m_work.file_ident_alloc_slots.empty()))
37,998✔
3420
            worker_allocate_file_identifiers(); // Throws
20,328✔
3421

19,246✔
3422
        if (!m_work.changesets_from_downstream.empty())
37,998✔
3423
            worker_integrate_changes_from_downstream(state); // Throws
36,316✔
3424
    }
37,998✔
3425

19,246✔
3426
    wlogger.debug("Work unit execution completed"); // Throws
37,998✔
3427

19,246✔
3428
    milliseconds_type time = steady_duration(start_time);
37,998✔
3429
    milliseconds_type seq_time = time - parallel_time;
37,998✔
3430
    m_server.m_seq_time.fetch_add(seq_time, std::memory_order_relaxed);
37,998✔
3431
    m_server.m_par_time.fetch_add(parallel_time, std::memory_order_relaxed);
37,998✔
3432

19,246✔
3433
    // Pass control back to the network event loop thread
19,246✔
3434
    network::Service& service = m_server.get_service();
37,998✔
3435
    service.post([this](Status) {
37,874✔
3436
        // FIXME: The safety of capturing `this` here, relies on the fact
19,004✔
3437
        // that ServerFile objects currently are not destroyed until the
19,004✔
3438
        // server object is destroyed.
19,004✔
3439
        group_postprocess_stage_1(); // Throws
37,632✔
3440
        // Suicide may have happened at this point
19,004✔
3441
    }); // Throws
37,632✔
3442
}
37,998✔
3443

3444

3445
void ServerFile::on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes)
3446
{
22,260✔
3447
    m_num_changesets_from_downstream += num_changesets;
22,260✔
3448

11,548✔
3449
    if (num_bytes > 0) {
22,260✔
3450
        m_blocked_changesets_from_downstream_byte_size += num_bytes;
22,260✔
3451
        get_server().inc_byte_size_for_pending_downstream_changesets(num_bytes); // Throws
22,260✔
3452
    }
22,260✔
3453

11,548✔
3454
    on_work_added(); // Throws
22,260✔
3455
}
22,260✔
3456

3457

3458
void ServerFile::on_work_added()
3459
{
46,056✔
3460
    if (m_has_blocked_work)
46,056✔
3461
        return;
7,978✔
3462
    m_has_blocked_work = true;
38,078✔
3463
    // Reference file
19,280✔
3464
    if (m_has_work_in_progress)
38,078✔
3465
        return;
11,518✔
3466
    group_unblock_work(); // Throws
26,560✔
3467
}
26,560✔
3468

3469

3470
void ServerFile::group_unblock_work()
3471
{
38,040✔
3472
    REALM_ASSERT(!m_has_work_in_progress);
38,040✔
3473
    if (REALM_LIKELY(!m_server.is_sync_stopped())) {
38,040✔
3474
        unblock_work(); // Throws
38,034✔
3475
        const Work& work = m_work;
38,034✔
3476
        if (REALM_LIKELY(work.has_primary_work)) {
38,034✔
3477
            logger.trace("Work unit unblocked"); // Throws
38,004✔
3478
            m_has_work_in_progress = true;
38,004✔
3479
            Worker& worker = m_server.get_worker();
38,004✔
3480
            worker.enqueue(this); // Throws
38,004✔
3481
        }
38,004✔
3482
    }
38,034✔
3483
}
38,040✔
3484

3485

3486
void ServerFile::unblock_work()
3487
{
38,034✔
3488
    REALM_ASSERT(m_has_blocked_work);
38,034✔
3489

19,250✔
3490
    m_work.reset();
38,034✔
3491

19,250✔
3492
    // Discard requests for file identifiers whose receiver is no longer
19,250✔
3493
    // waiting.
19,250✔
3494
    {
38,034✔
3495
        auto i = m_file_ident_requests.begin();
38,034✔
3496
        auto end = m_file_ident_requests.end();
38,034✔
3497
        while (i != end) {
40,406✔
3498
            auto j = i++;
2,372✔
3499
            const FileIdentRequestInfo& info = j->second;
2,372✔
3500
            if (!info.receiver)
2,372✔
3501
                m_file_ident_requests.erase(j);
636✔
3502
        }
2,372✔
3503
    }
38,034✔
3504
    std::size_t n = m_file_ident_requests.size();
38,034✔
3505
    if (n > 0) {
38,034✔
3506
        m_work.file_ident_alloc_slots.resize(n); // Throws
1,686✔
3507
        std::size_t i = 0;
1,686✔
3508
        for (const auto& pair : m_file_ident_requests) {
1,736✔
3509
            const FileIdentRequestInfo& info = pair.second;
1,736✔
3510
            FileIdentAllocSlot& slot = m_work.file_ident_alloc_slots[i];
1,736✔
3511
            slot.proxy_file = info.proxy_file;
1,736✔
3512
            slot.client_type = info.client_type;
1,736✔
3513
            ++i;
1,736✔
3514
        }
1,736✔
3515
        m_work.has_primary_work = true;
1,686✔
3516
    }
1,686✔
3517

19,250✔
3518
    // FIXME: `ServerFile::m_changesets_from_downstream` and
19,250✔
3519
    // `Work::changesets_from_downstream` should be renamed to something else,
19,250✔
3520
    // as it may contain kinds of data other than changesets.
19,250✔
3521

19,250✔
3522
    using std::swap;
38,034✔
3523
    swap(m_changesets_from_downstream, m_work.changesets_from_downstream);
38,034✔
3524
    m_work.have_changesets_from_downstream = (m_num_changesets_from_downstream > 0);
38,034✔
3525
    bool has_changesets = !m_work.changesets_from_downstream.empty();
38,034✔
3526
    if (has_changesets) {
38,034✔
3527
        m_work.has_primary_work = true;
36,320✔
3528
    }
36,320✔
3529

19,250✔
3530
    // Keep track of the size of pending changesets
19,250✔
3531
    REALM_ASSERT(m_unblocked_changesets_from_downstream_byte_size == 0);
38,034✔
3532
    m_unblocked_changesets_from_downstream_byte_size = m_blocked_changesets_from_downstream_byte_size;
38,034✔
3533
    m_blocked_changesets_from_downstream_byte_size = 0;
38,034✔
3534

19,250✔
3535
    m_num_changesets_from_downstream = 0;
38,034✔
3536
    m_has_blocked_work = false;
38,034✔
3537
}
38,034✔
3538

3539

3540
void ServerFile::resume_download() noexcept
3541
{
22,350✔
3542
    for (const auto& entry : m_identified_sessions) {
35,876✔
3543
        Session& sess = *entry.second;
35,876✔
3544
        sess.ensure_enlisted_to_send();
35,876✔
3545
    }
35,876✔
3546
}
22,350✔
3547

3548

3549
void ServerFile::recognize_external_change()
3550
{
4,798✔
3551
    VersionInfo prev_version_info = m_version_info;
4,798✔
3552
    const ServerHistory& history = access().history;       // Throws
4,798✔
3553
    bool has_upstream_status;                              // Dummy
4,798✔
3554
    sync::file_ident_type partial_file_ident;              // Dummy
4,798✔
3555
    sync::version_type partial_progress_reference_version; // Dummy
4,798✔
3556
    history.get_status(m_version_info, has_upstream_status, partial_file_ident,
4,798✔
3557
                       partial_progress_reference_version); // Throws
4,798✔
3558

2,400✔
3559
    REALM_ASSERT(m_version_info.realm_version >= prev_version_info.realm_version);
4,798✔
3560
    REALM_ASSERT(m_version_info.sync_version.version >= prev_version_info.sync_version.version);
4,798✔
3561
    if (m_version_info.sync_version.version > prev_version_info.sync_version.version) {
4,798✔
3562
        REALM_ASSERT(m_version_info.realm_version > prev_version_info.realm_version);
4,798✔
3563
        resume_download();
4,798✔
3564
    }
4,798✔
3565
}
4,798✔
3566

3567

3568
// NOTE: This function is executed by the worker thread
3569
void ServerFile::worker_allocate_file_identifiers()
3570
{
1,684✔
3571
    Work& work = m_work;
1,684✔
3572
    REALM_ASSERT(!work.file_ident_alloc_slots.empty());
1,684✔
3573
    ServerHistory& hist = worker_access().history;                                      // Throws
1,684✔
3574
    hist.allocate_file_identifiers(m_work.file_ident_alloc_slots, m_work.version_info); // Throws
1,684✔
3575
    m_work.produced_new_realm_version = true;
1,684✔
3576
}
1,684✔
3577

3578

3579
// Returns true when, and only when this function produces a new sync version
3580
// (adds a new entry to the sync history).
3581
//
3582
// NOTE: This function is executed by the worker thread
3583
bool ServerFile::worker_integrate_changes_from_downstream(WorkerState& state)
3584
{
36,316✔
3585
    REALM_ASSERT(!m_work.changesets_from_downstream.empty());
36,316✔
3586

18,644✔
3587
    std::unique_ptr<ServerHistory> hist_ptr;
36,316✔
3588
    DBRef sg_ptr;
36,316✔
3589
    ServerHistory& hist = get_client_file_history(state, hist_ptr, sg_ptr);
36,316✔
3590
    bool backup_whole_realm = false;
36,316✔
3591
    bool produced_new_realm_version = hist.integrate_client_changesets(
36,316✔
3592
        m_work.changesets_from_downstream, m_work.version_info, backup_whole_realm, m_work.integration_result,
36,316✔
3593
        wlogger); // Throws
36,316✔
3594
    bool produced_new_sync_version = !m_work.integration_result.integrated_changesets.empty();
36,316✔
3595
    REALM_ASSERT(!produced_new_sync_version || produced_new_realm_version);
36,316✔
3596
    if (produced_new_realm_version) {
36,316✔
3597
        m_work.produced_new_realm_version = true;
36,288✔
3598
        if (produced_new_sync_version) {
36,288✔
3599
            m_work.produced_new_sync_version = true;
17,562✔
3600
        }
17,562✔
3601
    }
36,288✔
3602
    return produced_new_sync_version;
36,316✔
3603
}
36,316✔
3604

3605
ServerHistory& ServerFile::get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
3606
                                                   DBRef& sg_ptr)
3607
{
36,316✔
3608
    if (state.use_file_cache)
36,316✔
3609
        return worker_access().history; // Throws
36,316✔
3610
    const std::string& path = m_worker_file.realm_path;
×
3611
    hist_ptr = m_server.make_history_for_path();                   // Throws
×
3612
    DBOptions options = m_worker_file.make_shared_group_options(); // Throws
×
3613
    sg_ptr = DB::create(*hist_ptr, path, options);                 // Throws
×
3614
    sg_ptr->claim_sync_agent();                                    // Throws
×
3615
    return *hist_ptr;                                              // Throws
×
3616
}
×
3617

3618

3619
// When worker thread finishes work unit.
3620
void ServerFile::group_postprocess_stage_1()
3621
{
37,632✔
3622
    REALM_ASSERT(m_has_work_in_progress);
37,632✔
3623

19,004✔
3624
    group_finalize_work_stage_1(); // Throws
37,632✔
3625
    group_finalize_work_stage_2(); // Throws
37,632✔
3626
    group_postprocess_stage_2();   // Throws
37,632✔
3627
}
37,632✔
3628

3629

3630
void ServerFile::group_postprocess_stage_2()
3631
{
37,634✔
3632
    REALM_ASSERT(m_has_work_in_progress);
37,634✔
3633
    group_postprocess_stage_3(); // Throws
37,634✔
3634
    // Suicide may have happened at this point
19,004✔
3635
}
37,634✔
3636

3637

3638
// When all files, including the reference file, have been backed up.
3639
void ServerFile::group_postprocess_stage_3()
3640
{
37,634✔
3641
    REALM_ASSERT(m_has_work_in_progress);
37,634✔
3642
    m_has_work_in_progress = false;
37,634✔
3643

19,004✔
3644
    logger.trace("Work unit postprocessing complete"); // Throws
37,634✔
3645
    if (m_has_blocked_work)
37,634✔
3646
        group_unblock_work(); // Throws
11,480✔
3647
}
37,634✔
3648

3649

3650
void ServerFile::finalize_work_stage_1()
3651
{
37,632✔
3652
    if (m_unblocked_changesets_from_downstream_byte_size > 0) {
37,632✔
3653
        // Report the byte size of completed downstream changesets.
9,304✔
3654
        std::size_t byte_size = m_unblocked_changesets_from_downstream_byte_size;
17,578✔
3655
        get_server().dec_byte_size_for_pending_downstream_changesets(byte_size); // Throws
17,578✔
3656
        m_unblocked_changesets_from_downstream_byte_size = 0;
17,578✔
3657
    }
17,578✔
3658

19,004✔
3659
    // Deal with errors (bad changesets) pertaining to downstream clients
19,004✔
3660
    std::size_t num_changesets_removed = 0;
37,632✔
3661
    std::size_t num_bytes_removed = 0;
37,632✔
3662
    for (const auto& entry : m_work.integration_result.excluded_client_files) {
19,018✔
3663
        file_ident_type client_file_ident = entry.first;
28✔
3664
        ExtendedIntegrationError error = entry.second;
28✔
3665
        ProtocolError error_2 = ProtocolError::other_session_error;
28✔
3666
        switch (error) {
28✔
3667
            case ExtendedIntegrationError::client_file_expired:
✔
3668
                logger.debug("Changeset integration failed: Client file entry "
×
3669
                             "expired during session"); // Throws
×
3670
                error_2 = ProtocolError::client_file_expired;
×
3671
                break;
×
3672
            case ExtendedIntegrationError::bad_origin_file_ident:
✔
3673
                error_2 = ProtocolError::bad_origin_file_ident;
×
3674
                break;
×
3675
            case ExtendedIntegrationError::bad_changeset:
28✔
3676
                error_2 = ProtocolError::bad_changeset;
28✔
3677
                break;
28✔
3678
        }
28✔
3679
        auto i = m_identified_sessions.find(client_file_ident);
28✔
3680
        if (i != m_identified_sessions.end()) {
28✔
3681
            Session& sess = *i->second;
28✔
3682
            SyncConnection& conn = sess.get_connection();
28✔
3683
            conn.protocol_error(error_2, &sess); // Throws
28✔
3684
        }
28✔
3685
        const IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident];
28✔
3686
        std::size_t num_changesets = list.changesets.size();
28✔
3687
        std::size_t num_bytes = 0;
28✔
3688
        for (const IntegratableChangeset& ic : list.changesets)
28✔
3689
            num_bytes += ic.changeset.size();
×
3690
        logger.info("Excluded %1 changesets of combined byte size %2 for client file %3", num_changesets, num_bytes,
28✔
3691
                    client_file_ident); // Throws
28✔
3692
        num_changesets_removed += num_changesets;
28✔
3693
        num_bytes_removed += num_bytes;
28✔
3694
        m_changesets_from_downstream.erase(client_file_ident);
28✔
3695
    }
28✔
3696

19,004✔
3697
    REALM_ASSERT(num_changesets_removed <= m_num_changesets_from_downstream);
37,632✔
3698
    REALM_ASSERT(num_bytes_removed <= m_blocked_changesets_from_downstream_byte_size);
37,632✔
3699

19,004✔
3700
    if (num_changesets_removed == 0)
37,632✔
3701
        return;
37,634✔
3702

3703
    m_num_changesets_from_downstream -= num_changesets_removed;
2,147,483,647✔
3704

3705
    // The byte size of the blocked changesets must be decremented.
3706
    if (num_bytes_removed > 0) {
2,147,483,647✔
3707
        m_blocked_changesets_from_downstream_byte_size -= num_bytes_removed;
×
3708
        get_server().dec_byte_size_for_pending_downstream_changesets(num_bytes_removed); // Throws
×
3709
    }
×
3710
}
2,147,483,647✔
3711

3712

3713
void ServerFile::finalize_work_stage_2()
3714
{
37,634✔
3715
    // Expose new snapshot to remote peers
19,004✔
3716
    REALM_ASSERT(m_work.produced_new_realm_version || m_work.version_info.realm_version == 0);
37,634✔
3717
    if (m_work.version_info.realm_version > m_version_info.realm_version) {
37,634✔
3718
        REALM_ASSERT(m_work.version_info.sync_version.version >= m_version_info.sync_version.version);
37,592✔
3719
        m_version_info = m_work.version_info;
37,592✔
3720
    }
37,592✔
3721

19,004✔
3722
    bool resume_download_and_upload = m_work.produced_new_sync_version;
37,634✔
3723

19,004✔
3724
    // Deliver allocated file identifiers to requesters
19,004✔
3725
    REALM_ASSERT(m_file_ident_requests.size() >= m_work.file_ident_alloc_slots.size());
37,634✔
3726
    auto begin = m_file_ident_requests.begin();
37,634✔
3727
    auto i = begin;
37,634✔
3728
    for (const FileIdentAllocSlot& slot : m_work.file_ident_alloc_slots) {
20,102✔
3729
        FileIdentRequestInfo& info = i->second;
1,716✔
3730
        REALM_ASSERT(info.proxy_file == slot.proxy_file);
1,716✔
3731
        REALM_ASSERT(info.client_type == slot.client_type);
1,716✔
3732
        if (FileIdentReceiver* receiver = info.receiver) {
1,716✔
3733
            info.receiver = nullptr;
1,328✔
3734
            receiver->receive_file_ident(slot.file_ident); // Throws
1,328✔
3735
        }
1,328✔
3736
        ++i;
1,716✔
3737
    }
1,716✔
3738
    m_file_ident_requests.erase(begin, i);
37,634✔
3739

19,004✔
3740
    // Resume download to downstream clients
19,004✔
3741
    if (resume_download_and_upload) {
37,634✔
3742
        resume_download();
17,552✔
3743
    }
17,552✔
3744
}
37,634✔
3745

3746
// ============================ Worker implementation ============================
3747

3748
Worker::Worker(ServerImpl& server)
3749
    : logger{"Worker: ", server.logger_ptr} // Throws
3750
    , m_server{server}
3751
    , m_file_access_cache{server.get_config().max_open_files, logger, *this, server.get_config().encryption_key}
3752
{
7,968✔
3753
    util::seed_prng_nondeterministically(m_random); // Throws
7,968✔
3754
}
7,968✔
3755

3756

3757
void Worker::enqueue(ServerFile* file)
3758
{
38,008✔
3759
    util::LockGuard lock{m_mutex};
38,008✔
3760
    m_queue.push_back(file); // Throws
38,008✔
3761
    m_cond.notify_all();
38,008✔
3762
}
38,008✔
3763

3764

3765
std::mt19937_64& Worker::server_history_get_random() noexcept
3766
{
2,742✔
3767
    return m_random;
2,742✔
3768
}
2,742✔
3769

3770

3771
void Worker::run()
3772
{
7,916✔
3773
    for (;;) {
45,912✔
3774
        ServerFile* file = nullptr;
45,912✔
3775
        {
45,912✔
3776
            util::LockGuard lock{m_mutex};
45,912✔
3777
            for (;;) {
91,216✔
3778
                if (REALM_UNLIKELY(m_stop))
91,216✔
3779
                    return;
49,952✔
3780
                if (!m_queue.empty()) {
83,300✔
3781
                    file = m_queue.front();
37,998✔
3782
                    m_queue.pop_front();
37,998✔
3783
                    break;
37,998✔
3784
                }
37,998✔
3785
                m_cond.wait(lock);
45,302✔
3786
            }
45,302✔
3787
        }
45,912✔
3788
        file->worker_process_work_unit(m_state); // Throws
41,902✔
3789
    }
37,996✔
3790
}
7,916✔
3791

3792

3793
void Worker::stop() noexcept
3794
{
7,916✔
3795
    util::LockGuard lock{m_mutex};
7,916✔
3796
    m_stop = true;
7,916✔
3797
    m_cond.notify_all();
7,916✔
3798
}
7,916✔
3799

3800

3801
// ============================ ServerImpl implementation ============================
3802

3803

3804
ServerImpl::ServerImpl(const std::string& root_dir, util::Optional<sync::PKey> pkey, Server::Config config)
3805
    : logger_ptr{config.logger ? std::move(config.logger) : Logger::get_default_logger()}
3806
    , logger{*logger_ptr}
3807
    , m_config{std::move(config)}
3808
    , m_max_upload_backlog{determine_max_upload_backlog(config)}
3809
    , m_root_dir{root_dir} // Throws
3810
    , m_access_control{std::move(pkey)}
3811
    , m_protocol_version_range{determine_protocol_version_range(config)}                 // Throws
3812
    , m_file_access_cache{m_config.max_open_files, logger, *this, config.encryption_key} // Throws
3813
    , m_worker{*this}                                                                    // Throws
3814
    , m_acceptor{get_service()}
3815
    , m_server_protocol{}       // Throws
3816
    , m_compress_memory_arena{} // Throws
3817
{
7,968✔
3818
    if (m_config.ssl) {
7,968✔
3819
        m_ssl_context = std::make_unique<network::ssl::Context>();                // Throws
24✔
3820
        m_ssl_context->use_certificate_chain_file(m_config.ssl_certificate_path); // Throws
24✔
3821
        m_ssl_context->use_private_key_file(m_config.ssl_certificate_key_path);   // Throws
24✔
3822
    }
24✔
3823
}
7,968✔
3824

3825

3826
ServerImpl::~ServerImpl() noexcept
3827
{
7,968✔
3828
    bool server_destroyed_while_still_running = m_running;
7,968✔
3829
    REALM_ASSERT_RELEASE(!server_destroyed_while_still_running);
7,968✔
3830
}
7,968✔
3831

3832

3833
void ServerImpl::start()
3834
{
7,968✔
3835
    logger.info("Realm sync server started (%1)", REALM_VER_CHUNK); // Throws
7,968✔
3836
    logger.info("Supported protocol versions: %1-%2 (%3-%4 configured)",
7,968✔
3837
                ServerImplBase::get_oldest_supported_protocol_version(), get_current_protocol_version(),
7,968✔
3838
                m_protocol_version_range.first,
7,968✔
3839
                m_protocol_version_range.second); // Throws
7,968✔
3840
    logger.info("Platform: %1", util::get_platform_info());
7,968✔
3841
    bool is_debug_build = false;
7,968✔
3842
#if REALM_DEBUG
7,968✔
3843
    is_debug_build = true;
7,968✔
3844
#endif
7,968✔
3845
    {
7,968✔
3846
        const char* lead_text = "Build mode";
7,968✔
3847
        if (is_debug_build) {
7,968✔
3848
            logger.info("%1: Debug", lead_text); // Throws
7,968✔
3849
        }
7,968✔
3850
        else {
×
3851
            logger.info("%1: Release", lead_text); // Throws
×
3852
        }
×
3853
    }
7,968✔
3854
    if (is_debug_build) {
7,968✔
3855
        logger.warn("Build mode is Debug! CAN SEVERELY IMPACT PERFORMANCE - "
7,968✔
3856
                    "NOT RECOMMENDED FOR PRODUCTION"); // Throws
7,968✔
3857
    }
7,968✔
3858
    logger.info("Directory holding persistent state: %1", m_root_dir);        // Throws
7,968✔
3859
    logger.info("Maximum number of open files: %1", m_config.max_open_files); // Throws
7,968✔
3860
    {
7,968✔
3861
        const char* lead_text = "Encryption";
7,968✔
3862
        if (m_config.encryption_key) {
7,968✔
3863
            logger.info("%1: Yes", lead_text); // Throws
4✔
3864
        }
4✔
3865
        else {
7,964✔
3866
            logger.info("%1: No", lead_text); // Throws
7,964✔
3867
        }
7,964✔
3868
    }
7,968✔
3869
    logger.info("Log level: %1", logger.get_level_threshold()); // Throws
7,968✔
3870
    {
7,968✔
3871
        const char* lead_text = "Disable sync to disk";
7,968✔
3872
        if (m_config.disable_sync_to_disk) {
7,968✔
3873
            logger.info("%1: All files", lead_text); // Throws
7,276✔
3874
        }
7,276✔
3875
        else {
692✔
3876
            logger.info("%1: No", lead_text); // Throws
692✔
3877
        }
692✔
3878
    }
7,968✔
3879
    if (m_config.disable_sync_to_disk) {
7,968✔
3880
        logger.warn("Testing/debugging feature 'disable sync to disk' enabled - "
7,276✔
3881
                    "never do this in production!"); // Throws
7,276✔
3882
    }
7,276✔
3883
    logger.info("Download compaction: %1",
7,968✔
3884
                (m_config.disable_download_compaction ? "No" : "Yes")); // Throws
7,968✔
3885
    logger.info("Download bootstrap caching: %1",
7,968✔
3886
                (m_config.enable_download_bootstrap_cache ? "Yes" : "No"));                // Throws
7,968✔
3887
    logger.info("Max download size: %1 bytes", m_config.max_download_size);                // Throws
7,968✔
3888
    logger.info("Max upload backlog: %1 bytes", m_max_upload_backlog);                     // Throws
7,968✔
3889
    logger.info("HTTP request timeout: %1 ms", m_config.http_request_timeout);             // Throws
7,968✔
3890
    logger.info("HTTP response timeout: %1 ms", m_config.http_response_timeout);           // Throws
7,968✔
3891
    logger.info("Connection reaper timeout: %1 ms", m_config.connection_reaper_timeout);   // Throws
7,968✔
3892
    logger.info("Connection reaper interval: %1 ms", m_config.connection_reaper_interval); // Throws
7,968✔
3893
    logger.info("Connection soft close timeout: %1 ms", m_config.soft_close_timeout);      // Throws
7,968✔
3894
    logger.debug("Authorization header name: %1", m_config.authorization_header_name);     // Throws
7,968✔
3895

3,932✔
3896
    m_realm_names = _impl::find_realm_files(m_root_dir); // Throws
7,968✔
3897

3,932✔
3898
    initiate_connection_reaper_timer(m_config.connection_reaper_interval); // Throws
7,968✔
3899

3,932✔
3900
    listen(); // Throws
7,968✔
3901
}
7,968✔
3902

3903

3904
void ServerImpl::run()
3905
{
7,916✔
3906
    auto ta = util::make_temp_assign(m_running, true);
7,916✔
3907

3,906✔
3908
    {
7,916✔
3909
        auto worker_thread = util::make_thread_exec_guard(m_worker, *this); // Throws
7,916✔
3910
        std::string name;
7,916✔
3911
        if (util::Thread::get_name(name)) {
7,916✔
3912
            name += "-worker";
4,010✔
3913
            worker_thread.start_with_signals_blocked(name); // Throws
4,010✔
3914
        }
4,010✔
3915
        else {
3,906✔
3916
            worker_thread.start_with_signals_blocked(); // Throws
3,906✔
3917
        }
3,906✔
3918

3,906✔
3919
        m_service.run(); // Throws
7,916✔
3920

3,906✔
3921
        worker_thread.stop_and_rethrow(); // Throws
7,916✔
3922
    }
7,916✔
3923

3,906✔
3924
    logger.info("Realm sync server stopped");
7,916✔
3925
}
7,916✔
3926

3927

3928
void ServerImpl::stop() noexcept
3929
{
8,810✔
3930
    util::LockGuard lock{m_mutex};
8,810✔
3931
    if (m_stopped)
8,810✔
3932
        return;
842✔
3933
    m_stopped = true;
7,968✔
3934
    m_wait_or_service_stopped_cond.notify_all();
7,968✔
3935
    m_service.stop();
7,968✔
3936
}
7,968✔
3937

3938

3939
void ServerImpl::inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3940
{
22,260✔
3941
    m_pending_changesets_from_downstream_byte_size += byte_size;
22,260✔
3942
    logger.debug("Byte size for pending downstream changesets incremented by "
22,260✔
3943
                 "%1 to reach a total of %2",
22,260✔
3944
                 byte_size,
22,260✔
3945
                 m_pending_changesets_from_downstream_byte_size); // Throws
22,260✔
3946
}
22,260✔
3947

3948

3949
void ServerImpl::dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3950
{
17,580✔
3951
    REALM_ASSERT(byte_size <= m_pending_changesets_from_downstream_byte_size);
17,580✔
3952
    m_pending_changesets_from_downstream_byte_size -= byte_size;
17,580✔
3953
    logger.debug("Byte size for pending downstream changesets decremented by "
17,580✔
3954
                 "%1 to reach a total of %2",
17,580✔
3955
                 byte_size,
17,580✔
3956
                 m_pending_changesets_from_downstream_byte_size); // Throws
17,580✔
3957
}
17,580✔
3958

3959

3960
std::mt19937_64& ServerImpl::server_history_get_random() noexcept
3961
{
1,082✔
3962
    return get_random();
1,082✔
3963
}
1,082✔
3964

3965

3966
void ServerImpl::listen()
3967
{
7,968✔
3968
    network::Resolver resolver{get_service()};
7,968✔
3969
    network::Resolver::Query query(m_config.listen_address, m_config.listen_port,
7,968✔
3970
                                   network::Resolver::Query::passive | network::Resolver::Query::address_configured);
7,968✔
3971
    network::Endpoint::List endpoints = resolver.resolve(query); // Throws
7,968✔
3972

3,932✔
3973
    auto i = endpoints.begin();
7,968✔
3974
    auto end = endpoints.end();
7,968✔
3975
    for (;;) {
7,968✔
3976
        std::error_code ec;
7,966✔
3977
        m_acceptor.open(i->protocol(), ec);
7,966✔
3978
        if (!ec) {
7,966✔
3979
            using SocketBase = network::SocketBase;
7,966✔
3980
            m_acceptor.set_option(SocketBase::reuse_address(m_config.reuse_address), ec);
7,966✔
3981
            if (!ec) {
7,966✔
3982
                m_acceptor.bind(*i, ec);
7,966✔
3983
                if (!ec)
7,966✔
3984
                    break;
7,968✔
3985
            }
2,147,483,647✔
3986
            m_acceptor.close();
2,147,483,647✔
3987
        }
2,147,483,647✔
3988
        if (i + 1 == end) {
2,147,483,647!
3989
            for (auto i2 = endpoints.begin(); i2 != i; ++i2) {
×
3990
                // FIXME: We don't have the error code for previous attempts, so
3991
                // can't print a nice message.
3992
                logger.error("Failed to bind to %1:%2", i2->address(),
×
3993
                             i2->port()); // Throws
×
3994
            }
×
3995
            logger.error("Failed to bind to %1:%2: %3", i->address(), i->port(),
×
3996
                         ec.message()); // Throws
×
3997
            throw std::runtime_error("Could not create a listening socket: All endpoints failed");
×
3998
        }
×
3999
    }
2,147,483,647✔
4000

3,932✔
4001
    m_acceptor.listen(m_config.listen_backlog);
7,968✔
4002

3,932✔
4003
    network::Endpoint local_endpoint = m_acceptor.local_endpoint();
7,968✔
4004
    const char* ssl_mode = (m_ssl_context ? "TLS" : "non-TLS");
7,958✔
4005
    logger.info("Listening on %1:%2 (max backlog is %3, %4)", local_endpoint.address(), local_endpoint.port(),
7,968✔
4006
                m_config.listen_backlog, ssl_mode); // Throws
7,968✔
4007

3,932✔
4008
    initiate_accept();
7,968✔
4009
}
7,968✔
4010

4011

4012
void ServerImpl::initiate_accept()
4013
{
9,924✔
4014
    auto handler = [this](std::error_code ec) {
5,888✔
4015
        if (ec != util::error::operation_aborted)
1,958✔
4016
            handle_accept(ec);
1,958✔
4017
    };
1,958✔
4018
    bool is_ssl = bool(m_ssl_context);
9,924✔
4019
    m_next_http_conn.reset(new HTTPConnection(*this, ++m_next_conn_id, is_ssl));                            // Throws
9,924✔
4020
    m_acceptor.async_accept(m_next_http_conn->get_socket(), m_next_http_conn_endpoint, std::move(handler)); // Throws
9,924✔
4021
}
9,924✔
4022

4023

4024
void ServerImpl::handle_accept(std::error_code ec)
4025
{
1,958✔
4026
    if (ec) {
1,958✔
4027
        if (ec != util::error::connection_aborted) {
×
4028
            REALM_ASSERT(ec != util::error::operation_aborted);
×
4029

4030
            // We close the reserved files to get a few extra file descriptors.
4031
            for (size_t i = 0; i < sizeof(m_reserved_files) / sizeof(m_reserved_files[0]); ++i) {
×
4032
                m_reserved_files[i].reset();
×
4033
            }
×
4034

4035
            // FIXME: There are probably errors that need to be treated
4036
            // specially, and not cause the server to "crash".
4037

4038
            if (ec == make_basic_system_error_code(EMFILE)) {
×
4039
                logger.error("Failed to accept a connection due to the file descriptor limit, "
×
4040
                             "consider increasing the limit in your system config"); // Throws
×
4041
                throw OutOfFilesError(ec);
×
4042
            }
×
4043
            else {
×
4044
                throw std::system_error(ec);
×
4045
            }
×
4046
        }
×
4047
        logger.debug("Skipping aborted connection"); // Throws
×
4048
    }
×
4049
    else {
1,958✔
4050
        HTTPConnection& conn = *m_next_http_conn;
1,958✔
4051
        if (m_config.tcp_no_delay)
1,958✔
4052
            conn.get_socket().set_option(network::SocketBase::no_delay(true));       // Throws
1,642✔
4053
        m_http_connections.emplace(conn.get_id(), std::move(m_next_http_conn));      // Throws
1,958✔
4054
        Formatter& formatter = m_misc_buffers.formatter;
1,958✔
4055
        formatter.reset();
1,958✔
4056
        formatter << "[" << m_next_http_conn_endpoint.address() << "]:" << m_next_http_conn_endpoint.port(); // Throws
1,958✔
4057
        std::string remote_endpoint = {formatter.data(), formatter.size()};                                  // Throws
1,958✔
4058
        conn.initiate(std::move(remote_endpoint));                                                           // Throws
1,958✔
4059
    }
1,958✔
4060
    initiate_accept(); // Throws
1,958✔
4061
}
1,958✔
4062

4063

4064
void ServerImpl::remove_http_connection(std::int_fast64_t conn_id) noexcept
4065
{
1,958✔
4066
    m_http_connections.erase(conn_id);
1,958✔
4067
}
1,958✔
4068

4069

4070
void ServerImpl::add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn)
4071
{
1,928✔
4072
    m_sync_connections.emplace(connection_id, std::move(sync_conn));
1,928✔
4073
}
1,928✔
4074

4075

4076
void ServerImpl::remove_sync_connection(int_fast64_t connection_id)
4077
{
1,126✔
4078
    m_sync_connections.erase(connection_id);
1,126✔
4079
}
1,126✔
4080

4081

4082
void ServerImpl::set_connection_reaper_timeout(milliseconds_type timeout)
4083
{
4✔
4084
    get_service().post([this, timeout](Status) {
4✔
4085
        m_config.connection_reaper_timeout = timeout;
4✔
4086
    });
4✔
4087
}
4✔
4088

4089

4090
void ServerImpl::close_connections()
4091
{
20✔
4092
    get_service().post([this](Status) {
20✔
4093
        do_close_connections(); // Throws
20✔
4094
    });
20✔
4095
}
20✔
4096

4097

4098
bool ServerImpl::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4099
{
72✔
4100
    return _impl::map_virt_to_real_realm_path(m_root_dir, virt_path, real_path); // Throws
72✔
4101
}
72✔
4102

4103

4104
void ServerImpl::recognize_external_change(const std::string& virt_path)
4105
{
4,800✔
4106
    std::string virt_path_2 = virt_path; // Throws (copy)
4,800✔
4107
    get_service().post([this, virt_path = std::move(virt_path_2)](Status) {
4,800✔
4108
        do_recognize_external_change(virt_path); // Throws
4,800✔
4109
    });                                          // Throws
4,800✔
4110
}
4,800✔
4111

4112

4113
void ServerImpl::stop_sync_and_wait_for_backup_completion(
4114
    util::UniqueFunction<void(bool did_backup)> completion_handler, milliseconds_type timeout)
4115
{
×
4116
    logger.info("stop_sync_and_wait_for_backup_completion() called with "
×
4117
                "timeout = %1",
×
4118
                timeout); // Throws
×
4119

4120
    get_service().post([this, completion_handler = std::move(completion_handler), timeout](Status) mutable {
×
4121
        do_stop_sync_and_wait_for_backup_completion(std::move(completion_handler),
×
4122
                                                    timeout); // Throws
×
4123
    });
×
4124
}
×
4125

4126

4127
void ServerImpl::initiate_connection_reaper_timer(milliseconds_type timeout)
4128
{
8,096✔
4129
    m_connection_reaper_timer.emplace(get_service());
8,096✔
4130
    m_connection_reaper_timer->async_wait(std::chrono::milliseconds(timeout), [this, timeout](Status status) {
4,060✔
4131
        if (status != ErrorCodes::OperationAborted) {
128✔
4132
            reap_connections();                        // Throws
128✔
4133
            initiate_connection_reaper_timer(timeout); // Throws
128✔
4134
        }
128✔
4135
    }); // Throws
128✔
4136
}
8,096✔
4137

4138

4139
void ServerImpl::reap_connections()
4140
{
128✔
4141
    logger.debug("Discarding dead connections"); // Throws
128✔
4142
    SteadyTimePoint now = steady_clock_now();
128✔
4143
    {
128✔
4144
        auto end = m_http_connections.end();
128✔
4145
        auto i = m_http_connections.begin();
128✔
4146
        while (i != end) {
130✔
4147
            HTTPConnection& conn = *i->second;
2✔
4148
            ++i;
2✔
4149
            // Suicide
4150
            conn.terminate_if_dead(now); // Throws
2✔
4151
        }
2✔
4152
    }
128✔
4153
    {
128✔
4154
        auto end = m_sync_connections.end();
128✔
4155
        auto i = m_sync_connections.begin();
128✔
4156
        while (i != end) {
250✔
4157
            SyncConnection& conn = *i->second;
122✔
4158
            ++i;
122✔
4159
            // Suicide
116✔
4160
            conn.terminate_if_dead(now); // Throws
122✔
4161
        }
122✔
4162
    }
128✔
4163
}
128✔
4164

4165

4166
void ServerImpl::do_close_connections()
4167
{
20✔
4168
    for (auto& entry : m_sync_connections) {
20✔
4169
        SyncConnection& conn = *entry.second;
20✔
4170
        conn.initiate_soft_close(); // Throws
20✔
4171
    }
20✔
4172
}
20✔
4173

4174

4175
void ServerImpl::do_recognize_external_change(const std::string& virt_path)
4176
{
4,800✔
4177
    auto i = m_files.find(virt_path);
4,800✔
4178
    if (i == m_files.end())
4,800✔
4179
        return;
2✔
4180
    ServerFile& file = *i->second;
4,798✔
4181
    file.recognize_external_change();
4,798✔
4182
}
4,798✔
4183

4184

4185
void ServerImpl::do_stop_sync_and_wait_for_backup_completion(
4186
    util::UniqueFunction<void(bool did_complete)> completion_handler, milliseconds_type timeout)
4187
{
×
4188
    static_cast<void>(timeout);
×
4189
    if (m_sync_stopped)
×
4190
        return;
×
4191
    do_close_connections(); // Throws
×
4192
    m_sync_stopped = true;
×
4193
    bool completion_reached = false;
×
4194
    completion_handler(completion_reached); // Throws
×
4195
}
×
4196

4197

4198
// ============================ SyncConnection implementation ============================
4199

4200
SyncConnection::~SyncConnection() noexcept
4201
{
1,928✔
4202
    m_sessions_enlisted_to_send.clear();
1,928✔
4203
    m_sessions.clear();
1,928✔
4204
}
1,928✔
4205

4206

4207
void SyncConnection::initiate()
4208
{
1,928✔
4209
    m_last_activity_at = steady_clock_now();
1,928✔
4210
    logger.debug("Sync Connection initiated");
1,928✔
4211
    m_websocket.initiate_server_websocket_after_handshake();
1,928✔
4212
    send_log_message(util::Logger::Level::info, "Client connection established with server", 0,
1,928✔
4213
                     m_appservices_request_id);
1,928✔
4214
}
1,928✔
4215

4216

4217
template <class... Params>
4218
void SyncConnection::terminate(Logger::Level log_level, const char* log_message, Params... log_params)
4219
{
1,126✔
4220
    terminate_sessions();                              // Throws
1,126✔
4221
    logger.log(log_level, log_message, log_params...); // Throws
1,126✔
4222
    m_websocket.stop();
1,126✔
4223
    m_ssl_stream.reset();
1,126✔
4224
    m_socket.reset();
1,126✔
4225
    // Suicide
590✔
4226
    m_server.remove_sync_connection(m_id);
1,126✔
4227
}
1,126✔
4228

4229

4230
void SyncConnection::terminate_if_dead(SteadyTimePoint now)
4231
{
122✔
4232
    milliseconds_type time = steady_duration(m_last_activity_at, now);
122✔
4233
    const Server::Config& config = m_server.get_config();
122✔
4234
    if (m_is_closing) {
122✔
4235
        if (time >= config.soft_close_timeout) {
×
4236
            // Suicide
4237
            terminate(Logger::Level::detail,
×
4238
                      "Sync connection closed (timeout during soft close)"); // Throws
×
4239
        }
×
4240
    }
×
4241
    else {
122✔
4242
        if (time >= config.connection_reaper_timeout) {
122✔
4243
            // Suicide
2✔
4244
            terminate(Logger::Level::detail,
4✔
4245
                      "Sync connection closed (no heartbeat)"); // Throws
4✔
4246
        }
4✔
4247
    }
122✔
4248
}
122✔
4249

4250

4251
void SyncConnection::enlist_to_send(Session* sess) noexcept
4252
{
107,722✔
4253
    REALM_ASSERT(m_send_trigger);
107,722✔
4254
    REALM_ASSERT(!m_is_closing);
107,722✔
4255
    REALM_ASSERT(!sess->is_enlisted_to_send());
107,722✔
4256
    m_sessions_enlisted_to_send.push_back(sess);
107,722✔
4257
    m_send_trigger->trigger();
107,722✔
4258
}
107,722✔
4259

4260

4261
void SyncConnection::handle_protocol_error(Status status)
4262
{
×
4263
    logger.error("%1", status);
×
4264
    switch (status.code()) {
×
4265
        case ErrorCodes::SyncProtocolInvariantFailed:
×
4266
            protocol_error(ProtocolError::bad_syntax); // Throws
×
4267
            break;
×
4268
        case ErrorCodes::LimitExceeded:
×
4269
            protocol_error(ProtocolError::limits_exceeded); // Throws
×
4270
            break;
×
4271
        default:
×
4272
            protocol_error(ProtocolError::other_error);
×
4273
            break;
×
4274
    }
×
4275
}
×
4276

4277
void SyncConnection::receive_bind_message(session_ident_type session_ident, std::string path,
4278
                                          std::string signed_user_token, bool need_client_file_ident,
4279
                                          bool is_subserver)
4280
{
5,666✔
4281
    auto p = m_sessions.emplace(session_ident, nullptr); // Throws
5,666✔
4282
    bool was_inserted = p.second;
5,666✔
4283
    if (REALM_UNLIKELY(!was_inserted)) {
5,666✔
4284
        logger.error("Overlapping reuse of session identifier %1 in BIND message",
×
4285
                     session_ident);                           // Throws
×
4286
        protocol_error(ProtocolError::reuse_of_session_ident); // Throws
×
4287
        return;
×
4288
    }
×
4289
    try {
5,666✔
4290
        p.first->second.reset(new Session(*this, session_ident)); // Throws
5,666✔
4291
    }
5,666✔
4292
    catch (...) {
2,782✔
4293
        m_sessions.erase(p.first);
×
4294
        throw;
×
4295
    }
×
4296

2,782✔
4297
    Session& sess = *p.first->second;
5,666✔
4298
    sess.initiate(); // Throws
5,666✔
4299
    ProtocolError error;
5,666✔
4300
    bool success =
5,666✔
4301
        sess.receive_bind_message(std::move(path), std::move(signed_user_token), need_client_file_ident, is_subserver,
5,666✔
4302
                                  error); // Throws
5,666✔
4303
    if (REALM_UNLIKELY(!success))         // Throws
5,666✔
4304
        protocol_error(error, &sess);     // Throws
2,796✔
4305
}
5,666✔
4306

4307

4308
void SyncConnection::receive_ident_message(session_ident_type session_ident, file_ident_type client_file_ident,
4309
                                           salt_type client_file_ident_salt, version_type scan_server_version,
4310
                                           version_type scan_client_version, version_type latest_server_version,
4311
                                           salt_type latest_server_version_salt)
4312
{
4,348✔
4313
    auto i = m_sessions.find(session_ident);
4,348✔
4314
    if (REALM_UNLIKELY(i == m_sessions.end())) {
4,348✔
4315
        bad_session_ident("IDENT", session_ident); // Throws
×
4316
        return;
×
4317
    }
×
4318
    Session& sess = *i->second;
4,348✔
4319
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
4,348✔
4320
        message_after_unbind("IDENT", session_ident); // Throws
×
4321
        return;
×
4322
    }
×
4323
    if (REALM_UNLIKELY(sess.error_occurred())) {
4,348✔
4324
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
8✔
4325
        // messages, other than UNBIND, must be ignored.
8✔
4326
        return;
16✔
4327
    }
16✔
4328
    if (REALM_UNLIKELY(sess.must_send_ident_message())) {
4,332✔
4329
        logger.error("Received IDENT message before IDENT message was sent"); // Throws
×
4330
        protocol_error(ProtocolError::bad_message_order);                     // Throws
×
4331
        return;
×
4332
    }
×
4333
    if (REALM_UNLIKELY(sess.ident_message_received())) {
4,332✔
4334
        logger.error("Received second IDENT message for session"); // Throws
×
4335
        protocol_error(ProtocolError::bad_message_order);          // Throws
×
4336
        return;
×
4337
    }
×
4338

2,016✔
4339
    ProtocolError error = {};
4,332✔
4340
    bool success = sess.receive_ident_message(client_file_ident, client_file_ident_salt, scan_server_version,
4,332✔
4341
                                              scan_client_version, latest_server_version, latest_server_version_salt,
4,332✔
4342
                                              error); // Throws
4,332✔
4343
    if (REALM_UNLIKELY(!success))                     // Throws
4,332✔
4344
        protocol_error(error, &sess);                 // Throws
2,032✔
4345
}
4,332✔
4346

4347
void SyncConnection::receive_upload_message(session_ident_type session_ident, version_type progress_client_version,
4348
                                            version_type progress_server_version, version_type locked_server_version,
4349
                                            const UploadChangesets& upload_changesets)
4350
{
43,860✔
4351
    auto i = m_sessions.find(session_ident);
43,860✔
4352
    if (REALM_UNLIKELY(i == m_sessions.end())) {
43,860✔
4353
        bad_session_ident("UPLOAD", session_ident); // Throws
×
4354
        return;
×
4355
    }
×
4356
    Session& sess = *i->second;
43,860✔
4357
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
43,860✔
4358
        message_after_unbind("UPLOAD", session_ident); // Throws
×
4359
        return;
×
4360
    }
×
4361
    if (REALM_UNLIKELY(sess.error_occurred())) {
43,860✔
4362
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4363
        // messages, other than UNBIND, must be ignored.
4364
        return;
×
4365
    }
×
4366
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
43,860✔
4367
        message_before_ident("UPLOAD", session_ident); // Throws
×
4368
        return;
×
4369
    }
×
4370

22,110✔
4371
    ProtocolError error = {};
43,860✔
4372
    bool success = sess.receive_upload_message(progress_client_version, progress_server_version,
43,860✔
4373
                                               locked_server_version, upload_changesets, error); // Throws
43,860✔
4374
    if (REALM_UNLIKELY(!success))                                                                // Throws
43,860✔
4375
        protocol_error(error, &sess);                                                            // Throws
22,110✔
4376
}
43,860✔
4377

4378

4379
void SyncConnection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
4380
{
12,414✔
4381
    auto i = m_sessions.find(session_ident);
12,414✔
4382
    if (REALM_UNLIKELY(i == m_sessions.end())) {
12,414✔
4383
        bad_session_ident("MARK", session_ident);
×
4384
        return;
×
4385
    }
×
4386
    Session& sess = *i->second;
12,414✔
4387
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
12,414✔
4388
        message_after_unbind("MARK", session_ident); // Throws
×
4389
        return;
×
4390
    }
×
4391
    if (REALM_UNLIKELY(sess.error_occurred())) {
12,414✔
4392
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
24✔
4393
        // messages, other than UNBIND, must be ignored.
24✔
4394
        return;
48✔
4395
    }
48✔
4396
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
12,366✔
4397
        message_before_ident("MARK", session_ident); // Throws
×
4398
        return;
×
4399
    }
×
4400

5,974✔
4401
    ProtocolError error;
12,366✔
4402
    bool success = sess.receive_mark_message(request_ident, error); // Throws
12,366✔
4403
    if (REALM_UNLIKELY(!success))                                   // Throws
12,366✔
4404
        protocol_error(error, &sess);                               // Throws
5,974✔
4405
}
12,366✔
4406

4407

4408
void SyncConnection::receive_unbind_message(session_ident_type session_ident)
4409
{
2,988✔
4410
    auto i = m_sessions.find(session_ident); // Throws
2,988✔
4411
    if (REALM_UNLIKELY(i == m_sessions.end())) {
2,988✔
4412
        bad_session_ident("UNBIND", session_ident); // Throws
×
4413
        return;
×
4414
    }
×
4415
    Session& sess = *i->second;
2,988✔
4416
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
2,988✔
4417
        message_after_unbind("UNBIND", session_ident); // Throws
×
4418
        return;
×
4419
    }
×
4420

1,402✔
4421
    sess.receive_unbind_message(); // Throws
2,988✔
4422
    // NOTE: The session might have gotten destroyed at this time!
1,402✔
4423
}
2,988✔
4424

4425

4426
void SyncConnection::receive_ping(milliseconds_type timestamp, milliseconds_type rtt)
4427
{
160✔
4428
    logger.debug("Received: PING(timestamp=%1, rtt=%2)", timestamp, rtt); // Throws
160✔
4429
    m_send_pong = true;
160✔
4430
    m_last_ping_timestamp = timestamp;
160✔
4431
    if (!m_is_sending)
160✔
4432
        send_next_message();
158✔
4433
}
160✔
4434

4435

4436
void SyncConnection::receive_error_message(session_ident_type session_ident, int error_code,
4437
                                           std::string_view error_body)
4438
{
×
4439
    logger.debug("Received: ERROR(error_code=%1, message_size=%2, session_ident=%3)", error_code, error_body.size(),
×
4440
                 session_ident); // Throws
×
4441
    auto i = m_sessions.find(session_ident);
×
4442
    if (REALM_UNLIKELY(i == m_sessions.end())) {
×
4443
        bad_session_ident("ERROR", session_ident);
×
4444
        return;
×
4445
    }
×
4446
    Session& sess = *i->second;
×
4447
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
×
4448
        message_after_unbind("ERROR", session_ident); // Throws
×
4449
        return;
×
4450
    }
×
4451

4452
    sess.receive_error_message(session_ident, error_code, error_body); // Throws
×
4453
}
×
4454

4455
void SyncConnection::send_log_message(util::Logger::Level level, const std::string&& message,
4456
                                      session_ident_type sess_ident, std::optional<std::string> co_id)
4457
{
6,228✔
4458
    if (get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
6,228✔
4459
        return logger.log(level, message.c_str());
×
4460
    }
×
4461

2,940✔
4462
    LogMessage log_msg{sess_ident, level, std::move(message), std::move(co_id)};
6,228✔
4463
    {
6,228✔
4464
        std::lock_guard lock(m_log_mutex);
6,228✔
4465
        m_log_messages.push(std::move(log_msg));
6,228✔
4466
    }
6,228✔
4467
    m_send_trigger->trigger();
6,228✔
4468
}
6,228✔
4469

4470

4471
void SyncConnection::bad_session_ident(const char* message_type, session_ident_type session_ident)
4472
{
×
4473
    logger.error("Bad session identifier in %1 message, session_ident = %2", message_type,
×
4474
                 session_ident);                      // Throws
×
4475
    protocol_error(ProtocolError::bad_session_ident); // Throws
×
4476
}
×
4477

4478

4479
void SyncConnection::message_after_unbind(const char* message_type, session_ident_type session_ident)
4480
{
×
4481
    logger.error("Received %1 message after UNBIND message, session_ident = %2", message_type,
×
4482
                 session_ident);                      // Throws
×
4483
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4484
}
×
4485

4486

4487
void SyncConnection::message_before_ident(const char* message_type, session_ident_type session_ident)
4488
{
×
4489
    logger.error("Received %1 message before IDENT message, session_ident = %2", message_type,
×
4490
                 session_ident);                      // Throws
×
4491
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4492
}
×
4493

4494

4495
void SyncConnection::handle_message_received(const char* data, size_t size)
4496
{
69,438✔
4497
    // parse_message_received() parses the message and calls the
34,374✔
4498
    // proper handler on the SyncConnection object (this).
34,374✔
4499
    get_server_protocol().parse_message_received<SyncConnection>(*this, std::string_view(data, size));
69,438✔
4500
    return;
69,438✔
4501
}
69,438✔
4502

4503

4504
void SyncConnection::handle_ping_received(const char* data, size_t size)
4505
{
×
4506
    // parse_message_received() parses the message and calls the
4507
    // proper handler on the SyncConnection object (this).
4508
    get_server_protocol().parse_ping_received<SyncConnection>(*this, std::string_view(data, size));
×
4509
    return;
×
4510
}
×
4511

4512

4513
void SyncConnection::send_next_message()
4514
{
105,026✔
4515
    REALM_ASSERT(!m_is_sending);
105,026✔
4516
    REALM_ASSERT(!m_sending_pong);
105,026✔
4517
    if (m_send_pong) {
105,026✔
4518
        send_pong(m_last_ping_timestamp);
160✔
4519
        if (m_sending_pong)
160✔
4520
            return;
160✔
4521
    }
104,866✔
4522
    for (;;) {
155,580✔
4523
        Session* sess = m_sessions_enlisted_to_send.pop_front();
155,580✔
4524
        if (!sess) {
155,580✔
4525
            // No sessions were enlisted to send
24,438✔
4526
            if (REALM_LIKELY(!m_is_closing))
48,048✔
4527
                break; // Check to see if there are any log messages to go out
48,038✔
4528
            // Send a connection level ERROR
10✔
4529
            REALM_ASSERT(!is_session_level_error(m_error_code));
20✔
4530
            initiate_write_error(m_error_code, m_error_session_ident); // Throws
20✔
4531
            return;
20✔
4532
        }
20✔
4533
        sess->send_message(); // Throws
107,532✔
4534
        // NOTE: The session might have gotten destroyed at this time!
56,938✔
4535

56,938✔
4536
        // At this point, `m_is_sending` is true if, and only if the session
56,938✔
4537
        // chose to send a message. If it chose to not send a message, we must
56,938✔
4538
        // loop back and give the next session in `m_sessions_enlisted_to_send`
56,938✔
4539
        // a chance.
56,938✔
4540
        if (m_is_sending)
107,532✔
4541
            return;
56,830✔
4542
    }
107,532✔
4543
    {
77,812✔
4544
        std::lock_guard lock(m_log_mutex);
48,016✔
4545
        if (!m_log_messages.empty()) {
48,016✔
4546
            send_log_message(m_log_messages.front());
6,176✔
4547
            m_log_messages.pop();
6,176✔
4548
        }
6,176✔
4549
    }
48,016✔
4550
    // Otherwise, nothing to do
24,420✔
4551
}
48,016✔
4552

4553

4554
void SyncConnection::initiate_write_output_buffer()
4555
{
63,006✔
4556
    auto handler = [this](std::error_code ec, size_t) {
62,966✔
4557
        if (!ec) {
62,960✔
4558
            handle_write_output_buffer();
62,832✔
4559
        }
62,832✔
4560
    };
62,960✔
4561

32,712✔
4562
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
63,006✔
4563
                                   std::move(handler)); // Throws
63,006✔
4564
    m_is_sending = true;
63,006✔
4565
}
63,006✔
4566

4567

4568
void SyncConnection::initiate_pong_output_buffer()
4569
{
160✔
4570
    auto handler = [this](std::error_code ec, size_t) {
158✔
4571
        if (!ec) {
158✔
4572
            handle_pong_output_buffer();
158✔
4573
        }
158✔
4574
    };
158✔
4575

56✔
4576
    REALM_ASSERT(!m_is_sending);
160✔
4577
    REALM_ASSERT(!m_sending_pong);
160✔
4578
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
160✔
4579
                                   std::move(handler)); // Throws
160✔
4580

56✔
4581
    m_is_sending = true;
160✔
4582
    m_sending_pong = true;
160✔
4583
}
160✔
4584

4585

4586
void SyncConnection::send_pong(milliseconds_type timestamp)
4587
{
160✔
4588
    REALM_ASSERT(m_send_pong);
160✔
4589
    REALM_ASSERT(!m_sending_pong);
160✔
4590
    m_send_pong = false;
160✔
4591
    logger.debug("Sending: PONG(timestamp=%1)", timestamp); // Throws
160✔
4592

56✔
4593
    OutputBuffer& out = get_output_buffer();
160✔
4594
    get_server_protocol().make_pong(out, timestamp); // Throws
160✔
4595

56✔
4596
    initiate_pong_output_buffer(); // Throws
160✔
4597
}
160✔
4598

4599
void SyncConnection::send_log_message(const LogMessage& log_msg)
4600
{
6,176✔
4601
    OutputBuffer& out = get_output_buffer();
6,176✔
4602
    get_server_protocol().make_log_message(out, log_msg.level, log_msg.message, log_msg.sess_ident,
6,176✔
4603
                                           log_msg.co_id); // Throws
6,176✔
4604

2,926✔
4605
    initiate_write_output_buffer(); // Throws
6,176✔
4606
}
6,176✔
4607

4608

4609
void SyncConnection::handle_write_output_buffer()
4610
{
62,832✔
4611
    release_output_buffer();
62,832✔
4612
    m_is_sending = false;
62,832✔
4613
    send_next_message(); // Throws
62,832✔
4614
}
62,832✔
4615

4616

4617
void SyncConnection::handle_pong_output_buffer()
4618
{
158✔
4619
    release_output_buffer();
158✔
4620
    REALM_ASSERT(m_is_sending);
158✔
4621
    REALM_ASSERT(m_sending_pong);
158✔
4622
    m_is_sending = false;
158✔
4623
    m_sending_pong = false;
158✔
4624
    send_next_message(); // Throws
158✔
4625
}
158✔
4626

4627

4628
void SyncConnection::initiate_write_error(ProtocolError error_code, session_ident_type session_ident)
4629
{
20✔
4630
    const char* message = get_protocol_error_message(int(error_code));
20✔
4631
    std::size_t message_size = std::strlen(message);
20✔
4632
    bool try_again = determine_try_again(error_code);
20✔
4633

10✔
4634
    logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3, session_ident=%4)", int(error_code),
20✔
4635
                  message_size, try_again, session_ident); // Throws
20✔
4636

10✔
4637
    OutputBuffer& out = get_output_buffer();
20✔
4638
    int protocol_version = get_client_protocol_version();
20✔
4639
    get_server_protocol().make_error_message(protocol_version, out, error_code, message, message_size, try_again,
20✔
4640
                                             session_ident); // Throws
20✔
4641

10✔
4642
    auto handler = [this](std::error_code ec, size_t) {
20✔
4643
        handle_write_error(ec); // Throws
20✔
4644
    };
20✔
4645
    m_websocket.async_write_binary(out.data(), out.size(), std::move(handler));
20✔
4646
    m_is_sending = true;
20✔
4647
}
20✔
4648

4649

4650
void SyncConnection::handle_write_error(std::error_code ec)
4651
{
20✔
4652
    m_is_sending = false;
20✔
4653
    REALM_ASSERT(m_is_closing);
20✔
4654
    if (!m_ssl_stream) {
20✔
4655
        m_socket->shutdown(network::Socket::shutdown_send, ec);
20✔
4656
        if (ec && ec != make_basic_system_error_code(ENOTCONN))
20!
4657
            throw std::system_error(ec);
×
4658
    }
20✔
4659
}
20✔
4660

4661

4662
// For connection level errors, `sess` is ignored. For session level errors, a
4663
// session must be specified.
4664
//
4665
// If a session is specified, that session object will have been detached from
4666
// the ServerFile object (and possibly destroyed) upon return from
4667
// protocol_error().
4668
//
4669
// If a session is specified for a protocol level error, that session object
4670
// will have been destroyed upon return from protocol_error(). For session level
4671
// errors, the specified session will have been destroyed upon return from
4672
// protocol_error() if, and only if the negotiated protocol version is less than
4673
// 23.
4674
void SyncConnection::protocol_error(ProtocolError error_code, Session* sess)
4675
{
88✔
4676
    REALM_ASSERT(!m_is_closing);
88✔
4677
    bool session_level = is_session_level_error(error_code);
88✔
4678
    REALM_ASSERT(!session_level || sess);
88✔
4679
    REALM_ASSERT(!sess || m_sessions.count(sess->get_session_ident()) == 1);
88✔
4680
    if (logger.would_log(util::Logger::Level::debug)) {
88✔
4681
        const char* message = get_protocol_error_message(int(error_code));
×
4682
        Logger& logger_2 = (session_level ? sess->logger : logger);
×
4683
        logger_2.debug("Protocol error: %1 (error_code=%2)", message, int(error_code)); // Throws
×
4684
    }
×
4685
    session_ident_type session_ident = (session_level ? sess->get_session_ident() : 0);
88✔
4686
    if (session_level) {
88✔
4687
        sess->initiate_deactivation(error_code); // Throws
88✔
4688
        return;
88✔
4689
    }
88✔
4690
    do_initiate_soft_close(error_code, session_ident); // Throws
×
4691
}
×
4692

4693

4694
void SyncConnection::do_initiate_soft_close(ProtocolError error_code, session_ident_type session_ident)
4695
{
20✔
4696
    REALM_ASSERT(get_protocol_error_message(int(error_code)));
20✔
4697

10✔
4698
    // With recent versions of the protocol (when the version is greater than,
10✔
4699
    // or equal to 23), this function will only be called for connection level
10✔
4700
    // errors, never for session specific errors. However, for the purpose of
10✔
4701
    // emulating earlier protocol versions, this function might be called for
10✔
4702
    // session specific errors too.
10✔
4703
    REALM_ASSERT(is_session_level_error(error_code) == (session_ident != 0));
20✔
4704
    REALM_ASSERT(!is_session_level_error(error_code));
20✔
4705

10✔
4706
    REALM_ASSERT(m_send_trigger);
20✔
4707
    REALM_ASSERT(!m_is_closing);
20✔
4708
    m_is_closing = true;
20✔
4709

10✔
4710
    m_error_code = error_code;
20✔
4711
    m_error_session_ident = session_ident;
20✔
4712

10✔
4713
    // Don't waste time and effort sending any other messages
10✔
4714
    m_send_pong = false;
20✔
4715
    m_sessions_enlisted_to_send.clear();
20✔
4716

10✔
4717
    m_receiving_session = nullptr;
20✔
4718

10✔
4719
    terminate_sessions(); // Throws
20✔
4720

10✔
4721
    m_send_trigger->trigger();
20✔
4722
}
20✔
4723

4724

4725
void SyncConnection::close_due_to_close_by_client(std::error_code ec)
4726
{
692✔
4727
    auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
604✔
4728
    // Suicide
360✔
4729
    terminate(log_level, "Sync connection closed by client: %1", ec.message()); // Throws
692✔
4730
}
692✔
4731

4732

4733
void SyncConnection::close_due_to_error(std::error_code ec)
4734
{
430✔
4735
    // Suicide
228✔
4736
    terminate(Logger::Level::error, "Sync connection closed due to error: %1",
430✔
4737
              ec.message()); // Throws
430✔
4738
}
430✔
4739

4740

4741
void SyncConnection::terminate_sessions()
4742
{
1,146✔
4743
    for (auto& entry : m_sessions) {
1,758✔
4744
        Session& sess = *entry.second;
1,758✔
4745
        sess.terminate(); // Throws
1,758✔
4746
    }
1,758✔
4747
    m_sessions_enlisted_to_send.clear();
1,146✔
4748
    m_sessions.clear();
1,146✔
4749
}
1,146✔
4750

4751

4752
void SyncConnection::initiate_soft_close()
4753
{
20✔
4754
    if (!m_is_closing) {
20✔
4755
        session_ident_type session_ident = 0;                                    // Not session specific
20✔
4756
        do_initiate_soft_close(ProtocolError::connection_closed, session_ident); // Throws
20✔
4757
    }
20✔
4758
}
20✔
4759

4760

4761
void SyncConnection::discard_session(session_ident_type session_ident) noexcept
4762
{
2,988✔
4763
    m_sessions.erase(session_ident);
2,988✔
4764
}
2,988✔
4765

4766
} // anonymous namespace
4767

4768

4769
// ============================ sync::Server implementation ============================
4770

4771
class Server::Implementation : public ServerImpl {
4772
public:
4773
    Implementation(const std::string& root_dir, util::Optional<PKey> pkey, Server::Config config)
4774
        : ServerImpl{root_dir, std::move(pkey), std::move(config)} // Throws
4775
    {
7,968✔
4776
    }
7,968✔
4777
    virtual ~Implementation() {}
7,968✔
4778
};
4779

4780

4781
Server::Server(const std::string& root_dir, util::Optional<sync::PKey> pkey, Config config)
4782
    : m_impl{new Implementation{root_dir, std::move(pkey), std::move(config)}} // Throws
4783
{
7,968✔
4784
}
7,968✔
4785

4786

4787
Server::Server(Server&& serv) noexcept
4788
    : m_impl{std::move(serv.m_impl)}
4789
{
×
4790
}
×
4791

4792

4793
Server::~Server() noexcept {}
7,968✔
4794

4795

4796
void Server::start()
4797
{
7,296✔
4798
    m_impl->start(); // Throws
7,296✔
4799
}
7,296✔
4800

4801

4802
void Server::start(const std::string& listen_address, const std::string& listen_port, bool reuse_address)
4803
{
672✔
4804
    m_impl->start(listen_address, listen_port, reuse_address); // Throws
672✔
4805
}
672✔
4806

4807

4808
network::Endpoint Server::listen_endpoint() const
4809
{
7,972✔
4810
    return m_impl->listen_endpoint(); // Throws
7,972✔
4811
}
7,972✔
4812

4813

4814
void Server::run()
4815
{
7,916✔
4816
    m_impl->run(); // Throws
7,916✔
4817
}
7,916✔
4818

4819

4820
void Server::stop() noexcept
4821
{
8,810✔
4822
    m_impl->stop();
8,810✔
4823
}
8,810✔
4824

4825

4826
uint_fast64_t Server::errors_seen() const noexcept
4827
{
672✔
4828
    return m_impl->errors_seen;
672✔
4829
}
672✔
4830

4831

4832
void Server::stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
4833
                                                      milliseconds_type timeout)
4834
{
×
4835
    m_impl->stop_sync_and_wait_for_backup_completion(std::move(completion_handler), timeout); // Throws
×
4836
}
×
4837

4838

4839
void Server::set_connection_reaper_timeout(milliseconds_type timeout)
4840
{
4✔
4841
    m_impl->set_connection_reaper_timeout(timeout);
4✔
4842
}
4✔
4843

4844

4845
void Server::close_connections()
4846
{
20✔
4847
    m_impl->close_connections();
20✔
4848
}
20✔
4849

4850

4851
bool Server::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4852
{
72✔
4853
    return m_impl->map_virtual_to_real_path(virt_path, real_path); // Throws
72✔
4854
}
72✔
4855

4856

4857
void Server::recognize_external_change(const std::string& virt_path)
4858
{
4,800✔
4859
    m_impl->recognize_external_change(virt_path); // Throws
4,800✔
4860
}
4,800✔
4861

4862

4863
void Server::get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
4864
{
×
4865
    m_impl->get_workunit_timers(parallel_section, sequential_section);
×
4866
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc