• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_491

09 Aug 2024 04:34PM UTC coverage: 89.577% (-1.5%) from 91.087%
thomas.goyne_491

Pull #7967

Evergreen

tgoyne
Actually check for unuploaded changes in no_pending_local_changes()

We can have local changesets stored which have already been uploaded and
acknowledged by the server, so checking all of the changesets is incorrect. We
need to instead only check changesets for versions after the current position
of the upload cursor.
Pull Request #7967: RCORE-2232 Actually check for unuploaded changes in no_pending_local_changes()

90956 of 164876 branches covered (55.17%)

37 of 38 new or added lines in 2 files covered. (97.37%)

42 existing lines in 8 files now uncovered.

145956 of 162940 relevant lines covered (89.58%)

8094301.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.02
/src/realm/sync/noinst/server/server.cpp
1
#include <realm/sync/noinst/server/server.hpp>
2

3
#include <realm/binary_data.hpp>
4
#include <realm/impl/simulated_failure.hpp>
5
#include <realm/object_id.hpp>
6
#include <realm/string_data.hpp>
7
#include <realm/sync/changeset.hpp>
8
#include <realm/sync/trigger.hpp>
9
#include <realm/sync/impl/clamped_hex_dump.hpp>
10
#include <realm/sync/impl/clock.hpp>
11
#include <realm/sync/network/http.hpp>
12
#include <realm/sync/network/network_ssl.hpp>
13
#include <realm/sync/network/websocket.hpp>
14
#include <realm/sync/noinst/client_history_impl.hpp>
15
#include <realm/sync/noinst/protocol_codec.hpp>
16
#include <realm/sync/noinst/server/access_control.hpp>
17
#include <realm/sync/noinst/server/server_dir.hpp>
18
#include <realm/sync/noinst/server/server_file_access_cache.hpp>
19
#include <realm/sync/noinst/server/server_impl_base.hpp>
20
#include <realm/sync/transform.hpp>
21
#include <realm/util/base64.hpp>
22
#include <realm/util/bind_ptr.hpp>
23
#include <realm/util/buffer_stream.hpp>
24
#include <realm/util/circular_buffer.hpp>
25
#include <realm/util/compression.hpp>
26
#include <realm/util/file.hpp>
27
#include <realm/util/json_parser.hpp>
28
#include <realm/util/memory_stream.hpp>
29
#include <realm/util/optional.hpp>
30
#include <realm/util/platform_info.hpp>
31
#include <realm/util/random.hpp>
32
#include <realm/util/safe_int_ops.hpp>
33
#include <realm/util/scope_exit.hpp>
34
#include <realm/util/scratch_allocator.hpp>
35
#include <realm/util/thread.hpp>
36
#include <realm/util/thread_exec_guard.hpp>
37
#include <realm/util/value_reset_guard.hpp>
38
#include <realm/version.hpp>
39

40
#include <algorithm>
41
#include <atomic>
42
#include <cctype>
43
#include <chrono>
44
#include <cmath>
45
#include <condition_variable>
46
#include <cstdint>
47
#include <cstdio>
48
#include <cstring>
49
#include <functional>
50
#include <locale>
51
#include <map>
52
#include <memory>
53
#include <queue>
54
#include <sstream>
55
#include <stdexcept>
56
#include <thread>
57
#include <vector>
58

59
// NOTE: The protocol specification is in `/doc/protocol.md`
60

61

62
// FIXME: Verify that session identifier spoofing cannot be used to get access
63
// to sessions belonging to other network connections in any way.
64
// FIXME: Seems that server must close connection with zero sessions after a
65
// certain timeout.
66

67

68
using namespace realm;
69
using namespace realm::sync;
70
using namespace realm::util;
71

72
// clang-format off
73
using ServerHistory         = _impl::ServerHistory;
74
using ServerProtocol        = _impl::ServerProtocol;
75
using ServerFileAccessCache = _impl::ServerFileAccessCache;
76
using ServerImplBase = _impl::ServerImplBase;
77

78
using IntegratableChangeset = ServerHistory::IntegratableChangeset;
79
using IntegratableChangesetList = ServerHistory::IntegratableChangesetList;
80
using IntegratableChangesets = ServerHistory::IntegratableChangesets;
81
using IntegrationResult = ServerHistory::IntegrationResult;
82
using BootstrapError = ServerHistory::BootstrapError;
83
using ExtendedIntegrationError = ServerHistory::ExtendedIntegrationError;
84
using ClientType = ServerHistory::ClientType;
85
using FileIdentAllocSlot = ServerHistory::FileIdentAllocSlot;
86
using FileIdentAllocSlots = ServerHistory::FileIdentAllocSlots;
87

88
using UploadChangeset = ServerProtocol::UploadChangeset;
89
// clang-format on
90

91

92
using UploadChangesets = std::vector<UploadChangeset>;
93

94
using EventLoopMetricsHandler = network::Service::EventLoopMetricsHandler;
95

96

97
static_assert(std::numeric_limits<session_ident_type>::digits >= 63, "Bad session identifier type");
98
static_assert(std::numeric_limits<file_ident_type>::digits >= 63, "Bad file identifier type");
99
static_assert(std::numeric_limits<version_type>::digits >= 63, "Bad version type");
100
static_assert(std::numeric_limits<timestamp_type>::digits >= 63, "Bad timestamp type");
101

102

103
namespace {
104

105
// Three-state status value. `done` is explicitly pinned to 0, so `pending`
// and `in_progress` take the values 1 and 2.
// NOTE(review): presumably tracks the scheduling state of a unit of work --
// confirm against the code that uses it (not visible in this part of the file).
enum class SchedStatus { done = 0, pending, in_progress };
106

107
// Only used by the Sync Server to support older pbs sync clients (prior to protocol v8)
108
/// Returns the fixed websocket protocol prefix "com.mongodb.realm-sync/".
///
/// The returned view refers to a string literal, so it remains valid for the
/// lifetime of the program.
constexpr std::string_view get_old_pbs_websocket_protocol_prefix() noexcept
{
    constexpr std::string_view legacy_prefix{"com.mongodb.realm-sync/"};
    return legacy_prefix;
}
×
112

113
/// Shortens `str` to its trailing characters.
///
/// @param str     The string to shorten.
/// @param cutoff  Maximum number of trailing characters to keep.
/// @return `str` unchanged when it has at most `cutoff` characters; otherwise
///         "..." followed by the last `cutoff` characters of `str`.
std::string short_token_fmt(const std::string& str, size_t cutoff = 30)
{
    const bool fits = str.size() <= cutoff;
    if (fits)
        return str;

    // Too long: keep only the tail and mark the truncation with an ellipsis.
    std::string shortened = "...";
    shortened.append(str, str.size() - cutoff, cutoff);
    return shortened;
}
390✔
122

123

124
/// Splits a comma-separated header value into its elements, one per call to
/// next().
///
/// Each element is stripped of leading and trailing white space (tab, LF, CR,
/// space); elements that are empty after stripping are skipped. The parser
/// only keeps a view of the input, so the underlying characters must outlive
/// the parser and every element it hands out.
class HttpListHeaderValueParser {
public:
    HttpListHeaderValueParser(std::string_view string) noexcept
        : m_string{string}
    {
    }

    /// Fetches the next non-empty element.
    ///
    /// @param elem  Set to the element when one is found; left untouched
    ///              otherwise.
    /// @return true if an element was produced, false once the input is
    ///         exhausted.
    bool next(std::string_view& elem) noexcept
    {
        while (m_pos < m_string.size()) {
            size_type first = m_pos;
            const size_type comma = m_string.find(',', first);
            const bool is_last_segment = (comma == std::string_view::npos);

            // The segment ends at the comma, or at the end of the input.
            size_type last = is_last_segment ? m_string.size() : comma;
            // Resume after the comma next time, or at the end of the input.
            m_pos = is_last_segment ? last : comma + 1;

            // Exclude leading and trailing white space
            for (; first < last && is_http_lws(m_string[first]); ++first) {
            }
            for (; last > first && is_http_lws(m_string[last - 1]); --last) {
            }

            if (first == last)
                continue; // Nothing but white space in this segment

            elem = m_string.substr(first, last - first);
            return true;
        }
        return false;
    }

private:
    using size_type = std::string_view::size_type;
    const std::string_view m_string;
    size_type m_pos = 0; // Offset of the first character not yet consumed

    static bool is_http_lws(char ch) noexcept
    {
        switch (ch) {
            case '\t':
            case '\n':
            case '\r':
            case ' ':
                return true;
            default:
                return false;
        }
    }
};
166

167

168
// Clock used for measuring elapsed time: std::chrono::high_resolution_clock
// when that clock is steady, std::chrono::steady_clock otherwise.
using SteadyClock = std::conditional_t<std::chrono::high_resolution_clock::is_steady,
                                       std::chrono::high_resolution_clock, std::chrono::steady_clock>;
using SteadyTimePoint = SteadyClock::time_point;

/// Returns the current time point of `SteadyClock`.
SteadyTimePoint steady_clock_now() noexcept
{
    const SteadyTimePoint now = SteadyClock::now();
    return now;
}
150,148✔
176

177
/// Returns the time elapsed from `start_time` to `end_time` in whole
/// milliseconds (the fractional part is truncated by `duration_cast`).
///
/// @param start_time  Beginning of the measured interval.
/// @param end_time    End of the measured interval; defaults to the current
///                    time as reported by steady_clock_now().
milliseconds_type steady_duration(SteadyTimePoint start_time, SteadyTimePoint end_time = steady_clock_now()) noexcept
{
    using std::chrono::milliseconds;
    const auto elapsed = std::chrono::duration_cast<milliseconds>(end_time - start_time);
    return milliseconds_type(elapsed.count());
}
37,466✔
183

184

185
// Returns true if, and only if `error_code` is
// ProtocolError::connection_closed; every other error code yields false.
// NOTE(review): presumably the result is used as the "try again" hint that
// accompanies an error reported to the client -- confirm against the callers.
bool determine_try_again(ProtocolError error_code) noexcept
186
{
96✔
187
    return (error_code == ProtocolError::connection_closed);
96✔
188
}
96✔
189

190

191
class ServerFile;
192
class ServerImpl;
193
class HTTPConnection;
194
class SyncConnection;
195
class Session;
196

197

198
using Formatter = util::ResettableExpandableBufferOutputStream;
199
using OutputBuffer = util::ResettableExpandableBufferOutputStream;
200

201
using ProtocolVersionRange = std::pair<int, int>;
202

203
class MiscBuffers {
204
public:
205
    Formatter formatter;
206
    OutputBuffer download_message;
207

208
    using ProtocolVersionRanges = std::vector<ProtocolVersionRange>;
209
    ProtocolVersionRanges protocol_version_ranges;
210

211
    std::vector<char> compress;
212

213
    MiscBuffers()
214
    {
1,208✔
215
        formatter.imbue(std::locale::classic());
1,208✔
216
        download_message.imbue(std::locale::classic());
1,208✔
217
    }
1,208✔
218
};
219

220

221
struct DownloadCache {
222
    std::unique_ptr<char[]> body;
223
    std::size_t uncompressed_body_size;
224
    std::size_t compressed_body_size;
225
    bool body_is_compressed;
226
    version_type end_version;
227
    DownloadCursor download_progress;
228
    std::uint_fast64_t downloadable_bytes;
229
    std::size_t num_changesets;
230
    std::size_t accum_original_size;
231
    std::size_t accum_compacted_size;
232
};
233

234

235
// An unblocked work unit is comprised of one Work object for each of the files
236
// that contribute work to the work unit, generally one reference file and a
237
// number of partial files.
238
class Work {
239
public:
240
    // In general, primary work is all forms of modifying work, including file
241
    // deletion.
242
    bool has_primary_work = false;
243

244
    // Only for reference files
245
    bool might_produce_new_sync_version = false;
246

247
    bool produced_new_realm_version = false;
248
    bool produced_new_sync_version = false;
249
    bool expired_reference_version = false;
250

251
    // True if, and only if changesets_from_downstream contains at least one
252
    // changeset.
253
    bool have_changesets_from_downstream = false;
254

255
    FileIdentAllocSlots file_ident_alloc_slots;
256
    std::vector<std::unique_ptr<char[]>> changeset_buffers;
257
    IntegratableChangesets changesets_from_downstream;
258

259
    VersionInfo version_info;
260

261
    // Result of integration of changesets from downstream clients
262
    IntegrationResult integration_result;
263

264
    void reset() noexcept
265
    {
37,354✔
266
        has_primary_work = false;
37,354✔
267

268
        might_produce_new_sync_version = false;
37,354✔
269

270
        produced_new_realm_version = false;
37,354✔
271
        produced_new_sync_version = false;
37,354✔
272
        expired_reference_version = false;
37,354✔
273
        have_changesets_from_downstream = false;
37,354✔
274

275
        file_ident_alloc_slots.clear();
37,354✔
276
        changeset_buffers.clear();
37,354✔
277
        changesets_from_downstream.clear();
37,354✔
278

279
        version_info = {};
37,354✔
280
        integration_result = {};
37,354✔
281
    }
37,354✔
282
};
283

284

285
// State that is handed by reference to every job executed through WorkerBox
// (see WorkerBox::JobType). The Worker class owns one instance
// (`Worker::m_state`), and each thread spawned by WorkerBox creates its own
// instance with `use_file_cache` set to false.
class WorkerState {
286
public:
287
    FileIdentAllocSlots file_ident_alloc_slots;
288
    util::ScratchMemory scratch_memory;
289
    // Defaults to true; threads created by WorkerBox turn it off.
    bool use_file_cache = true;
290
    // NOTE(review): presumably the history and DB handle of the reference
    // file, opened on demand -- confirm against get_reference_file_history().
    std::unique_ptr<ServerHistory> reference_hist;
291
    DBRef reference_sg;
292
};
293

294

295
// ============================ SessionQueue ============================
296

297
// A queue of non-owning Session pointers: sessions are added with
// push_back() and removed with pop_front(). None of the operations throw.
//
// NOTE(review): the only state kept here is a pointer to the back of the
// queue, so the links between queued sessions are presumably stored in the
// Session objects themselves -- confirm against the member definitions.
class SessionQueue {
298
public:
299
    void push_back(Session*) noexcept;
300
    Session* pop_front() noexcept;
301
    void clear() noexcept;
302

303
private:
304
    // Null until a session has been queued (see note above).
    Session* m_back = nullptr;
305
};
306

307

308
// ============================ FileIdentReceiver ============================
309

310
// Callback interface for the delivery of a newly allocated client file
// identifier (see ServerFile::request_file_ident()).
class FileIdentReceiver {
311
public:
312
    // Invoked with the salted file identifier once it is available.
    virtual void receive_file_ident(SaltedFileIdent) = 0;
313

314
protected:
315
    // Protected and non-virtual: receivers cannot be destroyed through a
    // pointer or reference to this interface.
    ~FileIdentReceiver() {}
5,304✔
316
};
317

318

319
// ============================ WorkerBox =============================
320

321
class WorkerBox {
322
public:
323
    using JobType = util::UniqueFunction<void(WorkerState&)>;
324
    void add_work(WorkerState& state, JobType job)
325
    {
×
326
        std::unique_lock<std::mutex> lock(m_mutex);
×
327
        if (m_jobs.size() >= m_queue_limit) {
×
328
            // Once we have many queued jobs, it is better to use this thread to run a new job
×
329
            // than to queue it.
×
330
            run_a_job(lock, state, job);
×
331
        }
×
332
        else {
×
333
            // Create worker threads on demand (if all existing threads are active):
×
334
            if (m_threads.size() < m_max_num_threads && m_active >= m_threads.size()) {
×
335
                m_threads.emplace_back([this]() {
×
336
                    WorkerState state;
×
337
                    state.use_file_cache = false;
×
338
                    JobType the_job;
×
339
                    std::unique_lock<std::mutex> lock(m_mutex);
×
340
                    for (;;) {
×
341
                        while (m_jobs.empty() && !m_finish_up)
×
342
                            m_changes.wait(lock);
×
343
                        if (m_finish_up)
×
344
                            break; // terminate thread
×
345
                        the_job = std::move(m_jobs.back());
×
346
                        m_jobs.pop_back();
×
347
                        run_a_job(lock, state, the_job);
×
348
                        m_changes.notify_all();
×
349
                    }
×
350
                });
×
351
            }
×
352

×
353
            // Submit the job for execution:
×
354
            m_jobs.emplace_back(std::move(job));
×
355
            m_changes.notify_all();
×
356
        }
×
357
    }
×
358

359
    // You should call wait_completion() before trying to destroy a WorkerBox to get proper
360
    // propagation of exceptions.
361
    void wait_completion(WorkerState& state)
362
    {
×
363
        std::unique_lock<std::mutex> lock(m_mutex);
×
364
        while (!m_jobs.empty() || m_active > 0) {
×
365
            if (!m_jobs.empty()) { // if possible, make this thread participate in running m_jobs
×
366
                JobType the_job = std::move(m_jobs.back());
×
367
                m_jobs.pop_back();
×
368
                run_a_job(lock, state, the_job);
×
369
            }
×
370
            else {
×
371
                m_changes.wait(lock);
×
372
            }
×
373
        }
×
374
        if (m_epr) {
×
375
            std::rethrow_exception(m_epr);
×
376
        }
×
377
    }
×
378

379
    WorkerBox(unsigned int num_threads)
380
    {
×
381
        m_queue_limit = num_threads * 10; // fudge factor for job size variation
×
382
        m_max_num_threads = num_threads;
×
383
    }
×
384

385
    ~WorkerBox()
386
    {
×
387
        {
×
388
            std::unique_lock<std::mutex> lock(m_mutex);
×
389
            m_finish_up = true;
×
390
            m_changes.notify_all();
×
391
        }
×
392
        for (auto& e : m_threads)
×
393
            e.join();
×
394
    }
×
395

396
private:
397
    std::mutex m_mutex;
398
    std::condition_variable m_changes;
399
    std::vector<std::thread> m_threads;
400
    std::vector<JobType> m_jobs;
401
    unsigned int m_active = 0;
402
    bool m_finish_up = false;
403
    unsigned int m_queue_limit = 0;
404
    unsigned int m_max_num_threads = 0;
405
    std::exception_ptr m_epr;
406

407
    void run_a_job(std::unique_lock<std::mutex>& lock, WorkerState& state, JobType& job)
408
    {
×
409
        ++m_active;
×
410
        lock.unlock();
×
411
        try {
×
412
            job(state);
×
413
            lock.lock();
×
414
        }
×
415
        catch (...) {
×
416
            lock.lock();
×
417
            if (!m_epr)
×
418
                m_epr = std::current_exception();
×
419
        }
×
420
        --m_active;
×
421
    }
×
422
};
423

424

425
// ============================ ServerFile ============================
426

427
class ServerFile : public util::RefCountBase {
428
public:
429
    util::PrefixLogger logger;
430

431
    // Logger to be used by the worker thread
432
    util::PrefixLogger wlogger;
433

434
    ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path, std::string real_path,
435
               bool disable_sync_to_disk);
436
    ~ServerFile() noexcept;
437

438
    void initialize();
439
    void activate();
440

441
    ServerImpl& get_server() noexcept
442
    {
42,836✔
443
        return m_server;
42,836✔
444
    }
42,836✔
445

446
    const std::string& get_real_path() const noexcept
447
    {
×
448
        return m_file.realm_path;
×
449
    }
×
450

451
    const std::string& get_virt_path() const noexcept
452
    {
×
453
        return m_file.virt_path;
×
454
    }
×
455

456
    ServerFileAccessCache::File& access()
457
    {
52,420✔
458
        return m_file.access(); // Throws
52,420✔
459
    }
52,420✔
460

461
    ServerFileAccessCache::File& worker_access()
462
    {
37,330✔
463
        return m_worker_file.access(); // Throws
37,330✔
464
    }
37,330✔
465

466
    version_type get_realm_version() const noexcept
467
    {
×
468
        return m_version_info.realm_version;
×
469
    }
×
470

471
    version_type get_sync_version() const noexcept
472
    {
×
473
        return m_version_info.sync_version.version;
×
474
    }
×
475

476
    SaltedVersion get_salted_sync_version() const noexcept
477
    {
107,302✔
478
        return m_version_info.sync_version;
107,302✔
479
    }
107,302✔
480

481
    DownloadCache& get_download_cache() noexcept;
482

483
    void register_client_access(file_ident_type client_file_ident);
484

485
    using file_ident_request_type = std::int_fast64_t;
486

487
    // Initiate a request for a new client file identifier.
488
    //
489
    // Unless the request is cancelled, the identifier will be delivered to the
490
    // receiver by way of an invocation of
491
    // FileIdentReceiver::receive_file_ident().
492
    //
493
    // FileIdentReceiver::receive_file_ident() is guaranteed to not be called
494
    // until after request_file_ident() has returned (no callback reentrance).
495
    //
496
    // New client file identifiers will be delivered to receivers in the order
497
    // that they were requested.
498
    //
499
    // The returned value is a nonzero integer that can be used to cancel the
500
    // request before the file identifier is delivered using
501
    // cancel_file_ident_request().
502
    auto request_file_ident(FileIdentReceiver&, file_ident_type proxy_file, ClientType) -> file_ident_request_type;
503

504
    // Cancel the specified file identifier request.
505
    //
506
    // It is an error to call this function after the identifier has been
507
    // delivered.
508
    void cancel_file_ident_request(file_ident_request_type) noexcept;
509

510
    void add_unidentified_session(Session*);
511
    void identify_session(Session*, file_ident_type client_file_ident);
512

513
    void remove_unidentified_session(Session*) noexcept;
514
    void remove_identified_session(file_ident_type client_file_ident) noexcept;
515

516
    Session* get_identified_session(file_ident_type client_file_ident) noexcept;
517

518
    bool can_add_changesets_from_downstream() const noexcept;
519
    void add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
520
                                        version_type locked_server_version, const UploadChangeset*,
521
                                        std::size_t num_changesets);
522

523
    // bootstrap_client_session calls the function of same name in server_history
524
    // but corrects the upload_progress with information from pending
525
    // integratable changesets. A situation can occur where a client terminates
526
    // a session and starts a new session and re-uploads changesets that are known
527
    // by the ServerFile object but not by the ServerHistory.
528
    BootstrapError bootstrap_client_session(SaltedFileIdent client_file_ident, DownloadCursor download_progress,
529
                                            SaltedVersion server_version, ClientType client_type,
530
                                            UploadCursor& upload_progress, version_type& locked_server_version,
531
                                            Logger&);
532

533
    // NOTE: This function is executed by the worker thread
534
    void worker_process_work_unit(WorkerState&);
535

536
    void recognize_external_change();
537

538
private:
539
    ServerImpl& m_server;
540
    ServerFileAccessCache::Slot m_file;
541

542
    // In general, `m_version_info` refers to the last snapshot of the Realm
543
    // file that is supposed to be visible to remote peers engaging in regular
544
    // Realm file synchronization.
545
    VersionInfo m_version_info;
546

547
    file_ident_request_type m_last_file_ident_request = 0;
548

549
    // The set of sessions whose client file identifier is not yet known, i.e.,
550
    // those for which an IDENT message has not yet been received,
551
    std::set<Session*> m_unidentified_sessions;
552

553
    // A map of the sessions whose client file identifier is known, i.e, those
554
    // for which an IDENT message has been received.
555
    std::map<file_ident_type, Session*> m_identified_sessions;
556

557
    // Used when a file used as partial view wants to allocate a client file
558
    // identifier from the reference Realm.
559
    file_ident_request_type m_file_ident_request = 0;
560

561
    struct FileIdentRequestInfo {
562
        FileIdentReceiver* receiver;
563
        file_ident_type proxy_file;
564
        ClientType client_type;
565
    };
566

567
    // When nonempty, it counts towards outstanding blocked work (see
568
    // `m_has_blocked_work`).
569
    std::map<file_ident_request_type, FileIdentRequestInfo> m_file_ident_requests;
570

571
    // Changesets received from the downstream clients, and waiting to be
572
    // integrated, as well as information about the clients progress in terms of
573
    // integrating changesets received from the server. When nonempty, it counts
574
    // towards outstanding blocked work (see `m_has_blocked_work`).
575
    //
576
    // At any given time, the set of changesets from a particular client-side
577
    // file may be comprised of changesets received via distinct sessions.
578
    //
579
    // See also `m_num_changesets_from_downstream`.
580
    IntegratableChangesets m_changesets_from_downstream;
581

582
    // Keeps track of the number of changesets in `m_changesets_from_downstream`.
583
    //
584
    // Its purpose is also to initialize
585
    // `Work::have_changesets_from_downstream`.
586
    std::size_t m_num_changesets_from_downstream = 0;
587

588
    // The total size, in bytes, of the changesets that were received from
589
    // clients, are targeting this file, and are currently part of the blocked
590
    // work unit.
591
    //
592
    // Together with `m_unblocked_changesets_from_downstream_byte_size`, its
593
    // purpose is to allow the server to keep track of the accumulated size of
594
    // changesets being processed, or waiting to be processed (metric
595
    // `upload.pending.bytes`) (see
596
    // ServerImpl::inc_byte_size_for_pending_downstream_changesets()).
597
    //
598
    // Its purpose is also to enable the "very poor man's" backpressure solution
599
    // (see can_add_changesets_from_downstream()).
600
    std::size_t m_blocked_changesets_from_downstream_byte_size = 0;
601

602
    // Same as `m_blocked_changesets_from_downstream_byte_size` but for the
603
    // currently unblocked work unit.
604
    std::size_t m_unblocked_changesets_from_downstream_byte_size = 0;
605

606
    // When nonempty, it counts towards outstanding blocked work (see
607
    // `m_has_blocked_work`).
608
    std::vector<std::string> m_permission_changes;
609

610
    // True iff this file, or any of its associated partial files (when
611
    // applicable), has a nonzero amount of outstanding work that is currently
612
    // held back from being passed to the worker thread because a previously
613
    // accumulated chunk of work related to this file is currently in progress.
614
    bool m_has_blocked_work = false;
615

616
    // A file, that is not a partial file, is considered *exposed to the worker
617
    // thread* from the point in time where it is submitted to the worker
618
    // (Worker::enqueue()) and up until the point in time where
619
    // group_postprocess_stage_1() starts to execute. A partial file is
620
    // considered *exposed to the worker thread* precisely when the associated
621
    // reference file is exposed to the worker thread, but only if it was in
622
    // `m_reference_file->m_work.partial_files` at the point in time where the
623
    // reference file was passed to the worker.
624
    //
625
    // While this file is exposed to the worker thread, all members of `m_work`
626
    // other than `changesets_from_downstream` may be accessed and modified by
627
    // the worker thread only.
628
    //
629
    // While this file is exposed to the worker thread,
630
    // `m_work.changesets_from_downstream` may be accessed by all threads, but
631
    // must not be modified by any thread. This special status of
632
    // `m_work.changesets_from_downstream` is required to allow
633
    // ServerFile::bootstrap_client_session() to read from it at any time.
634
    Work m_work;
635

636
    // For reference files, set to true when work is unblocked, and reset back
637
    // to false when the work finalization process completes
638
    // (group_postprocess_stage_3()). Always zero for partial files.
639
    bool m_has_work_in_progress = 0;
640

641
    // This one must only be accessed by the worker thread.
642
    //
643
    // More specifically, `m_worker_file.access()` must only be called by the
644
    // worker thread, and if it was ever called, it must be closed by the worker
645
    // thread before the ServerFile object is destroyed, if destruction happens
646
    // before the destruction of the server object itself.
647
    ServerFileAccessCache::Slot m_worker_file;
648

649
    std::vector<std::int_fast64_t> m_deleting_connections;
650

651
    DownloadCache m_download_cache;
652

653
    void on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes);
654
    void on_work_added();
655
    void group_unblock_work();
656
    void unblock_work();
657

658
    /// Resume history scanning in all sessions bound to this file. To be called
659
    /// after a successfull integration of a changeset.
660
    void resume_download() noexcept;
661

662
    // NOTE: These functions are executed by the worker thread
663
    void worker_allocate_file_identifiers();
664
    bool worker_integrate_changes_from_downstream(WorkerState&);
665
    ServerHistory& get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
666
                                           DBRef& sg_ptr);
667
    ServerHistory& get_reference_file_history(WorkerState& state);
668
    void group_postprocess_stage_1();
669
    void group_postprocess_stage_2();
670
    void group_postprocess_stage_3();
671
    void group_finalize_work_stage_1();
672
    void group_finalize_work_stage_2();
673
    void finalize_work_stage_1();
674
    void finalize_work_stage_2();
675
};
676

677

678
// Returns a mutable reference to this file's download cache
// (`m_download_cache`). The reference is valid for as long as this
// ServerFile object exists.
inline DownloadCache& ServerFile::get_download_cache() noexcept
679
{
42,164✔
680
    return m_download_cache;
42,164✔
681
}
42,164✔
682

683
// Group-level entry point for stage 1 of work finalization. It simply
// forwards to finalize_work_stage_1() for this file, and propagates any
// exception thrown by it.
inline void ServerFile::group_finalize_work_stage_1()
684
{
37,018✔
685
    finalize_work_stage_1(); // Throws
37,018✔
686
}
37,018✔
687

688
// Group-level entry point for stage 2 of work finalization. It simply
// forwards to finalize_work_stage_2() for this file, and propagates any
// exception thrown by it.
inline void ServerFile::group_finalize_work_stage_2()
689
{
37,020✔
690
    finalize_work_stage_2(); // Throws
37,020✔
691
}
37,020✔
692

693

694
// ============================ Worker ============================
695

696
// All write transactions on server-side Realm files performed on behalf of the
697
// server, must be performed by the worker thread, not the network event loop
698
// thread. This is to ensure that the network event loop thread never gets
699
// blocked waiting for the worker thread to end a long running write
700
// transaction.
701
//
702
// FIXME: Currently, the event loop thread does perform a number of write
703
// transactions, but only on subtier nodes of a star topology server cluster.
704
// The worker that processes server files handed to it through enqueue().
// It implements ServerHistory::Context by supplying its own random number
// generator, and it owns a file access cache and a WorkerState of its own.
class Worker : public ServerHistory::Context {
705
public:
706
    std::shared_ptr<util::Logger> logger_ptr;
707
    util::Logger& logger;
708

709
    explicit Worker(ServerImpl&);
710

711
    // Returns the file access cache owned by this worker.
    ServerFileAccessCache& get_file_access_cache() noexcept;
712

713
    // Adds a file to `m_queue`.
    // NOTE(review): presumably also wakes up the worker thread through
    // `m_cond` -- confirm against the definition.
    void enqueue(ServerFile*);
714

715
    // Overriding members of ServerHistory::Context
716
    std::mt19937_64& server_history_get_random() noexcept override final;
717

718
private:
719
    ServerImpl& m_server;
720
    std::mt19937_64 m_random;
721
    ServerFileAccessCache m_file_access_cache;
722

723
    util::Mutex m_mutex;
724
    util::CondVar m_cond; // Protected by `m_mutex`
725

726
    bool m_stop = false; // Protected by `m_mutex`
727

728
    util::CircularBuffer<ServerFile*> m_queue; // Protected by `m_mutex`
729

730
    WorkerState m_state;
731

732
    // run() and stop() are private; the friend declaration below grants
    // util::ThreadExecGuardWithParent access to them.
    void run();
733
    void stop() noexcept;
734

735
    friend class util::ThreadExecGuardWithParent<Worker, ServerImpl>;
736
};
737

738

739
// Returns a mutable reference to the file access cache owned by this worker
// (`m_file_access_cache`).
inline ServerFileAccessCache& Worker::get_file_access_cache() noexcept
740
{
1,144✔
741
    return m_file_access_cache;
1,144✔
742
}
1,144✔
743

744

745
// ============================ ServerImpl ============================
746

747
class ServerImpl : public ServerImplBase, public ServerHistory::Context {
748
public:
749
    std::uint_fast64_t errors_seen = 0;
750

751
    std::atomic<milliseconds_type> m_par_time;
752
    std::atomic<milliseconds_type> m_seq_time;
753

754
    util::Mutex last_client_accesses_mutex;
755

756
    const std::shared_ptr<util::Logger> logger_ptr;
757
    util::Logger& logger;
758

759
    network::Service& get_service() noexcept
760
    {
51,312✔
761
        return m_service;
51,312✔
762
    }
51,312✔
763

764
    const network::Service& get_service() const noexcept
765
    {
×
766
        return m_service;
×
767
    }
×
768

769
    std::mt19937_64& get_random() noexcept
770
    {
65,524✔
771
        return m_random;
65,524✔
772
    }
65,524✔
773

774
    const Server::Config& get_config() const noexcept
775
    {
114,164✔
776
        return m_config;
114,164✔
777
    }
114,164✔
778

779
    std::size_t get_max_upload_backlog() const noexcept
780
    {
44,768✔
781
        return m_max_upload_backlog;
44,768✔
782
    }
44,768✔
783

784
    const std::string& get_root_dir() const noexcept
785
    {
5,304✔
786
        return m_root_dir;
5,304✔
787
    }
5,304✔
788

789
    network::ssl::Context& get_ssl_context() noexcept
790
    {
48✔
791
        return *m_ssl_context;
48✔
792
    }
48✔
793

794
    const AccessControl& get_access_control() const noexcept
795
    {
×
796
        return m_access_control;
×
797
    }
×
798

799
    ProtocolVersionRange get_protocol_version_range() const noexcept
800
    {
2,076✔
801
        return m_protocol_version_range;
2,076✔
802
    }
2,076✔
803

804
    ServerProtocol& get_server_protocol() noexcept
805
    {
133,460✔
806
        return m_server_protocol;
133,460✔
807
    }
133,460✔
808

809
    compression::CompressMemoryArena& get_compress_memory_arena() noexcept
810
    {
4,204✔
811
        return m_compress_memory_arena;
4,204✔
812
    }
4,204✔
813

814
    MiscBuffers& get_misc_buffers() noexcept
815
    {
48,448✔
816
        return m_misc_buffers;
48,448✔
817
    }
48,448✔
818

819
    int_fast64_t get_current_server_session_ident() const noexcept
820
    {
×
821
        return m_current_server_session_ident;
×
822
    }
×
823

824
    util::ScratchMemory& get_scratch_memory() noexcept
825
    {
×
826
        return m_scratch_memory;
×
827
    }
×
828

829
    Worker& get_worker() noexcept
830
    {
39,642✔
831
        return m_worker;
39,642✔
832
    }
39,642✔
833

834
    void get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
835
    {
×
836
        parallel_section = m_par_time;
×
837
        sequential_section = m_seq_time;
×
838
    }
×
839

840
    ServerImpl(const std::string& root_dir, util::Optional<sync::PKey>, Server::Config);
841
    ~ServerImpl() noexcept;
842

843
    void start();
844

845
    void start(std::string listen_address, std::string listen_port, bool reuse_address)
846
    {
684✔
847
        m_config.listen_address = listen_address;
684✔
848
        m_config.listen_port = listen_port;
684✔
849
        m_config.reuse_address = reuse_address;
684✔
850

851
        start(); // Throws
684✔
852
    }
684✔
853

854
    network::Endpoint listen_endpoint() const
855
    {
1,262✔
856
        return m_acceptor.local_endpoint();
1,262✔
857
    }
1,262✔
858

859
    void run();
860
    void stop() noexcept;
861

862
    void remove_http_connection(std::int_fast64_t conn_id) noexcept;
863

864
    void add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn);
865
    void remove_sync_connection(int_fast64_t connection_id);
866

867
    size_t get_number_of_http_connections()
868
    {
×
869
        return m_http_connections.size();
×
870
    }
×
871

872
    size_t get_number_of_sync_connections()
873
    {
×
874
        return m_sync_connections.size();
×
875
    }
×
876

877
    bool is_sync_stopped()
878
    {
39,428✔
879
        return m_sync_stopped;
39,428✔
880
    }
39,428✔
881

882
    const std::set<std::string>& get_realm_names() const noexcept
883
    {
×
884
        return m_realm_names;
×
885
    }
×
886

887
    // virt_path must be valid when get_or_create_file() is called.
888
    util::bind_ptr<ServerFile> get_or_create_file(const std::string& virt_path)
889
    {
5,274✔
890
        util::bind_ptr<ServerFile> file = get_file(virt_path);
5,274✔
891
        if (REALM_LIKELY(file))
5,274✔
892
            return file;
4,130✔
893

894
        _impl::VirtualPathComponents virt_path_components =
1,144✔
895
            _impl::parse_virtual_path(m_root_dir, virt_path); // Throws
1,144✔
896
        REALM_ASSERT(virt_path_components.is_valid);
1,144✔
897

898
        _impl::make_dirs(m_root_dir, virt_path); // Throws
1,144✔
899
        m_realm_names.insert(virt_path);         // Throws
1,144✔
900
        {
1,144✔
901
            bool disable_sync_to_disk = m_config.disable_sync_to_disk;
1,144✔
902
            file.reset(new ServerFile(*this, m_file_access_cache, virt_path, virt_path_components.real_realm_path,
1,144✔
903
                                      disable_sync_to_disk)); // Throws
1,144✔
904
        }
1,144✔
905

906
        file->initialize();
1,144✔
907
        m_files[virt_path] = file; // Throws
1,144✔
908
        file->activate();          // Throws
1,144✔
909
        return file;
1,144✔
910
    }
5,274✔
911

912
    std::unique_ptr<ServerHistory> make_history_for_path()
913
    {
×
914
        return std::make_unique<ServerHistory>(*this);
×
915
    }
×
916

917
    util::bind_ptr<ServerFile> get_file(const std::string& virt_path) noexcept
918
    {
5,276✔
919
        auto i = m_files.find(virt_path);
5,276✔
920
        if (REALM_LIKELY(i != m_files.end()))
5,276✔
921
            return i->second;
4,132✔
922
        return {};
1,144✔
923
    }
5,276✔
924

925
    // Returns the number of seconds since the Epoch of
926
    // std::chrono::system_clock.
927
    std::chrono::system_clock::time_point token_expiration_clock_now() const noexcept
928
    {
×
929
        if (REALM_UNLIKELY(m_config.token_expiration_clock))
×
930
            return m_config.token_expiration_clock->now();
×
931
        return std::chrono::system_clock::now();
×
932
    }
×
933

934
    void set_connection_reaper_timeout(milliseconds_type);
935

936
    void close_connections();
937
    bool map_virtual_to_real_path(const std::string& virt_path, std::string& real_path);
938

939
    void recognize_external_change(const std::string& virt_path);
940

941
    void stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
942
                                                  milliseconds_type timeout);
943

944
    // Server global outputbuffers that can be reused.
945
    // The server is single threaded, so there are no
946
    // synchronization issues.
947
    // output_buffers_count is equal to the
948
    // maximum number of buffers needed at any point.
949
    static constexpr int output_buffers_count = 1;
950
    OutputBuffer output_buffers[output_buffers_count];
951

952
    bool is_load_balancing_allowed() const
953
    {
×
954
        return m_allow_load_balancing;
×
955
    }
×
956

957
    // inc_byte_size_for_pending_downstream_changesets() must be called by
958
    // ServerFile objects when changesets from downstream clients have been
959
    // received.
960
    //
961
    // dec_byte_size_for_pending_downstream_changesets() must be called by
962
    // ServerFile objects when changesets from downstream clients have been
963
    // processed or discarded.
964
    //
965
    // ServerImpl uses this information to keep a running tally (metric
966
    // `upload.pending.bytes`) of the total byte size of pending changesets from
967
    // downstream clients.
968
    //
969
    // These functions must be called on the network thread.
970
    void inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
971
    void dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
972

973
    // Overriding member functions in _impl::ServerHistory::Context
974
    std::mt19937_64& server_history_get_random() noexcept override final;
975

976
private:
977
    Server::Config m_config;
978
    network::Service m_service;
979
    std::mt19937_64 m_random;
980
    const std::size_t m_max_upload_backlog;
981
    const std::string m_root_dir;
982
    const AccessControl m_access_control;
983
    const ProtocolVersionRange m_protocol_version_range;
984

985
    // The reserved files will be closed in situations where the server
986
    // runs out of file descriptors.
987
    std::unique_ptr<File> m_reserved_files[5];
988

989
    // The set of all Realm files known to this server, represented by their
990
    // virtual path.
991
    //
992
    // INVARIANT: If a Realm file is in the servers directory (i.e., it would be
993
    // reported by an invocation of _impl::get_realm_names()), then the
994
    // corresponding virtual path is in `m_realm_names`, assuming no external
995
    // file-system level intervention.
996
    std::set<std::string> m_realm_names;
997

998
    std::unique_ptr<network::ssl::Context> m_ssl_context;
999
    ServerFileAccessCache m_file_access_cache;
1000
    Worker m_worker;
1001
    std::map<std::string, util::bind_ptr<ServerFile>> m_files; // Key is virtual path
1002
    network::Acceptor m_acceptor;
1003
    std::int_fast64_t m_next_conn_id = 0;
1004
    std::unique_ptr<HTTPConnection> m_next_http_conn;
1005
    network::Endpoint m_next_http_conn_endpoint;
1006
    std::map<std::int_fast64_t, std::unique_ptr<HTTPConnection>> m_http_connections;
1007
    std::map<std::int_fast64_t, std::unique_ptr<SyncConnection>> m_sync_connections;
1008
    ServerProtocol m_server_protocol;
1009
    compression::CompressMemoryArena m_compress_memory_arena;
1010
    MiscBuffers m_misc_buffers;
1011
    int_fast64_t m_current_server_session_ident;
1012
    Optional<network::DeadlineTimer> m_connection_reaper_timer;
1013
    bool m_allow_load_balancing = false;
1014

1015
    util::Mutex m_mutex;
1016

1017
    bool m_stopped = false; // Protected by `m_mutex`
1018

1019
    // m_sync_stopped is used by stop_sync_and_wait_for_backup_completion().
1020
    // When m_sync_stopped is true, the server does not perform any sync.
1021
    bool m_sync_stopped = false;
1022

1023
    std::atomic<bool> m_running{false}; // Debugging facility
1024

1025
    std::size_t m_pending_changesets_from_downstream_byte_size = 0;
1026

1027
    util::CondVar m_wait_or_service_stopped_cond; // Protected by `m_mutex`
1028

1029
    util::ScratchMemory m_scratch_memory;
1030

1031
    void listen();
1032
    void initiate_accept();
1033
    void handle_accept(std::error_code);
1034

1035
    void reap_connections();
1036
    void initiate_connection_reaper_timer(milliseconds_type timeout);
1037
    void do_close_connections();
1038

1039
    static std::size_t determine_max_upload_backlog(Server::Config& config) noexcept
1040
    {
1,208✔
1041
        if (config.max_upload_backlog == 0)
1,208✔
1042
            return 4294967295; // 4GiB - 1 (largest allowable number on a 32-bit platform)
1,208✔
1043
        return config.max_upload_backlog;
×
1044
    }
1,208✔
1045

1046
    static ProtocolVersionRange determine_protocol_version_range(Server::Config& config)
1047
    {
1,208✔
1048
        const int actual_min = ServerImplBase::get_oldest_supported_protocol_version();
1,208✔
1049
        const int actual_max = get_current_protocol_version();
1,208✔
1050
        static_assert(actual_min <= actual_max, "");
1,208✔
1051
        int min = actual_min;
1,208✔
1052
        int max = actual_max;
1,208✔
1053
        if (config.max_protocol_version != 0 && config.max_protocol_version < max) {
1,208!
1054
            if (config.max_protocol_version < min)
×
1055
                throw Server::NoSupportedProtocolVersions();
×
1056
            max = config.max_protocol_version;
×
1057
        }
×
1058
        return {min, max};
1,208✔
1059
    }
1,208✔
1060

1061
    void do_recognize_external_change(const std::string& virt_path);
1062

1063
    void do_stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_complete)> completion_handler,
1064
                                                     milliseconds_type timeout);
1065
};
1066

1067
// ============================ SyncConnection ============================
1068

1069
class SyncConnection : public websocket::Config {
1070
public:
1071
    const std::shared_ptr<util::Logger> logger_ptr;
1072
    util::Logger& logger;
1073

1074
    // Clients with sync protocol version 8 or greater support pbs->flx migration
1075
    static constexpr int PBS_FLX_MIGRATION_PROTOCOL_VERSION = 8;
1076
    // Clients with sync protocol version less than 10 do not support log messages
1077
    static constexpr int SERVER_LOG_PROTOCOL_VERSION = 10;
1078

1079
    SyncConnection(ServerImpl& serv, std::int_fast64_t id, std::unique_ptr<network::Socket>&& socket,
1080
                   std::unique_ptr<network::ssl::Stream>&& ssl_stream,
1081
                   std::unique_ptr<network::ReadAheadBuffer>&& read_ahead_buffer, int client_protocol_version,
1082
                   std::string client_user_agent, std::string remote_endpoint, std::string appservices_request_id)
1083
        : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
938✔
1084
                                                          serv.logger_ptr)} // Throws
938✔
1085
        , logger{*logger_ptr}
938✔
1086
        , m_server{serv}
938✔
1087
        , m_id{id}
938✔
1088
        , m_socket{std::move(socket)}
938✔
1089
        , m_ssl_stream{std::move(ssl_stream)}
938✔
1090
        , m_read_ahead_buffer{std::move(read_ahead_buffer)}
938✔
1091
        , m_websocket{*this}
938✔
1092
        , m_client_protocol_version{client_protocol_version}
938✔
1093
        , m_client_user_agent{std::move(client_user_agent)}
938✔
1094
        , m_remote_endpoint{std::move(remote_endpoint)}
938✔
1095
        , m_appservices_request_id{std::move(appservices_request_id)}
938✔
1096
    {
2,076✔
1097
        // Make the output buffer stream throw std::bad_alloc if it fails to
1098
        // expand the buffer
1099
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
2,076✔
1100

1101
        network::Service& service = m_server.get_service();
2,076✔
1102
        auto handler = [this](Status status) {
98,820✔
1103
            if (!status.is_ok())
98,820✔
1104
                return;
×
1105
            if (!m_is_sending)
98,820✔
1106
                send_next_message(); // Throws
43,712✔
1107
        };
98,820✔
1108
        m_send_trigger = std::make_unique<Trigger<network::Service>>(&service, std::move(handler)); // Throws
2,076✔
1109
    }
2,076✔
1110

1111
    ~SyncConnection() noexcept;
1112

1113
    ServerImpl& get_server() noexcept
1114
    {
116,914✔
1115
        return m_server;
116,914✔
1116
    }
116,914✔
1117

1118
    ServerProtocol& get_server_protocol() noexcept
1119
    {
133,460✔
1120
        return m_server.get_server_protocol();
133,460✔
1121
    }
133,460✔
1122

1123
    int get_client_protocol_version()
1124
    {
104,348✔
1125
        return m_client_protocol_version;
104,348✔
1126
    }
104,348✔
1127

1128
    const std::string& get_client_user_agent() const noexcept
1129
    {
5,272✔
1130
        return m_client_user_agent;
5,272✔
1131
    }
5,272✔
1132

1133
    const std::string& get_remote_endpoint() const noexcept
1134
    {
5,276✔
1135
        return m_remote_endpoint;
5,276✔
1136
    }
5,276✔
1137

1138
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept final
1139
    {
2,076✔
1140
        return logger_ptr;
2,076✔
1141
    }
2,076✔
1142

1143
    std::mt19937_64& websocket_get_random() noexcept final override
1144
    {
64,382✔
1145
        return m_server.get_random();
64,382✔
1146
    }
64,382✔
1147

1148
    bool websocket_binary_message_received(const char* data, size_t size) final override
1149
    {
69,542✔
1150
        using sf = _impl::SimulatedFailure;
69,542✔
1151
        if (sf::check_trigger(sf::sync_server__read_head)) {
69,542✔
1152
            // Suicide
1153
            read_error(sf::sync_server__read_head);
470✔
1154
            return false;
470✔
1155
        }
470✔
1156
        // After a connection level error has occurred, all incoming messages
1157
        // will be ignored. By continuing to read until end of input, the server
1158
        // is able to know when the client closes the connection, which in
1159
        // general means that is has received the ERROR message.
1160
        if (REALM_LIKELY(!m_is_closing)) {
69,074✔
1161
            m_last_activity_at = steady_clock_now();
69,072✔
1162
            handle_message_received(data, size);
69,072✔
1163
        }
69,072✔
1164
        return true;
69,072✔
1165
    }
69,542✔
1166

1167
    bool websocket_ping_message_received(const char* data, size_t size) final override
1168
    {
×
1169
        if (REALM_LIKELY(!m_is_closing)) {
×
1170
            m_last_activity_at = steady_clock_now();
×
1171
            handle_ping_received(data, size);
×
1172
        }
×
1173
        return true;
×
1174
    }
×
1175

1176
    void async_write(const char* data, size_t size, websocket::WriteCompletionHandler handler) final override
1177
    {
64,378✔
1178
        if (m_ssl_stream) {
64,378✔
1179
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
70✔
1180
        }
70✔
1181
        else {
64,308✔
1182
            m_socket->async_write(data, size, std::move(handler)); // Throws
64,308✔
1183
        }
64,308✔
1184
    }
64,378✔
1185

1186
    void async_read(char* buffer, size_t size, websocket::ReadCompletionHandler handler) final override
1187
    {
210,196✔
1188
        if (m_ssl_stream) {
210,196✔
1189
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
140✔
1190
        }
140✔
1191
        else {
210,056✔
1192
            m_socket->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
210,056✔
1193
        }
210,056✔
1194
    }
210,196✔
1195

1196
    void async_read_until(char* buffer, size_t size, char delim,
1197
                          websocket::ReadCompletionHandler handler) final override
1198
    {
×
1199
        if (m_ssl_stream) {
×
1200
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1201
                                           std::move(handler)); // Throws
×
1202
        }
×
1203
        else {
×
1204
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1205
                                       std::move(handler)); // Throws
×
1206
        }
×
1207
    }
×
1208

1209
    void websocket_read_error_handler(std::error_code ec) final override
1210
    {
762✔
1211
        read_error(ec);
762✔
1212
    }
762✔
1213

1214
    void websocket_write_error_handler(std::error_code ec) final override
1215
    {
×
1216
        write_error(ec);
×
1217
    }
×
1218

1219
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*, std::string_view) final override
1220
    {
×
1221
        // WebSocket class has already logged a message for this error
1222
        close_due_to_error(ec); // Throws
×
1223
    }
×
1224

1225
    void websocket_protocol_error_handler(std::error_code ec) final override
1226
    {
×
1227
        logger.error("WebSocket protocol error (%1): %2", ec, ec.message()); // Throws
×
1228
        close_due_to_error(ec);                                              // Throws
×
1229
    }
×
1230

1231
    void websocket_handshake_completion_handler(const HTTPHeaders&) final override
1232
    {
×
1233
        // This is not called since we handle HTTP request in handle_request_for_sync()
1234
        REALM_TERMINATE("websocket_handshake_completion_handler should not have been called");
1235
    }
×
1236

1237
    int_fast64_t get_id() const noexcept
1238
    {
×
1239
        return m_id;
×
1240
    }
×
1241

1242
    network::Socket& get_socket() noexcept
1243
    {
×
1244
        return *m_socket;
×
1245
    }
×
1246

1247
    void initiate();
1248

1249
    // Commits suicide
1250
    template <class... Params>
1251
    void terminate(Logger::Level, const char* log_message, Params... log_params);
1252

1253
    // Commits suicide
1254
    void terminate_if_dead(SteadyTimePoint now);
1255

1256
    void enlist_to_send(Session*) noexcept;
1257

1258
    // Sessions should get the output_buffer and insert a message, after which
1259
    // they call initiate_write_output_buffer().
1260
    OutputBuffer& get_output_buffer()
1261
    {
64,382✔
1262
        m_output_buffer.reset();
64,382✔
1263
        return m_output_buffer;
64,382✔
1264
    }
64,382✔
1265

1266
    // More advanced memory strategies can be implemented if needed.
1267
    void release_output_buffer() {}
64,220✔
1268

1269
    // When this function is called, the connection will initiate a write with
1270
    // its output_buffer. Sessions use this method.
1271
    void initiate_write_output_buffer();
1272

1273
    void initiate_pong_output_buffer();
1274

1275
    void handle_protocol_error(Status status);
1276

1277
    void receive_bind_message(session_ident_type, std::string path, std::string signed_user_token,
1278
                              bool need_client_file_ident, bool is_subserver);
1279

1280
    void receive_ident_message(session_ident_type, file_ident_type client_file_ident,
1281
                               salt_type client_file_ident_salt, version_type scan_server_version,
1282
                               version_type scan_client_version, version_type latest_server_version,
1283
                               salt_type latest_server_version_salt);
1284

1285
    void receive_upload_message(session_ident_type, version_type progress_client_version,
1286
                                version_type progress_server_version, version_type locked_server_version,
1287
                                const UploadChangesets&);
1288

1289
    void receive_mark_message(session_ident_type, request_ident_type);
1290

1291
    void receive_unbind_message(session_ident_type);
1292

1293
    void receive_ping(milliseconds_type timestamp, milliseconds_type rtt);
1294

1295
    void receive_error_message(session_ident_type, int error_code, std::string_view error_body);
1296

1297
    void protocol_error(ProtocolError, Session* = nullptr);
1298

1299
    void initiate_soft_close();
1300

1301
    void discard_session(session_ident_type) noexcept;
1302

1303
    void send_log_message(util::Logger::Level level, const std::string&& message, session_ident_type sess_ident = 0,
1304
                          std::optional<std::string> co_id = std::nullopt);
1305

1306
private:
1307
    ServerImpl& m_server;
1308
    const int_fast64_t m_id;
1309
    std::unique_ptr<network::Socket> m_socket;
1310
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1311
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1312

1313
    websocket::Socket m_websocket;
1314
    std::unique_ptr<char[]> m_input_body_buffer;
1315
    OutputBuffer m_output_buffer;
1316
    std::map<session_ident_type, std::unique_ptr<Session>> m_sessions;
1317

1318
    // The protocol version in use by the connected client.
1319
    const int m_client_protocol_version;
1320

1321
    // The user agent description passed by the client.
1322
    const std::string m_client_user_agent;
1323

1324
    const std::string m_remote_endpoint;
1325

1326
    const std::string m_appservices_request_id;
1327

1328
    // A queue of sessions that have enlisted for an opportunity to send a
1329
    // message. Sessions will be served in the order that they enlist. A session
1330
    // can only occur once in this queue (linked list). If the queue is not
1331
    // empty, and no message is currently being written to the socket, the first
1332
    // session is taken out of the queue, and then granted an opportunity to
1333
    // send a message.
1334
    //
1335
    // Sessions will never be destroyed while in this queue. This is ensured
1336
    // because the connection owns the sessions that are associated with it, and
1337
    // the connection only removes a session from m_sessions at points in time
1338
    // where that session is guaranteed to not be in m_sessions_enlisted_to_send
1339
    // (Connection::send_next_message() and Connection::~Connection()).
1340
    SessionQueue m_sessions_enlisted_to_send;
1341

1342
    Session* m_receiving_session = nullptr;
1343

1344
    bool m_is_sending = false;
1345
    bool m_is_closing = false;
1346

1347
    bool m_send_pong = false;
1348
    bool m_sending_pong = false;
1349

1350
    std::unique_ptr<Trigger<network::Service>> m_send_trigger;
1351

1352
    milliseconds_type m_last_ping_timestamp = 0;
1353

1354
    // If `m_is_closing` is true, this is the time at which `m_is_closing` was
1355
    // set to true (initiation of soft close). Otherwise, if no messages have
1356
    // been received from the client, this is the time at which the connection
1357
    // object was initiated (completion of WebSocket handshake). Otherwise this
1358
    // is the time at which the last message was received from the client.
1359
    SteadyTimePoint m_last_activity_at;
1360

1361
    // These are initialized by do_initiate_soft_close().
1362
    //
1363
    // With recent versions of the protocol (when the version is greater than,
1364
    // or equal to 23), `m_error_session_ident` is always zero.
1365
    ProtocolError m_error_code = {};
1366
    session_ident_type m_error_session_ident = 0;
1367

1368
    struct LogMessage {
1369
        session_ident_type sess_ident;
1370
        util::Logger::Level level;
1371
        std::string message;
1372
        std::optional<std::string> co_id;
1373
    };
1374

1375
    std::mutex m_log_mutex;
1376
    std::queue<LogMessage> m_log_messages;
1377

1378
    static std::string make_logger_prefix(int_fast64_t id)
1379
    {
2,076✔
1380
        std::ostringstream out;
2,076✔
1381
        out.imbue(std::locale::classic());
2,076✔
1382
        out << "Sync Connection[" << id << "]: "; // Throws
2,076✔
1383
        return out.str();                         // Throws
2,076✔
1384
    }
2,076✔
1385

1386
    // The return value of handle_message_received() designates whether
1387
    // message processing should continue. If the connection object is
1388
    // destroyed during execution of handle_message_received(), the return
1389
    // value must be false.
1390
    void handle_message_received(const char* data, size_t size);
1391

1392
    void handle_ping_received(const char* data, size_t size);
1393

1394
    void send_next_message();
1395
    void send_pong(milliseconds_type timestamp);
1396
    void send_log_message(const LogMessage& log_msg);
1397

1398
    void handle_write_output_buffer();
1399
    void handle_pong_output_buffer();
1400

1401
    void initiate_write_error(ProtocolError, session_ident_type);
1402
    void handle_write_error(std::error_code ec);
1403

1404
    void do_initiate_soft_close(ProtocolError, session_ident_type);
1405
    void read_error(std::error_code);
1406
    void write_error(std::error_code);
1407

1408
    void close_due_to_close_by_client(std::error_code);
1409
    void close_due_to_error(std::error_code);
1410

1411
    void terminate_sessions();
1412

1413
    void bad_session_ident(const char* message_type, session_ident_type);
1414
    void message_after_unbind(const char* message_type, session_ident_type);
1415
    void message_before_ident(const char* message_type, session_ident_type);
1416
};
1417

1418

1419
inline void SyncConnection::read_error(std::error_code ec)
1420
{
1,232✔
1421
    REALM_ASSERT(ec != util::error::operation_aborted);
1,232✔
1422
    if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
1,232✔
1423
        // Suicide
1424
        close_due_to_close_by_client(ec); // Throws
762✔
1425
        return;
762✔
1426
    }
762✔
1427
    if (ec == util::MiscExtErrors::delim_not_found) {
470✔
1428
        logger.error("Input message head delimited not found"); // Throws
×
1429
        protocol_error(ProtocolError::limits_exceeded);         // Throws
×
1430
        return;
×
1431
    }
×
1432

1433
    logger.error("Reading failed: %1", ec.message()); // Throws
470✔
1434

1435
    // Suicide
1436
    close_due_to_error(ec); // Throws
470✔
1437
}
470✔
1438

1439
inline void SyncConnection::write_error(std::error_code ec)
1440
{
×
1441
    REALM_ASSERT(ec != util::error::operation_aborted);
×
1442
    if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1443
        // Suicide
1444
        close_due_to_close_by_client(ec); // Throws
×
1445
        return;
×
1446
    }
×
1447
    logger.error("Writing failed: %1", ec.message()); // Throws
×
1448

1449
    // Suicide
1450
    close_due_to_error(ec); // Throws
×
1451
}
×
1452

1453

1454
// ============================ HTTPConnection ============================
1455

1456
// The string "User-Agent".
// NOTE(review): presumably the HTTP header name used to look up the client's
// user agent in incoming requests -- confirm against the use sites.
// NOTE(review): this is a mutable, non-const namespace-scope object; if nothing
// assigns to it, consider making it const.
std::string g_user_agent = "User-Agent";
1457

1458
class HTTPConnection {
1459
public:
1460
    const std::shared_ptr<Logger> logger_ptr;
1461
    util::Logger& logger;
1462

1463
    HTTPConnection(ServerImpl& serv, int_fast64_t id, bool is_ssl)
1464
        : logger_ptr{std::make_shared<PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
1,512✔
1465
                                                    serv.logger_ptr)} // Throws
1,512✔
1466
        , logger{*logger_ptr}
1,512✔
1467
        , m_server{serv}
1,512✔
1468
        , m_id{id}
1,512✔
1469
        , m_socket{new network::Socket{serv.get_service()}} // Throws
1,512✔
1470
        , m_read_ahead_buffer{new network::ReadAheadBuffer} // Throws
1,512✔
1471
        , m_http_server{*this, logger_ptr}
1,512✔
1472
    {
3,326✔
1473
        // Make the output buffer stream throw std::bad_alloc if it fails to
1474
        // expand the buffer
1475
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
3,326✔
1476

1477
        if (is_ssl) {
3,326✔
1478
            using namespace network::ssl;
48✔
1479
            Context& ssl_context = serv.get_ssl_context();
48✔
1480
            m_ssl_stream = std::make_unique<Stream>(*m_socket, ssl_context,
48✔
1481
                                                    Stream::server); // Throws
48✔
1482
        }
48✔
1483
    }
3,326✔
1484

1485
    // Returns the server this connection was constructed with.
    ServerImpl& get_server() noexcept
    {
        return m_server;
    }
×
1489

1490
    int_fast64_t get_id() const noexcept
1491
    {
2,118✔
1492
        return m_id;
2,118✔
1493
    }
2,118✔
1494

1495
    // Returns the underlying socket. Dereferences `m_socket` unconditionally,
    // so it must not be called after the socket has been reset or moved out.
    network::Socket& get_socket() noexcept
    {
        return *m_socket;
    }
5,086✔
1499

1500
    template <class H>
1501
    void async_write(const char* data, size_t size, H handler)
1502
    {
2,096✔
1503
        if (m_ssl_stream) {
2,096✔
1504
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
14✔
1505
        }
14✔
1506
        else {
2,082✔
1507
            m_socket->async_write(data, size, std::move(handler)); // Throws
2,082✔
1508
        }
2,082✔
1509
    }
2,096✔
1510

1511
    template <class H>
1512
    void async_read(char* buffer, size_t size, H handler)
1513
    {
8✔
1514
        if (m_ssl_stream) {
8!
1515
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer,
×
1516
                                     std::move(handler)); // Throws
×
1517
        }
×
1518
        else {
8✔
1519
            m_socket->async_read(buffer, size, *m_read_ahead_buffer,
8✔
1520
                                 std::move(handler)); // Throws
8✔
1521
        }
8✔
1522
    }
8✔
1523

1524
    template <class H>
1525
    void async_read_until(char* buffer, size_t size, char delim, H handler)
1526
    {
18,754✔
1527
        if (m_ssl_stream) {
18,754!
1528
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
126✔
1529
                                           std::move(handler)); // Throws
126✔
1530
        }
126✔
1531
        else {
18,628✔
1532
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
18,628✔
1533
                                       std::move(handler)); // Throws
18,628✔
1534
        }
18,628✔
1535
    }
18,754✔
1536

1537
    void initiate(std::string remote_endpoint)
1538
    {
2,116✔
1539
        m_last_activity_at = steady_clock_now();
2,116✔
1540
        m_remote_endpoint = std::move(remote_endpoint);
2,116✔
1541

1542
        logger.detail("Connection from %1", m_remote_endpoint); // Throws
2,116✔
1543

1544
        if (m_ssl_stream) {
2,116✔
1545
            initiate_ssl_handshake(); // Throws
24✔
1546
        }
24✔
1547
        else {
2,092✔
1548
            initiate_http(); // Throws
2,092✔
1549
        }
2,092✔
1550
    }
2,116✔
1551

1552
    void respond_200_ok()
1553
    {
×
1554
        handle_text_response(HTTPStatus::Ok, "OK"); // Throws
×
1555
    }
×
1556

1557
    void respond_404_not_found()
1558
    {
×
1559
        handle_text_response(HTTPStatus::NotFound, "Not found"); // Throws
×
1560
    }
×
1561

1562
    void respond_503_service_unavailable()
1563
    {
×
1564
        handle_text_response(HTTPStatus::ServiceUnavailable, "Service unavailable"); // Throws
×
1565
    }
×
1566

1567
    // Commits suicide
1568
    template <class... Params>
1569
    void terminate(Logger::Level log_level, const char* log_message, Params... log_params)
1570
    {
42✔
1571
        logger.log(log_level, log_message, log_params...); // Throws
42✔
1572
        m_ssl_stream.reset();
42✔
1573
        m_socket.reset();
42✔
1574
        m_server.remove_http_connection(m_id); // Suicide
42✔
1575
    }
42✔
1576

1577
    // Commits suicide
1578
    void terminate_if_dead(SteadyTimePoint now)
1579
    {
2✔
1580
        milliseconds_type time = steady_duration(m_last_activity_at, now);
2✔
1581
        const Server::Config& config = m_server.get_config();
2✔
1582
        if (m_is_sending) {
2✔
1583
            if (time >= config.http_response_timeout) {
×
1584
                // Suicide
1585
                terminate(Logger::Level::detail,
×
1586
                          "HTTP connection closed (request timeout)"); // Throws
×
1587
            }
×
1588
        }
×
1589
        else {
2✔
1590
            if (time >= config.http_request_timeout) {
2✔
1591
                // Suicide
1592
                terminate(Logger::Level::detail,
×
1593
                          "HTTP connection closed (response timeout)"); // Throws
×
1594
            }
×
1595
        }
2✔
1596
    }
2✔
1597

1598
    std::string get_appservices_request_id() const
1599
    {
2,096✔
1600
        return m_appservices_request_id.to_string();
2,096✔
1601
    }
2,096✔
1602

1603
private:
1604
    ServerImpl& m_server;
1605
    const int_fast64_t m_id;
1606
    const ObjectId m_appservices_request_id = ObjectId::gen();
1607
    std::unique_ptr<network::Socket> m_socket;
1608
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1609
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1610
    HTTPServer<HTTPConnection> m_http_server;
1611
    OutputBuffer m_output_buffer;
1612
    bool m_is_sending = false;
1613
    SteadyTimePoint m_last_activity_at;
1614
    std::string m_remote_endpoint;
1615
    int m_negotiated_protocol_version = 0;
1616

1617
    void initiate_ssl_handshake()
1618
    {
24✔
1619
        auto handler = [this](std::error_code ec) {
24✔
1620
            if (ec != util::error::operation_aborted)
24✔
1621
                handle_ssl_handshake(ec); // Throws
24✔
1622
        };
24✔
1623
        m_ssl_stream->async_handshake(std::move(handler)); // Throws
24✔
1624
    }
24✔
1625

1626
    void handle_ssl_handshake(std::error_code ec)
1627
    {
24✔
1628
        if (ec) {
24✔
1629
            logger.error("SSL handshake error (%1): %2", ec, ec.message()); // Throws
10✔
1630
            close_due_to_error(ec);                                         // Throws
10✔
1631
            return;
10✔
1632
        }
10✔
1633
        initiate_http(); // Throws
14✔
1634
    }
14✔
1635

1636
    void initiate_http()
1637
    {
2,108✔
1638
        logger.debug("Connection initiates HTTP receipt");
2,108✔
1639

1640
        auto handler = [this](HTTPRequest request, std::error_code ec) {
2,108✔
1641
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
2,108✔
1642
                return;
×
1643
            if (REALM_UNLIKELY(ec == HTTPParserError::MalformedRequest)) {
2,108✔
1644
                logger.error("Malformed HTTP request");
×
1645
                close_due_to_error(ec); // Throws
×
1646
                return;
×
1647
            }
×
1648
            if (REALM_UNLIKELY(ec == HTTPParserError::BadRequest)) {
2,108✔
1649
                logger.error("Bad HTTP request");
8✔
1650
                const char* body = "The HTTP request was corrupted";
8✔
1651
                handle_400_bad_request(body); // Throws
8✔
1652
                return;
8✔
1653
            }
8✔
1654
            if (REALM_UNLIKELY(ec)) {
2,100✔
1655
                read_error(ec); // Throws
12✔
1656
                return;
12✔
1657
            }
12✔
1658
            handle_http_request(std::move(request)); // Throws
2,088✔
1659
        };
2,088✔
1660
        m_http_server.async_receive_request(std::move(handler)); // Throws
2,108✔
1661
    }
2,108✔
1662

1663
    void handle_http_request(const HTTPRequest& request)
1664
    {
2,088✔
1665
        StringData path = request.path;
2,088✔
1666

1667
        logger.debug("HTTP request received, request = %1", request);
2,088✔
1668

1669
        m_is_sending = true;
2,088✔
1670
        m_last_activity_at = steady_clock_now();
2,088✔
1671

1672
        // FIXME: When thinking of this function as a switching device, it seem
1673
        // wrong that it requires a `%2F` after `/realm-sync/`. If `%2F` is
1674
        // supposed to be mandatory, then that check ought to be delegated to
1675
        // handle_request_for_sync(), as that will yield a sharper separation of
1676
        // concerns.
1677
        if (path == "/realm-sync" || path.begins_with("/realm-sync?") || path.begins_with("/realm-sync/%2F")) {
2,088✔
1678
            handle_request_for_sync(request); // Throws
2,076✔
1679
        }
2,076✔
1680
        else {
12✔
1681
            handle_404_not_found(request); // Throws
12✔
1682
        }
12✔
1683
    }
2,088✔
1684

1685
    void handle_request_for_sync(const HTTPRequest& request)
1686
    {
2,076✔
1687
        if (m_server.is_sync_stopped()) {
2,076✔
1688
            logger.debug("Attempt to create a sync connection to a server that has been "
×
1689
                         "stopped"); // Throws
×
1690
            handle_503_service_unavailable(request, "The server does not accept sync "
×
1691
                                                    "connections"); // Throws
×
1692
            return;
×
1693
        }
×
1694

1695
        util::Optional<std::string> sec_websocket_protocol = websocket::read_sec_websocket_protocol(request);
2,076✔
1696

1697
        // Figure out whether there are any protocol versions supported by both
1698
        // the client and the server, and if so, choose the newest one of them.
1699
        MiscBuffers& misc_buffers = m_server.get_misc_buffers();
2,076✔
1700
        using ProtocolVersionRanges = MiscBuffers::ProtocolVersionRanges;
2,076✔
1701
        ProtocolVersionRanges& protocol_version_ranges = misc_buffers.protocol_version_ranges;
2,076✔
1702
        {
2,076✔
1703
            protocol_version_ranges.clear();
2,076✔
1704
            util::MemoryInputStream in;
2,076✔
1705
            in.imbue(std::locale::classic());
2,076✔
1706
            in.unsetf(std::ios_base::skipws);
2,076✔
1707
            std::string_view value;
2,076✔
1708
            if (sec_websocket_protocol)
2,076✔
1709
                value = *sec_websocket_protocol;
2,076✔
1710
            HttpListHeaderValueParser parser{value};
2,076✔
1711
            std::string_view elem;
2,076✔
1712
            while (parser.next(elem)) {
29,064✔
1713
                // FIXME: Use std::string_view::begins_with() in C++20.
1714
                const StringData protocol{elem};
26,988✔
1715
                std::string_view prefix;
26,988✔
1716
                if (protocol.begins_with(get_pbs_websocket_protocol_prefix()))
26,988✔
1717
                    prefix = get_pbs_websocket_protocol_prefix();
26,988✔
UNCOV
1718
                else if (protocol.begins_with(get_old_pbs_websocket_protocol_prefix()))
×
1719
                    prefix = get_old_pbs_websocket_protocol_prefix();
×
1720
                if (!prefix.empty()) {
26,988✔
1721
                    auto parse_version = [&](std::string_view str) {
26,988✔
1722
                        in.set_buffer(str.data(), str.data() + str.size());
26,986✔
1723
                        int version = 0;
26,986✔
1724
                        in >> version;
26,986✔
1725
                        if (REALM_LIKELY(in && in.eof() && version >= 0))
26,986✔
1726
                            return version;
26,988✔
1727
                        return -1;
2,147,483,647✔
1728
                    };
26,986✔
1729
                    int min, max;
26,988✔
1730
                    std::string_view range = elem.substr(prefix.size());
26,988✔
1731
                    auto i = range.find('-');
26,988✔
1732
                    if (i != std::string_view::npos) {
26,988✔
1733
                        min = parse_version(range.substr(0, i));
×
1734
                        max = parse_version(range.substr(i + 1));
×
1735
                    }
×
1736
                    else {
26,988✔
1737
                        min = parse_version(range);
26,988✔
1738
                        max = min;
26,988✔
1739
                    }
26,988✔
1740
                    if (REALM_LIKELY(min >= 0 && max >= 0 && min <= max)) {
26,988✔
1741
                        protocol_version_ranges.emplace_back(min, max); // Throws
26,988✔
1742
                        continue;
26,988✔
1743
                    }
26,988✔
UNCOV
1744
                    logger.error("Protocol version negotiation failed: Client sent malformed "
×
UNCOV
1745
                                 "specification of supported protocol versions: '%1'",
×
UNCOV
1746
                                 elem); // Throws
×
UNCOV
1747
                    handle_400_bad_request("Protocol version negotiation failed: Malformed "
×
UNCOV
1748
                                           "specification of supported protocol "
×
UNCOV
1749
                                           "versions\n"); // Throws
×
UNCOV
1750
                    return;
×
1751
                }
26,988✔
UNCOV
1752
                logger.warn("Unrecognized protocol token in HTTP response header "
×
UNCOV
1753
                            "Sec-WebSocket-Protocol: '%1'",
×
UNCOV
1754
                            elem); // Throws
×
UNCOV
1755
            }
×
1756
            if (protocol_version_ranges.empty()) {
2,076✔
1757
                logger.error("Protocol version negotiation failed: Client did not send a "
×
1758
                             "specification of supported protocol versions"); // Throws
×
1759
                handle_400_bad_request("Protocol version negotiation failed: Missing specification "
×
1760
                                       "of supported protocol versions\n"); // Throws
×
1761
                return;
×
1762
            }
×
1763
        }
2,076✔
1764
        {
2,076✔
1765
            ProtocolVersionRange server_range = m_server.get_protocol_version_range();
2,076✔
1766
            int server_min = server_range.first;
2,076✔
1767
            int server_max = server_range.second;
2,076✔
1768
            int best_match = 0;
2,076✔
1769
            int overall_client_min = std::numeric_limits<int>::max();
2,076✔
1770
            int overall_client_max = std::numeric_limits<int>::min();
2,076✔
1771
            for (const auto& range : protocol_version_ranges) {
26,988✔
1772
                int client_min = range.first;
26,988✔
1773
                int client_max = range.second;
26,988✔
1774
                if (client_max >= server_min && client_min <= server_max) {
26,988✔
1775
                    // Overlap
1776
                    int version = std::min(client_max, server_max);
26,988✔
1777
                    if (version > best_match) {
26,988✔
1778
                        best_match = version;
2,076✔
1779
                    }
2,076✔
1780
                }
26,988✔
1781
                if (client_min < overall_client_min)
26,988✔
1782
                    overall_client_min = client_min;
26,988✔
1783
                if (client_max > overall_client_max)
26,988✔
1784
                    overall_client_max = client_max;
2,076✔
1785
            }
26,988✔
1786
            Formatter& formatter = misc_buffers.formatter;
2,076✔
1787
            if (REALM_UNLIKELY(best_match == 0)) {
2,076✔
1788
                const char* elaboration = "No version supported by both client and server";
×
1789
                auto format_ranges = [&](const auto& list) {
×
1790
                    bool nonfirst = false;
×
1791
                    for (auto range : list) {
×
1792
                        if (nonfirst)
×
1793
                            formatter << ", "; // Throws
×
1794
                        int min = range.first, max = range.second;
×
1795
                        REALM_ASSERT(min <= max);
×
1796
                        formatter << min;
×
1797
                        if (max != min)
×
1798
                            formatter << "-" << max;
×
1799
                        nonfirst = true;
×
1800
                    }
×
1801
                };
×
1802
                using Range = ProtocolVersionRange;
×
1803
                formatter.reset();
×
1804
                format_ranges(protocol_version_ranges); // Throws
×
1805
                logger.error("Protocol version negotiation failed: %1 "
×
1806
                             "(client supports: %2)",
×
1807
                             elaboration, std::string_view(formatter.data(), formatter.size())); // Throws
×
1808
                formatter.reset();
×
1809
                formatter << "Protocol version negotiation failed: "
×
1810
                             ""
×
1811
                          << elaboration << ".\n\n";                                   // Throws
×
1812
                formatter << "Server supports: ";                                      // Throws
×
1813
                format_ranges(std::initializer_list<Range>{{server_min, server_max}}); // Throws
×
1814
                formatter << "\n";                                                     // Throws
×
1815
                formatter << "Client supports: ";                                      // Throws
×
1816
                format_ranges(protocol_version_ranges);                                // Throws
×
1817
                formatter << "\n";                                                     // Throws
×
1818
                handle_400_bad_request({formatter.data(), formatter.size()});          // Throws
×
1819
                return;
×
1820
            }
×
1821
            m_negotiated_protocol_version = best_match;
2,076✔
1822
            logger.debug("Received: Sync HTTP request (negotiated_protocol_version=%1)",
2,076✔
1823
                         m_negotiated_protocol_version); // Throws
2,076✔
1824
            formatter.reset();
2,076✔
1825
        }
2,076✔
1826

1827
        std::string sec_websocket_protocol_2;
×
1828
        {
2,076✔
1829
            std::string_view prefix =
2,076✔
1830
                m_negotiated_protocol_version < SyncConnection::PBS_FLX_MIGRATION_PROTOCOL_VERSION
2,076✔
1831
                    ? get_old_pbs_websocket_protocol_prefix()
2,076✔
1832
                    : get_pbs_websocket_protocol_prefix();
2,076✔
1833
            std::ostringstream out;
2,076✔
1834
            out.imbue(std::locale::classic());
2,076✔
1835
            out << prefix << m_negotiated_protocol_version; // Throws
2,076✔
1836
            sec_websocket_protocol_2 = std::move(out).str();
2,076✔
1837
        }
2,076✔
1838

1839
        std::error_code ec;
2,076✔
1840
        util::Optional<HTTPResponse> response =
2,076✔
1841
            websocket::make_http_response(request, sec_websocket_protocol_2, ec); // Throws
2,076✔
1842

1843
        if (ec) {
2,076✔
1844
            if (ec == websocket::HttpError::bad_request_header_upgrade) {
×
1845
                logger.error("There must be a header of the form 'Upgrade: websocket'");
×
1846
            }
×
1847
            else if (ec == websocket::HttpError::bad_request_header_connection) {
×
1848
                logger.error("There must be a header of the form 'Connection: Upgrade'");
×
1849
            }
×
1850
            else if (ec == websocket::HttpError::bad_request_header_websocket_version) {
×
1851
                logger.error("There must be a header of the form 'Sec-WebSocket-Version: 13'");
×
1852
            }
×
1853
            else if (ec == websocket::HttpError::bad_request_header_websocket_key) {
×
1854
                logger.error("The header Sec-WebSocket-Key is missing");
×
1855
            }
×
1856

1857
            logger.error("The HTTP request with the error is:\n%1", request);
×
1858
            logger.error("Check the proxy configuration and make sure that the "
×
1859
                         "HTTP request is a valid Websocket request.");
×
1860
            close_due_to_error(ec);
×
1861
            return;
×
1862
        }
×
1863
        REALM_ASSERT(response);
2,076✔
1864
        add_common_http_response_headers(*response);
2,076✔
1865

1866
        std::string user_agent;
2,076✔
1867
        {
2,076✔
1868
            auto i = request.headers.find(g_user_agent);
2,076✔
1869
            if (i != request.headers.end())
2,076✔
1870
                user_agent = i->second; // Throws (copy)
2,076✔
1871
        }
2,076✔
1872

1873
        auto handler = [protocol_version = m_negotiated_protocol_version, user_agent = std::move(user_agent),
2,076✔
1874
                        this](std::error_code ec) {
2,076✔
1875
            // If the operation is aborted, the socket object may have been destroyed.
1876
            if (ec != util::error::operation_aborted) {
2,076✔
1877
                if (ec) {
2,076✔
1878
                    write_error(ec);
×
1879
                    return;
×
1880
                }
×
1881

1882
                std::unique_ptr<SyncConnection> sync_conn = std::make_unique<SyncConnection>(
2,076✔
1883
                    m_server, m_id, std::move(m_socket), std::move(m_ssl_stream), std::move(m_read_ahead_buffer),
2,076✔
1884
                    protocol_version, std::move(user_agent), std::move(m_remote_endpoint),
2,076✔
1885
                    get_appservices_request_id()); // Throws
2,076✔
1886
                SyncConnection& sync_conn_ref = *sync_conn;
2,076✔
1887
                m_server.add_sync_connection(m_id, std::move(sync_conn));
2,076✔
1888
                m_server.remove_http_connection(m_id);
2,076✔
1889
                sync_conn_ref.initiate();
2,076✔
1890
            }
2,076✔
1891
        };
2,076✔
1892
        m_http_server.async_send_response(*response, std::move(handler));
2,076✔
1893
    }
2,076✔
1894

1895
    // Sends a response with the given status and text body, marked
    // "Connection: close". A Content-Length header and the body are attached
    // only when the body is non-empty. After a successful write the connection
    // terminates itself; write failures go to write_error().
    void handle_text_response(HTTPStatus http_status, std::string_view body)
    {
        std::string payload(body); // Throws

        HTTPResponse response;
        response.status = http_status;
        add_common_http_response_headers(response);
        response.headers["Connection"] = "close";

        const bool has_payload = !payload.empty();
        if (has_payload) {
            response.headers["Content-Length"] = util::to_string(payload.size());
            response.body = std::move(payload);
        }

        auto on_sent = [this](std::error_code ec) {
            if (REALM_UNLIKELY(ec == util::error::operation_aborted)) {
                return;
            }
            else if (REALM_UNLIKELY(ec)) {
                write_error(ec);
            }
            else {
                terminate(Logger::Level::detail, "HTTP connection closed"); // Throws
            }
        };
        m_http_server.async_send_response(response, std::move(on_sent));
    }
20✔
1920

1921
    void handle_400_bad_request(std::string_view body)
1922
    {
8✔
1923
        logger.detail("400 Bad Request");
8✔
1924
        handle_text_response(HTTPStatus::BadRequest, body); // Throws
8✔
1925
    }
8✔
1926

1927
    void handle_404_not_found(const HTTPRequest&)
1928
    {
12✔
1929
        logger.detail("404 Not Found"); // Throws
12✔
1930
        handle_text_response(HTTPStatus::NotFound,
12✔
1931
                             "Realm sync server\n\nPage not found\n"); // Throws
12✔
1932
    }
12✔
1933

1934
    void handle_503_service_unavailable(const HTTPRequest&, std::string_view message)
1935
    {
×
1936
        logger.debug("503 Service Unavailable");                       // Throws
×
1937
        handle_text_response(HTTPStatus::ServiceUnavailable, message); // Throws
×
1938
    }
×
1939

1940
    // Adds the headers shared by all responses: always "Server", plus
    // "X-Appservices-Request-Id" when the negotiated protocol version is below
    // SyncConnection::SERVER_LOG_PROTOCOL_VERSION.
    void add_common_http_response_headers(HTTPResponse& response)
    {
        response.headers["Server"] = "RealmSync/" REALM_VERSION_STRING; // Throws

        if (m_negotiated_protocol_version >= SyncConnection::SERVER_LOG_PROTOCOL_VERSION)
            return;

        // This isn't a real X-Appservices-Request-Id, but it should be enough to test with
        response.headers["X-Appservices-Request-Id"] = get_appservices_request_id();
    }
2,096✔
1948

1949
    void read_error(std::error_code ec)
1950
    {
12✔
1951
        REALM_ASSERT(ec != util::error::operation_aborted);
12✔
1952
        if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
12!
1953
            // Suicide
1954
            close_due_to_close_by_client(ec); // Throws
12✔
1955
            return;
12✔
1956
        }
12✔
1957
        if (ec == util::MiscExtErrors::delim_not_found) {
×
1958
            logger.error("Input message head delimited not found"); // Throws
×
1959
            close_due_to_error(ec);                                 // Throws
×
1960
            return;
×
1961
        }
×
1962

1963
        logger.error("Reading failed: %1", ec.message()); // Throws
×
1964

1965
        // Suicide
1966
        close_due_to_error(ec); // Throws
×
1967
    }
×
1968

1969
    void write_error(std::error_code ec)
1970
    {
×
1971
        REALM_ASSERT(ec != util::error::operation_aborted);
×
1972
        if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1973
            // Suicide
1974
            close_due_to_close_by_client(ec); // Throws
×
1975
            return;
×
1976
        }
×
1977
        logger.error("Writing failed: %1", ec.message()); // Throws
×
1978

1979
        // Suicide
1980
        close_due_to_error(ec); // Throws
×
1981
    }
×
1982

1983
    void close_due_to_close_by_client(std::error_code ec)
1984
    {
12✔
1985
        auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
12✔
1986
        // Suicide
1987
        terminate(log_level, "HTTP connection closed by client: %1", ec.message()); // Throws
12✔
1988
    }
12✔
1989

1990
    void close_due_to_error(std::error_code ec)
1991
    {
10✔
1992
        // Suicide
1993
        terminate(Logger::Level::error, "HTTP connection closed due to error: %1",
10✔
1994
                  ec.message()); // Throws
10✔
1995
    }
10✔
1996

1997
    // Builds the log prefix for the connection with identifier `id`, i.e.
    // "HTTP Connection[<id>]: ". The number is formatted with the classic
    // locale so it never picks up locale-specific digit grouping.
    static std::string make_logger_prefix(int_fast64_t id)
    {
        std::ostringstream prefix;
        prefix.imbue(std::locale::classic());
        prefix << "HTTP Connection[";
        prefix << id;
        prefix << "]: "; // Throws
        return prefix.str(); // Throws
    }
3,326✔
2004
};
2005

2006

2007
class DownloadHistoryEntryHandler : public ServerHistory::HistoryEntryHandler {
2008
public:
2009
    std::size_t num_changesets = 0;
2010
    std::size_t accum_original_size = 0;
2011
    std::size_t accum_compacted_size = 0;
2012

2013
    DownloadHistoryEntryHandler(ServerProtocol& protocol, OutputBuffer& buffer, util::Logger& logger) noexcept
2014
        : m_protocol{protocol}
18,802✔
2015
        , m_buffer{buffer}
18,802✔
2016
        , m_logger{logger}
18,802✔
2017
    {
42,170✔
2018
    }
42,170✔
2019

2020
    void handle(version_type server_version, const HistoryEntry& entry, size_t original_size) override
2021
    {
42,716✔
2022
        version_type client_version = entry.remote_version;
42,716✔
2023
        ServerProtocol::ChangesetInfo info{server_version, client_version, entry, original_size};
42,716✔
2024
        m_protocol.insert_single_changeset_download_message(m_buffer, info, m_logger); // Throws
42,716✔
2025
        ++num_changesets;
42,716✔
2026
        accum_original_size += original_size;
42,716✔
2027
        accum_compacted_size += entry.changeset.size();
42,716✔
2028
    }
42,716✔
2029

2030
private:
2031
    ServerProtocol& m_protocol;
2032
    OutputBuffer& m_buffer;
2033
    util::Logger& m_logger;
2034
};
2035

2036

2037
// ============================ Session ============================
2038

2039
//                        Need cli-   Send     IDENT     UNBIND              ERROR
2040
//   Protocol             ent file    IDENT    message   message   Error     message
2041
//   state                identifier  message  received  received  occurred  sent
2042
// ---------------------------------------------------------------------------------
2043
//   AllocatingIdent      yes         yes      no        no        no        no
2044
//   SendIdent            no          yes      no        no        no        no
2045
//   WaitForIdent         no          no       no        no        no        no
2046
//   WaitForUnbind        maybe       no       yes       no        no        no
2047
//   SendError            maybe       maybe    maybe     no        yes       no
2048
//   WaitForUnbindErr     maybe       maybe    maybe     no        yes       yes
2049
//   SendUnbound          maybe       maybe    maybe     yes       maybe     no
2050
//
2051
//
2052
//   Condition                      Expression
2053
// ----------------------------------------------------------
2054
//   Need client file identifier    need_client_file_ident()
2055
//   Send IDENT message             must_send_ident_message()
2056
//   IDENT message received         ident_message_received()
2057
//   UNBIND message received        unbind_message_received()
2058
//   Error occurred                 error_occurred()
2059
//   ERROR message sent             m_error_message_sent
2060
//
2061
//
2062
//   Protocol
2063
//   state                Will send              Can receive
2064
// -----------------------------------------------------------------------
2065
//   AllocatingIdent      none                   UNBIND
2066
//   SendIdent            IDENT                  UNBIND
2067
//   WaitForIdent         none                   IDENT, UNBIND
2068
//   WaitForUnbind        DOWNLOAD, TRANSACT,    UPLOAD, TRANSACT, MARK,
2069
//                        MARK, ALLOC            ALLOC, UNBIND
2070
//   SendError            ERROR                  any
2071
//   WaitForUnbindErr     none                   any
2072
//   SendUnbound          UNBOUND                none
2073
//
2074
class Session final : private FileIdentReceiver {
2075
public:
2076
    util::PrefixLogger logger;
2077

2078
    Session(SyncConnection& conn, session_ident_type session_ident)
2079
        : logger{util::LogCategory::server, make_logger_prefix(session_ident), conn.logger_ptr} // Throws
2,526✔
2080
        , m_connection{conn}
2,526✔
2081
        , m_session_ident{session_ident}
2,526✔
2082
    {
5,302✔
2083
    }
5,302✔
2084

2085
    // A session must no longer be enlisted to send when it is destroyed. The
    // destructor detaches the session from its server file.
    ~Session() noexcept
    {
        REALM_ASSERT(!is_enlisted_to_send());

        detach_from_server_file();
    }
5,304✔
2090

2091
    // Returns the connection that this session was constructed with.
    SyncConnection& get_connection() noexcept
    {
        SyncConnection& owning_connection = m_connection;
        return owning_connection;
    }
42,192✔
2095

2096
    // Returns the `encryption_key` entry of the configuration of the server
    // that owns this session's connection.
    const Optional<std::array<char, 64>>& get_encryption_key()
    {
        ServerImpl& server = m_connection.get_server();
        const Server::Config& config = server.get_config();
        return config.encryption_key;
    }
×
2100

2101
    // Returns the session identifier that was passed to the constructor.
    session_ident_type get_session_ident() const noexcept
    {
        const session_ident_type ident = m_session_ident;
        return ident;
    }
160✔
2105

2106
    // Convenience accessor: forwards to the connection's server protocol
    // object.
    ServerProtocol& get_server_protocol() noexcept
    {
        ServerProtocol& protocol = m_connection.get_server_protocol();
        return protocol;
    }
57,936✔
2110

2111
    bool need_client_file_ident() const noexcept
2112
    {
7,026✔
2113
        return (m_file_ident_request != 0);
7,026✔
2114
    }
7,026✔
2115

2116
    bool must_send_ident_message() const noexcept
2117
    {
4,340✔
2118
        return m_send_ident_message;
4,340✔
2119
    }
4,340✔
2120

2121
    bool ident_message_received() const noexcept
2122
    {
346,104✔
2123
        return m_client_file_ident != 0;
346,104✔
2124
    }
346,104✔
2125

2126
    bool unbind_message_received() const noexcept
2127
    {
348,204✔
2128
        return m_unbind_message_received;
348,204✔
2129
    }
348,204✔
2130

2131
    bool error_occurred() const noexcept
2132
    {
341,132✔
2133
        return int(m_error_code) != 0;
341,132✔
2134
    }
341,132✔
2135

2136
    bool relayed_alloc_request_in_progress() const noexcept
2137
    {
×
2138
        return (need_client_file_ident() || m_allocated_file_ident.ident != 0);
×
2139
    }
×
2140

2141
    // Returns the file identifier (always a nonzero value) of the client side
2142
    // file if ident_message_received() returns true. Otherwise it returns zero.
2143
    file_ident_type get_client_file_ident() const noexcept
    {
        // Plain accessor for the stored client file identifier.
        const file_ident_type ident = m_client_file_ident;
        return ident;
    }
×
2147

2148
    void initiate()
2149
    {
5,300✔
2150
        logger.detail("Session initiated", m_session_ident); // Throws
5,300✔
2151
    }
5,300✔
2152

2153
    void terminate()
2154
    {
4,324✔
2155
        logger.detail("Session terminated", m_session_ident); // Throws
4,324✔
2156
    }
4,324✔
2157

2158
    // Initiate the deactivation process, if it has not been initiated already
2159
    // by the client.
2160
    //
2161
    // IMPORTANT: This function must not be called with protocol versions
2162
    // earlier than 23.
2163
    //
2164
    // The deactivation process will eventually lead to termination of the
2165
    // session.
2166
    //
2167
    // The session will detach itself from the server file when the deactivation
2168
    // process is initiated, regardless of whether it is initiated by the
2169
    // client, or by calling this function.
2170
    void initiate_deactivation(ProtocolError error_code)
    {
        REALM_ASSERT(is_session_level_error(error_code));
        REALM_ASSERT(!error_occurred()); // Must only be called once

        // When UNBIND has already arrived, the client started the
        // deactivation process itself, so there is nothing to do here.
        if (REALM_UNLIKELY(unbind_message_received())) {
            // Protocol state was SendUnbound, and remains unchanged
            return;
        }

        detach_from_server_file();
        m_error_code = error_code;
        // Protocol state is now SendError
        ensure_enlisted_to_send();
    }
80✔
2186

2187
    bool is_enlisted_to_send() const noexcept
2188
    {
277,626✔
2189
        return m_next != nullptr;
277,626✔
2190
    }
277,626✔
2191

2192
    void ensure_enlisted_to_send() noexcept
2193
    {
53,776✔
2194
        if (!is_enlisted_to_send())
53,776✔
2195
            enlist_to_send();
52,792✔
2196
    }
53,776✔
2197

2198
    void enlist_to_send() noexcept
2199
    {
111,258✔
2200
        m_connection.enlist_to_send(this);
111,258✔
2201
    }
111,258✔
2202

2203
    // Overriding member function in FileIdentReceiver
2204
    void receive_file_ident(SaltedFileIdent file_ident) override final
    {
        // Protocol state must be AllocatingIdent or WaitForUnbind
        if (ident_message_received()) {
            // WaitForUnbind: no IDENT message may be pending.
            REALM_ASSERT(!m_send_ident_message);
        }
        else {
            // AllocatingIdent: a request must be outstanding, and the IDENT
            // message must still be due.
            REALM_ASSERT(need_client_file_ident());
            REALM_ASSERT(m_send_ident_message);
        }
        REALM_ASSERT(!unbind_message_received());
        REALM_ASSERT(!error_occurred());
        REALM_ASSERT(!m_error_message_sent);

        // The request is now fulfilled; remember the identifier that was
        // handed out.
        m_file_ident_request = 0;
        m_allocated_file_ident = file_ident;

        // If the protocol state was AllocatingIdent, it is now SendIdent,
        // otherwise it continues to be WaitForUnbind.

        logger.debug("Acquired outbound salted file identifier (%1, %2)", file_ident.ident,
                     file_ident.salt); // Throws

        ensure_enlisted_to_send();
    }
1,344✔
2229

2230
    // Called by the associated connection object when this session is granted
2231
    // an opportunity to initiate the sending of a message.
2232
    //
2233
    // This function may lead to the destruction of the session object
2234
    // (suicide).
2235
    void send_message()
2236
    {
111,072✔
2237
        if (REALM_LIKELY(!unbind_message_received())) {
111,072✔
2238
            if (REALM_LIKELY(!error_occurred())) {
108,732✔
2239
                if (REALM_LIKELY(ident_message_received())) {
108,648✔
2240
                    // State is WaitForUnbind.
2241
                    bool relayed_alloc = (m_allocated_file_ident.ident != 0);
107,302✔
2242
                    if (REALM_LIKELY(!relayed_alloc)) {
107,304✔
2243
                        // Send DOWNLOAD or MARK.
2244
                        continue_history_scan(); // Throws
107,304✔
2245
                        // Session object may have been
2246
                        // destroyed at this point (suicide)
2247
                        return;
107,304✔
2248
                    }
107,304✔
2249
                    send_alloc_message(); // Throws
2,147,483,647✔
2250
                    return;
2,147,483,647✔
2251
                }
107,302✔
2252
                // State is SendIdent
2253
                send_ident_message(); // Throws
1,346✔
2254
                return;
1,346✔
2255
            }
108,648✔
2256
            // State is SendError
2257
            send_error_message(); // Throws
84✔
2258
            return;
84✔
2259
        }
108,732✔
2260
        // State is SendUnbound
2261
        send_unbound_message(); // Throws
2,340✔
2262
        terminate();            // Throws
2,340✔
2263
        m_connection.discard_session(m_session_ident);
2,340✔
2264
        // This session is now destroyed!
2265
    }
2,340✔
2266

2267
    bool receive_bind_message(std::string path, std::string signed_user_token, bool need_client_file_ident,
2268
                              bool is_subserver, ProtocolError& error)
2269
    {
5,306✔
2270
        if (logger.would_log(util::Logger::Level::info)) {
5,306✔
2271
            logger.detail("Received: BIND(server_path=%1, signed_user_token='%2', "
362✔
2272
                          "need_client_file_ident=%3, is_subserver=%4)",
362✔
2273
                          path, short_token_fmt(signed_user_token), int(need_client_file_ident),
362✔
2274
                          int(is_subserver)); // Throws
362✔
2275
        }
362✔
2276

2277
        ServerImpl& server = m_connection.get_server();
5,306✔
2278
        _impl::VirtualPathComponents virt_path_components =
5,306✔
2279
            _impl::parse_virtual_path(server.get_root_dir(), path); // Throws
5,306✔
2280

2281
        if (!virt_path_components.is_valid) {
5,306✔
2282
            logger.error("Bad virtual path (message_type='bind', path='%1', "
28✔
2283
                         "signed_user_token='%2')",
28✔
2284
                         path,
28✔
2285
                         short_token_fmt(signed_user_token)); // Throws
28✔
2286
            error = ProtocolError::illegal_realm_path;
28✔
2287
            return false;
28✔
2288
        }
28✔
2289

2290
        // The user has proper permissions at this stage.
2291

2292
        m_server_file = server.get_or_create_file(path); // Throws
5,278✔
2293

2294
        m_server_file->add_unidentified_session(this); // Throws
5,278✔
2295

2296
        logger.info("Client info: (path='%1', from=%2, protocol=%3) %4", path, m_connection.get_remote_endpoint(),
5,278✔
2297
                    m_connection.get_client_protocol_version(),
5,278✔
2298
                    m_connection.get_client_user_agent()); // Throws
5,278✔
2299

2300
        m_is_subserver = is_subserver;
5,278✔
2301
        if (REALM_LIKELY(!need_client_file_ident)) {
5,278✔
2302
            // Protocol state is now WaitForUnbind
2303
            return true;
3,876✔
2304
        }
3,876✔
2305

2306
        // FIXME: We must make a choice about client file ident for read only
2307
        // sessions. They should have a special read-only client file ident.
2308
        file_ident_type proxy_file = 0; // No proxy
1,402✔
2309
        ClientType client_type = (is_subserver ? ClientType::subserver : ClientType::regular);
1,402✔
2310
        m_file_ident_request = m_server_file->request_file_ident(*this, proxy_file, client_type); // Throws
1,402✔
2311
        m_send_ident_message = true;
1,402✔
2312
        // Protocol state is now AllocatingIdent
2313

2314
        return true;
1,402✔
2315
    }
5,278✔
2316

2317
    bool receive_ident_message(file_ident_type client_file_ident, salt_type client_file_ident_salt,
2318
                               version_type scan_server_version, version_type scan_client_version,
2319
                               version_type latest_server_version, salt_type latest_server_version_salt,
2320
                               ProtocolError& error)
2321
    {
4,336✔
2322
        // Protocol state must be WaitForIdent
2323
        REALM_ASSERT(!need_client_file_ident());
4,336✔
2324
        REALM_ASSERT(!m_send_ident_message);
4,336✔
2325
        REALM_ASSERT(!ident_message_received());
4,336✔
2326
        REALM_ASSERT(!unbind_message_received());
4,336✔
2327
        REALM_ASSERT(!error_occurred());
4,336✔
2328
        REALM_ASSERT(!m_error_message_sent);
4,336✔
2329

2330
        logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
4,336✔
2331
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
4,336✔
2332
                     "latest_server_version_salt=%6)",
4,336✔
2333
                     client_file_ident, client_file_ident_salt, scan_server_version, scan_client_version,
4,336✔
2334
                     latest_server_version, latest_server_version_salt); // Throws
4,336✔
2335

2336
        SaltedFileIdent client_file_ident_2 = {client_file_ident, client_file_ident_salt};
4,336✔
2337
        DownloadCursor download_progress = {scan_server_version, scan_client_version};
4,336✔
2338
        SaltedVersion server_version_2 = {latest_server_version, latest_server_version_salt};
4,336✔
2339
        ClientType client_type = (m_is_subserver ? ClientType::subserver : ClientType::regular);
4,336✔
2340
        UploadCursor upload_threshold = {0, 0};
4,336✔
2341
        version_type locked_server_version = 0;
4,336✔
2342
        BootstrapError error_2 =
4,336✔
2343
            m_server_file->bootstrap_client_session(client_file_ident_2, download_progress, server_version_2,
4,336✔
2344
                                                    client_type, upload_threshold, locked_server_version,
4,336✔
2345
                                                    logger); // Throws
4,336✔
2346
        switch (error_2) {
4,336✔
2347
            case BootstrapError::no_error:
4,308✔
2348
                break;
4,308✔
2349
            case BootstrapError::client_file_expired:
✔
2350
                logger.warn("Client (%1) expired", client_file_ident); // Throws
×
2351
                error = ProtocolError::client_file_expired;
×
2352
                return false;
×
2353
            case BootstrapError::bad_client_file_ident:
✔
2354
                logger.error("Bad client file ident (%1) in IDENT message",
×
2355
                             client_file_ident); // Throws
×
2356
                error = ProtocolError::bad_client_file_ident;
×
2357
                return false;
×
2358
            case BootstrapError::bad_client_file_ident_salt:
4✔
2359
                logger.error("Bad client file identifier salt (%1) in IDENT message",
4✔
2360
                             client_file_ident_salt); // Throws
4✔
2361
                error = ProtocolError::diverging_histories;
4✔
2362
                return false;
4✔
2363
            case BootstrapError::bad_download_server_version:
✔
2364
                logger.error("Bad download progress server version in IDENT message"); // Throws
×
2365
                error = ProtocolError::bad_server_version;
×
2366
                return false;
×
2367
            case BootstrapError::bad_download_client_version:
4✔
2368
                logger.error("Bad download progress client version in IDENT message"); // Throws
4✔
2369
                error = ProtocolError::bad_client_version;
4✔
2370
                return false;
4✔
2371
            case BootstrapError::bad_server_version:
20✔
2372
                logger.error("Bad server version (message_type='ident')"); // Throws
20✔
2373
                error = ProtocolError::bad_server_version;
20✔
2374
                return false;
20✔
2375
            case BootstrapError::bad_server_version_salt:
4✔
2376
                logger.error("Bad server version salt in IDENT message"); // Throws
4✔
2377
                error = ProtocolError::diverging_histories;
4✔
2378
                return false;
4✔
2379
            case BootstrapError::bad_client_type:
✔
2380
                logger.error("Bad client type (%1) in IDENT message", int(client_type)); // Throws
×
2381
                error = ProtocolError::bad_client_file_ident; // FIXME: Introduce new protocol-level error
×
2382
                                                              // `bad_client_type`.
2383
                return false;
×
2384
        }
4,336✔
2385

2386
        // Make sure there is no other session currently associcated with the
2387
        // same client-side file
2388
        if (Session* other_sess = m_server_file->get_identified_session(client_file_ident)) {
4,306✔
2389
            SyncConnection& other_conn = other_sess->get_connection();
×
2390
            // It is a protocol violation if the other session is associated
2391
            // with the same connection
2392
            if (&other_conn == &m_connection) {
×
2393
                logger.error("Client file already bound in other session associated with "
×
2394
                             "the same connection"); // Throws
×
2395
                error = ProtocolError::bound_in_other_session;
×
2396
                return false;
×
2397
            }
×
2398
            // When the other session is associated with a different connection
2399
            // (`other_conn`), the clash may be due to the server not yet having
2400
            // realized that the other connection has been closed by the
2401
            // client. If so, the other connention is a "zombie". In the
2402
            // interest of getting rid of zombie connections as fast as
2403
            // possible, we shall assume that a clash with a session in another
2404
            // connection is always due to that other connection being a
2405
            // zombie. And when such a situation is detected, we want to close
2406
            // the zombie connection immediately.
2407
            auto log_level = Logger::Level::detail;
×
2408
            other_conn.terminate(log_level,
×
2409
                                 "Sync connection closed (superseded session)"); // Throws
×
2410
        }
×
2411

2412
        logger.info("Bound to client file (client_file_ident=%1)", client_file_ident); // Throws
4,306✔
2413

2414
        send_log_message(util::Logger::Level::debug, util::format("Session %1 bound to client file ident %2",
4,306✔
2415
                                                                  m_session_ident, client_file_ident));
4,306✔
2416

2417
        m_server_file->identify_session(this, client_file_ident); // Throws
4,306✔
2418

2419
        m_client_file_ident = client_file_ident;
4,306✔
2420
        m_download_progress = download_progress;
4,306✔
2421
        m_upload_threshold = upload_threshold;
4,306✔
2422
        m_locked_server_version = locked_server_version;
4,306✔
2423

2424
        ServerImpl& server = m_connection.get_server();
4,306✔
2425
        const Server::Config& config = server.get_config();
4,306✔
2426
        m_disable_download = (config.disable_download_for.count(client_file_ident) != 0);
4,306✔
2427

2428
        if (REALM_UNLIKELY(config.session_bootstrap_callback)) {
4,306✔
2429
            config.session_bootstrap_callback(m_server_file->get_virt_path(),
×
2430
                                              client_file_ident); // Throws
×
2431
        }
×
2432

2433
        // Protocol  state is now WaitForUnbind
2434
        enlist_to_send();
4,306✔
2435
        return true;
4,306✔
2436
    }
4,306✔
2437

2438
    bool receive_upload_message(version_type progress_client_version, version_type progress_server_version,
2439
                                version_type locked_server_version, const UploadChangesets& upload_changesets,
2440
                                ProtocolError& error)
2441
    {
44,770✔
2442
        // Protocol state must be WaitForUnbind
2443
        REALM_ASSERT(!m_send_ident_message);
44,770✔
2444
        REALM_ASSERT(ident_message_received());
44,770✔
2445
        REALM_ASSERT(!unbind_message_received());
44,770✔
2446
        REALM_ASSERT(!error_occurred());
44,770✔
2447
        REALM_ASSERT(!m_error_message_sent);
44,770✔
2448

2449
        logger.detail("Received: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
44,770✔
2450
                      "locked_server_version=%3, num_changesets=%4)",
44,770✔
2451
                      progress_client_version, progress_server_version, locked_server_version,
44,770✔
2452
                      upload_changesets.size()); // Throws
44,770✔
2453

2454
        // We are unable to reproduce the cursor object for the upload progress
2455
        // when the protocol version is less than 29, because the client does
2456
        // not provide the required information. When the protocol version is
2457
        // less than 25, we can always get a consistent cursor by taking it from
2458
        // the changeset that was uploaded last, but in protocol versions 25,
2459
        // 26, 27, and 28, things are more complicated. Here, we receive new
2460
        // values for `last_integrated_server_version` which we cannot afford to
2461
        // ignore, but we do not know what client versions they correspond
2462
        // to. Fortunately, we can produce a cursor that works, and is mutually
2463
        // consistent with previous cursors, by simply bumping
2464
        // `upload_progress.client_version` when
2465
        // `upload_progress.last_integrated_server_version` grows.
2466
        //
2467
        // To see that this scheme works, consider the last changeset, A, that
2468
        // will have already been uploaded and integrated at the beginning of
2469
        // the next session, and the first changeset, B, that follows A in the
2470
        // client side history, and is not upload skippable (of local origin and
2471
        // nonempty). We then need to show that A will be skipped, if uploaded
2472
        // in the next session, but B will not.
2473
        //
2474
        // Let V be the client version produced by A, and let T be the value of
2475
        // `upload_progress.client_version` as determined in this session, which
2476
        // is used as threshold in the next session. Then we know that A is
2477
        // skipped during the next session if V is less than, or equal to T. If
2478
        // the protocol version is at least 29, the protocol requires that T is
2479
        // greater than, or equal to V. If the protocol version is less than 25,
2480
        // T will be equal to V. Finally, if the protocol version is 25, 26, 27,
2481
        // or 28, we construct T such that it is always greater than, or equal
2482
        // to V, so in all cases, A will be skipped during the next session.
2483
        //
2484
        // Let W be the client version on which B is based. We then know that B
2485
        // will be retained if, and only if W is greater than, or equal to T. If
2486
        // the protocol version is at least 29, we know that T is less than, or
2487
        // equal to W, since B is not integrated until the next session. If the
2488
        // protocol version is less than 25, we know that T is V. Since V must
2489
        // be less than, or equal to W, we again know that T is less than, or
2490
        // equal to W. Finally, if the protocol version is 25, 26, 27, or 28, we
2491
        // construct T such that it is equal to V + N, where N is the number of
2492
        // observed increments in `last_integrated_server_version` since the
2493
        // client version produced by A. For each of these observed increments,
2494
        // there must have been a distinct new client version, but all these
2495
        // client versions must be less than, or equal to W, since B is not
2496
        // integrated until the next session. Therefore, we know that T = V + N
2497
        // is less than, or equal to W. So, in all cases, B will not be skipped
2498
        // during the next session.
2499
        int protocol_version = m_connection.get_client_protocol_version();
44,770✔
2500
        static_cast<void>(protocol_version); // No protocol diversion (yet)
44,770✔
2501

2502
        UploadCursor upload_progress;
44,770✔
2503
        upload_progress = {progress_client_version, progress_server_version};
44,770✔
2504

2505
        // `upload_progress.client_version` must be nondecreasing across the
2506
        // session.
2507
        bool good_1 = (upload_progress.client_version >= m_upload_progress.client_version);
44,770✔
2508
        if (REALM_UNLIKELY(!good_1)) {
44,770✔
2509
            logger.error("Decreasing client version in upload progress (%1 < %2)", upload_progress.client_version,
×
2510
                         m_upload_progress.client_version); // Throws
×
2511
            error = ProtocolError::bad_client_version;
×
2512
            return false;
×
2513
        }
×
2514
        // `upload_progress.last_integrated_server_version` must be a version
2515
        // that the client can have heard about.
2516
        bool good_2 = (upload_progress.last_integrated_server_version <= m_download_progress.server_version);
44,770✔
2517
        if (REALM_UNLIKELY(!good_2)) {
44,770✔
2518
            logger.error("Bad last integrated server version in upload progress (%1 > %2)",
×
2519
                         upload_progress.last_integrated_server_version,
×
2520
                         m_download_progress.server_version); // Throws
×
2521
            error = ProtocolError::bad_server_version;
×
2522
            return false;
×
2523
        }
×
2524

2525
        // `upload_progress` must be consistent.
2526
        if (REALM_UNLIKELY(!is_consistent(upload_progress))) {
44,770✔
2527
            logger.error("Upload progress is inconsistent (%1, %2)", upload_progress.client_version,
×
2528
                         upload_progress.last_integrated_server_version); // Throws
×
2529
            error = ProtocolError::bad_server_version;
×
2530
            return false;
×
2531
        }
×
2532
        // `upload_progress` and `m_upload_threshold` must be mutually
2533
        // consistent.
2534
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_threshold))) {
44,770✔
2535
            logger.error("Upload progress (%1, %2) is mutually inconsistent with "
×
2536
                         "threshold (%3, %4)",
×
2537
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2538
                         m_upload_threshold.client_version,
×
2539
                         m_upload_threshold.last_integrated_server_version); // Throws
×
2540
            error = ProtocolError::bad_server_version;
×
2541
            return false;
×
2542
        }
×
2543
        // `upload_progress` and `m_upload_progress` must be mutually
2544
        // consistent.
2545
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_progress))) {
44,770✔
2546
            logger.error("Upload progress (%1, %2) is mutually inconsistent with previous "
×
2547
                         "upload progress (%3, %4)",
×
2548
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2549
                         m_upload_progress.client_version,
×
2550
                         m_upload_progress.last_integrated_server_version); // Throws
×
2551
            error = ProtocolError::bad_server_version;
×
2552
            return false;
×
2553
        }
×
2554

2555
        version_type locked_server_version_2 = locked_server_version;
44,770✔
2556

2557
        // `locked_server_version_2` must be nondecreasing over the lifetime of
2558
        // the client-side file.
2559
        if (REALM_UNLIKELY(locked_server_version_2 < m_locked_server_version)) {
44,770✔
2560
            logger.error("Decreasing locked server version (%1 < %2)", locked_server_version_2,
×
2561
                         m_locked_server_version); // Throws
×
2562
            error = ProtocolError::bad_server_version;
×
2563
            return false;
×
2564
        }
×
2565
        // `locked_server_version_2` must be a version that the client can have
2566
        // heard about.
2567
        if (REALM_UNLIKELY(locked_server_version_2 > m_download_progress.server_version)) {
44,770✔
2568
            logger.error("Bad locked server version (%1 > %2)", locked_server_version_2,
×
2569
                         m_download_progress.server_version); // Throws
×
2570
            error = ProtocolError::bad_server_version;
×
2571
            return false;
×
2572
        }
×
2573

2574
        std::size_t num_previously_integrated_changesets = 0;
44,770✔
2575
        if (!upload_changesets.empty()) {
44,770✔
2576
            UploadCursor up = m_upload_progress;
24,686✔
2577
            for (const ServerProtocol::UploadChangeset& uc : upload_changesets) {
36,992✔
2578
                // `uc.upload_cursor.client_version` must be increasing across
2579
                // all the changesets in this UPLOAD message, and all must be
2580
                // greater than upload_progress.client_version of previous
2581
                // UPLOAD message.
2582
                if (REALM_UNLIKELY(uc.upload_cursor.client_version <= up.client_version)) {
36,992✔
2583
                    logger.error("Nonincreasing client version in upload cursor of uploaded "
×
2584
                                 "changeset (%1 <= %2)",
×
2585
                                 uc.upload_cursor.client_version,
×
2586
                                 up.client_version); // Throws
×
2587
                    error = ProtocolError::bad_client_version;
×
2588
                    return false;
×
2589
                }
×
2590
                // `uc.upload_progress` must be consistent.
2591
                if (REALM_UNLIKELY(!is_consistent(uc.upload_cursor))) {
36,992✔
2592
                    logger.error("Upload cursor of uploaded changeset is inconsistent (%1, %2)",
×
2593
                                 uc.upload_cursor.client_version,
×
2594
                                 uc.upload_cursor.last_integrated_server_version); // Throws
×
2595
                    error = ProtocolError::bad_server_version;
×
2596
                    return false;
×
2597
                }
×
2598
                // `uc.upload_progress` must be mutually consistent with
2599
                // previous upload cursor.
2600
                if (REALM_UNLIKELY(!are_mutually_consistent(uc.upload_cursor, up))) {
36,992✔
2601
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2602
                                 "inconsistent with previous upload cursor (%3, %4)",
×
2603
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2604
                                 up.client_version, up.last_integrated_server_version); // Throws
×
2605
                    error = ProtocolError::bad_server_version;
×
2606
                    return false;
×
2607
                }
×
2608
                // `uc.upload_progress` must be mutually consistent with
2609
                // threshold, that is, for changesets that have not previously
2610
                // been integrated, it is important that the specified value of
2611
                // `last_integrated_server_version` is greater than, or equal to
2612
                // the reciprocal history base version.
2613
                bool consistent_with_threshold = are_mutually_consistent(uc.upload_cursor, m_upload_threshold);
36,992✔
2614
                if (REALM_UNLIKELY(!consistent_with_threshold)) {
36,992✔
2615
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2616
                                 "inconsistent with threshold (%3, %4)",
×
2617
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2618
                                 m_upload_threshold.client_version,
×
2619
                                 m_upload_threshold.last_integrated_server_version); // Throws
×
2620
                    error = ProtocolError::bad_server_version;
×
2621
                    return false;
×
2622
                }
×
2623
                bool previously_integrated = (uc.upload_cursor.client_version <= m_upload_threshold.client_version);
36,992✔
2624
                if (previously_integrated)
36,992✔
2625
                    ++num_previously_integrated_changesets;
2,500✔
2626
                up = uc.upload_cursor;
36,992✔
2627
            }
36,992✔
2628
            // `upload_progress.client_version` must be greater than, or equal
2629
            // to client versions produced by each of the changesets in this
2630
            // UPLOAD message.
2631
            if (REALM_UNLIKELY(up.client_version > upload_progress.client_version)) {
24,686✔
2632
                logger.error("Upload progress less than client version produced by uploaded "
×
2633
                             "changeset (%1 > %2)",
×
2634
                             up.client_version,
×
2635
                             upload_progress.client_version); // Throws
×
2636
                error = ProtocolError::bad_client_version;
×
2637
                return false;
×
2638
            }
×
2639
            // The upload cursor of last uploaded changeset must be mutually
2640
            // consistent with the reported upload progress.
2641
            if (REALM_UNLIKELY(!are_mutually_consistent(up, upload_progress))) {
24,686✔
2642
                logger.error("Upload cursor (%1, %2) of last uploaded changeset is mutually "
×
2643
                             "inconsistent with upload progress (%3, %4)",
×
2644
                             up.client_version, up.last_integrated_server_version, upload_progress.client_version,
×
2645
                             upload_progress.last_integrated_server_version); // Throws
×
2646
                error = ProtocolError::bad_server_version;
×
2647
                return false;
×
2648
            }
×
2649
        }
24,686✔
2650

2651
        // FIXME: Part of a very poor man's substitute for a proper backpressure
2652
        // scheme.
2653
        if (REALM_UNLIKELY(!m_server_file->can_add_changesets_from_downstream())) {
44,770✔
2654
            logger.debug("Terminating uploading session because buffer is full"); // Throws
×
2655
            // Using this exact error code, because it causes `try_again` flag
2656
            // to be set to true, which causes the client to wait for about 5
2657
            // minutes before trying to connect again.
2658
            error = ProtocolError::connection_closed;
×
2659
            return false;
×
2660
        }
×
2661

2662
        m_upload_progress = upload_progress;
44,770✔
2663

2664
        bool have_real_upload_progress = (upload_progress.client_version > m_upload_threshold.client_version);
44,770✔
2665
        bool bump_locked_server_version = (locked_server_version_2 > m_locked_server_version);
44,770✔
2666

2667
        std::size_t num_changesets_to_integrate = upload_changesets.size() - num_previously_integrated_changesets;
44,770✔
2668
        REALM_ASSERT(have_real_upload_progress || num_changesets_to_integrate == 0);
44,770✔
2669

2670
        bool have_anything_to_do = (have_real_upload_progress || bump_locked_server_version);
44,770✔
2671
        if (!have_anything_to_do)
44,770✔
2672
            return true;
304✔
2673

2674
        if (!have_real_upload_progress)
44,466✔
2675
            upload_progress = m_upload_threshold;
×
2676

2677
        if (num_previously_integrated_changesets > 0) {
44,466✔
2678
            logger.detail("Ignoring %1 previously integrated changesets",
826✔
2679
                          num_previously_integrated_changesets); // Throws
826✔
2680
        }
826✔
2681
        if (num_changesets_to_integrate > 0) {
44,466✔
2682
            logger.detail("Initiate integration of %1 remote changesets",
24,290✔
2683
                          num_changesets_to_integrate); // Throws
24,290✔
2684
        }
24,290✔
2685

2686
        REALM_ASSERT(m_server_file);
44,466✔
2687
        ServerFile& file = *m_server_file;
44,466✔
2688
        std::size_t offset = num_previously_integrated_changesets;
44,466✔
2689
        file.add_changesets_from_downstream(m_client_file_ident, upload_progress, locked_server_version_2,
44,466✔
2690
                                            upload_changesets.data() + offset, num_changesets_to_integrate); // Throws
44,466✔
2691

2692
        m_locked_server_version = locked_server_version_2;
44,466✔
2693
        return true;
44,466✔
2694
    }
44,770✔
2695

2696
    // Handle a MARK message received from the client.
    //
    // Stores `request_ident` as the pending download completion request and
    // makes sure the session is enlisted to send. The unnamed ProtocolError
    // out-parameter is never written, and the function always returns true.
    bool receive_mark_message(request_ident_type request_ident, ProtocolError&)
    {
        // Preconditions: protocol state must be WaitForUnbind.
        REALM_ASSERT(!m_send_ident_message);
        REALM_ASSERT(ident_message_received());
        REALM_ASSERT(!unbind_message_received());
        REALM_ASSERT(!error_occurred());
        REALM_ASSERT(!m_error_message_sent);

        logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

        // Only record the request here; no MARK reply is produced by this
        // function itself.
        m_download_completion_request = request_ident;
        ensure_enlisted_to_send();

        return true;
    }
12,008✔
2712

2713
    // Handles an UNBIND message from the client. If the ERROR message has
    // already been sent, the deactivation process is thereby completed, and
    // the session is terminated and discarded from its connection before this
    // function returns. Otherwise the session is enlisted to send.
    //
    // CAUTION: This function may commit suicide!
2718
    void receive_unbind_message()
2719
    {
2,394✔
2720
        // Protocol state may be anything but SendUnbound
2721
        REALM_ASSERT(!m_unbind_message_received);
2,394✔
2722

2723
        logger.detail("Received: UNBIND"); // Throws
2,394✔
2724

2725
        detach_from_server_file();
2,394✔
2726
        m_unbind_message_received = true;
2,394✔
2727

2728
        // Detect completion of the deactivation process
2729
        if (m_error_message_sent) {
2,394✔
2730
            // Deactivation process completed
2731
            terminate(); // Throws
26✔
2732
            m_connection.discard_session(m_session_ident);
26✔
2733
            // This session is now destroyed!
2734
            return;
26✔
2735
        }
26✔
2736

2737
        // Protocol state is now SendUnbound
2738
        ensure_enlisted_to_send();
2,368✔
2739
    }
2,368✔
2740

2741
    // Handle an ERROR message received from the client. All three arguments
    // are ignored; the event is only logged at detail level.
    void receive_error_message(session_ident_type, int, std::string_view)
    {
        // An ERROR message is not expected after UNBIND has been received.
        REALM_ASSERT(!m_unbind_message_received);
        logger.detail("Received: ERROR"); // Throws
    }
×
2747

2748
private:
2749
    SyncConnection& m_connection;
2750

2751
    const session_ident_type m_session_ident;
2752

2753
    // Not null if, and only if this session is in
2754
    // m_connection.m_sessions_enlisted_to_send.
2755
    Session* m_next = nullptr;
2756

2757
    // Becomes nonnull when the BIND message is received, if no error occurs. Is
2758
    // reset to null when the deactivation process is initiated, either when the
2759
    // UNBIND message is received, or when initiate_deactivation() is called.
2760
    util::bind_ptr<ServerFile> m_server_file;
2761

2762
    bool m_disable_download = false;
2763
    bool m_is_subserver = false;
2764

2765
    using file_ident_request_type = ServerFile::file_ident_request_type;
2766

2767
    // When nonzero, this session has an outstanding request for a client file
2768
    // identifier.
2769
    file_ident_request_type m_file_ident_request = 0;
2770

2771
    // Payload for next outgoing ALLOC message.
2772
    SaltedFileIdent m_allocated_file_ident = {0, 0};
2773

2774
    // Zero until the session receives an IDENT message from the client.
2775
    file_ident_type m_client_file_ident = 0;
2776

2777
    // Zero until initiate_deactivation() is called.
2778
    ProtocolError m_error_code = {};
2779

2780
    // The current point of progression of the download process. Set to (<server
2781
    // version>, <client version>) of the IDENT message when the IDENT message
2782
    // is received. At the time of return from continue_history_scan(), it
2783
    // points to the latest server version such that all preceding changesets in
2784
    // the server-side history have been downloaded, are currently being
2785
    // downloaded, or are *download excluded*.
2786
    DownloadCursor m_download_progress = {0, 0};
2787

2788
    request_ident_type m_download_completion_request = 0;
2789

2790
    // Records the progress of the upload process. Used to check that the client
2791
    // uploads changesets in order. Also, when m_upload_progress >
2792
    // m_upload_threshold, m_upload_progress works as a cache of the persisted
2793
    // version of the upload progress.
2794
    UploadCursor m_upload_progress = {0, 0};
2795

2796
    // Initialized on reception of the IDENT message. Specifies the actual
2797
    // upload progress (as recorded on the server-side) at the beginning of the
2798
    // session, and it remains fixed throughout the session.
2799
    //
2800
    // m_upload_threshold includes the progress resulting from the received
2801
    // changesets that have not yet been integrated (only relevant for
2802
    // synchronous backup).
2803
    UploadCursor m_upload_threshold = {0, 0};
2804

2805
    // Works partially as a cache of the persisted value, and partially as a way
2806
    // of checking that the client respects that it can never decrease.
2807
    version_type m_locked_server_version = 0;
2808

2809
    bool m_send_ident_message = false;
2810
    bool m_unbind_message_received = false;
2811
    bool m_error_message_sent = false;
2812

2813
    /// m_one_download_message_sent denotes whether at least one DOWNLOAD message
2814
    /// has been sent in the current session. The variable is used to ensure
2815
    /// that a DOWNLOAD message is always sent in a session. The received
2816
    /// DOWNLOAD message is needed by the client to ensure that its current
2817
    /// download progress is up to date.
2818
    bool m_one_download_message_sent = false;
2819

2820
    static std::string make_logger_prefix(session_ident_type session_ident)
2821
    {
5,304✔
2822
        std::ostringstream out;
5,304✔
2823
        out.imbue(std::locale::classic());
5,304✔
2824
        out << "Session[" << session_ident << "]: "; // Throws
5,304✔
2825
        return out.str();                            // Throws
5,304✔
2826
    }
5,304✔
2827

2828
    // Scan the history for changesets to be downloaded.
2829
    // If the history is longer than the end point of the previous scan,
2830
    // a DOWNLOAD message will be sent.
2831
    // A MARK message is sent if no DOWNLOAD message is sent, and the client has
2832
    // requested to be notified about download completion.
2833
    // In case neither a DOWNLOAD nor a MARK is sent, no message is sent.
2834
    //
2835
    // This function may lead to the destruction of the session object
2836
    // (suicide).
2837
    void continue_history_scan()
2838
    {
107,298✔
2839
        // Protocol state must be WaitForUnbind
2840
        REALM_ASSERT(!m_send_ident_message);
107,298✔
2841
        REALM_ASSERT(ident_message_received());
107,298✔
2842
        REALM_ASSERT(!unbind_message_received());
107,298✔
2843
        REALM_ASSERT(!error_occurred());
107,298✔
2844
        REALM_ASSERT(!m_error_message_sent);
107,298✔
2845
        REALM_ASSERT(!is_enlisted_to_send());
107,298✔
2846

2847
        SaltedVersion last_server_version = m_server_file->get_salted_sync_version();
107,298✔
2848
        REALM_ASSERT(last_server_version.version >= m_download_progress.server_version);
107,298✔
2849

2850
        ServerImpl& server = m_connection.get_server();
107,298✔
2851
        const Server::Config& config = server.get_config();
107,298✔
2852
        if (REALM_UNLIKELY(m_disable_download))
107,298✔
2853
            return;
×
2854

2855
        bool have_more_to_scan =
107,298✔
2856
            (last_server_version.version > m_download_progress.server_version || !m_one_download_message_sent);
107,298✔
2857
        if (have_more_to_scan) {
107,298✔
2858
            m_server_file->register_client_access(m_client_file_ident);     // Throws
42,158✔
2859
            const ServerHistory& history = m_server_file->access().history; // Throws
42,158✔
2860
            const char* body;
42,158✔
2861
            std::size_t uncompressed_body_size;
42,158✔
2862
            std::size_t compressed_body_size = 0;
42,158✔
2863
            bool body_is_compressed = false;
42,158✔
2864
            version_type end_version = last_server_version.version;
42,158✔
2865
            DownloadCursor download_progress;
42,158✔
2866
            UploadCursor upload_progress = {0, 0};
42,158✔
2867
            std::uint_fast64_t downloadable_bytes = 0;
42,158✔
2868
            std::size_t num_changesets;
42,158✔
2869
            std::size_t accum_original_size;
42,158✔
2870
            std::size_t accum_compacted_size;
42,158✔
2871
            ServerProtocol& protocol = get_server_protocol();
42,158✔
2872
            bool enable_cache = (config.enable_download_bootstrap_cache && m_download_progress.server_version == 0 &&
42,158!
2873
                                 m_upload_progress.client_version == 0 && m_upload_threshold.client_version == 0);
42,158!
2874
            DownloadCache& cache = m_server_file->get_download_cache();
42,158✔
2875
            bool fetch_from_cache = (enable_cache && cache.body && end_version == cache.end_version);
42,158!
2876
            if (fetch_from_cache) {
42,158✔
2877
                body = cache.body.get();
×
2878
                uncompressed_body_size = cache.uncompressed_body_size;
×
2879
                compressed_body_size = cache.compressed_body_size;
×
2880
                body_is_compressed = cache.body_is_compressed;
×
2881
                download_progress = cache.download_progress;
×
2882
                downloadable_bytes = cache.downloadable_bytes;
×
2883
                num_changesets = cache.num_changesets;
×
2884
                accum_original_size = cache.accum_original_size;
×
2885
                accum_compacted_size = cache.accum_compacted_size;
×
2886
            }
×
2887
            else {
42,158✔
2888
                // Discard the old cached DOWNLOAD body before generating a new
2889
                // one to be cached. This can make a big difference because the
2890
                // size of that body can be very large (10GiB has been seen in a
2891
                // real-world case).
2892
                if (enable_cache)
42,158✔
2893
                    cache.body = {};
×
2894

2895
                OutputBuffer& out = server.get_misc_buffers().download_message;
42,158✔
2896
                out.reset();
42,158✔
2897
                download_progress = m_download_progress;
42,158✔
2898
                auto fetch_and_compress = [&](std::size_t max_download_size) {
42,168✔
2899
                    DownloadHistoryEntryHandler handler{protocol, out, logger};
42,168✔
2900
                    std::uint_fast64_t cumulative_byte_size_current;
42,168✔
2901
                    std::uint_fast64_t cumulative_byte_size_total;
42,168✔
2902
                    bool not_expired = history.fetch_download_info(
42,168✔
2903
                        m_client_file_ident, download_progress, end_version, upload_progress, handler,
42,168✔
2904
                        cumulative_byte_size_current, cumulative_byte_size_total,
42,168✔
2905
                        max_download_size); // Throws
42,168✔
2906
                    REALM_ASSERT(upload_progress.client_version >= download_progress.last_integrated_client_version);
42,168✔
2907
                    SyncConnection& conn = get_connection();
42,168✔
2908
                    if (REALM_UNLIKELY(!not_expired)) {
42,168✔
2909
                        logger.debug("History scanning failed: Client file entry "
×
2910
                                     "expired during session"); // Throws
×
2911
                        conn.protocol_error(ProtocolError::client_file_expired, this);
×
2912
                        // Session object may have been destroyed at this point
2913
                        // (suicide).
2914
                        return false;
×
2915
                    }
×
2916

2917
                    downloadable_bytes = cumulative_byte_size_total - cumulative_byte_size_current;
42,168✔
2918
                    uncompressed_body_size = out.size();
42,168✔
2919
                    BinaryData uncompressed = {out.data(), uncompressed_body_size};
42,168✔
2920
                    body = uncompressed.data();
42,168✔
2921
                    std::size_t max_uncompressed = 1024;
42,168✔
2922
                    if (uncompressed.size() > max_uncompressed) {
42,168✔
2923
                        compression::CompressMemoryArena& arena = server.get_compress_memory_arena();
4,204✔
2924
                        std::vector<char>& buffer = server.get_misc_buffers().compress;
4,204✔
2925
                        compression::allocate_and_compress(arena, uncompressed, buffer); // Throws
4,204✔
2926
                        if (buffer.size() < uncompressed.size()) {
4,204✔
2927
                            body = buffer.data();
4,204✔
2928
                            compressed_body_size = buffer.size();
4,204✔
2929
                            body_is_compressed = true;
4,204✔
2930
                        }
4,204✔
2931
                    }
4,204✔
2932
                    num_changesets = handler.num_changesets;
42,168✔
2933
                    accum_original_size = handler.accum_original_size;
42,168✔
2934
                    accum_compacted_size = handler.accum_compacted_size;
42,168✔
2935
                    return true;
42,168✔
2936
                };
42,168✔
2937
                if (enable_cache) {
42,158✔
2938
                    std::size_t max_download_size = std::numeric_limits<size_t>::max();
×
2939
                    if (!fetch_and_compress(max_download_size)) { // Throws
×
2940
                        // Session object may have been destroyed at this point
2941
                        // (suicide).
2942
                        return;
×
2943
                    }
×
2944
                    REALM_ASSERT(upload_progress.client_version == 0);
×
2945
                    std::size_t body_size = (body_is_compressed ? compressed_body_size : uncompressed_body_size);
×
2946
                    cache.body = std::make_unique<char[]>(body_size); // Throws
×
2947
                    std::copy(body, body + body_size, cache.body.get());
×
2948
                    cache.uncompressed_body_size = uncompressed_body_size;
×
2949
                    cache.compressed_body_size = compressed_body_size;
×
2950
                    cache.body_is_compressed = body_is_compressed;
×
2951
                    cache.end_version = end_version;
×
2952
                    cache.download_progress = download_progress;
×
2953
                    cache.downloadable_bytes = downloadable_bytes;
×
2954
                    cache.num_changesets = num_changesets;
×
2955
                    cache.accum_original_size = accum_original_size;
×
2956
                    cache.accum_compacted_size = accum_compacted_size;
×
2957
                }
×
2958
                else {
42,158✔
2959
                    std::size_t max_download_size = config.max_download_size;
42,158✔
2960
                    if (!fetch_and_compress(max_download_size)) { // Throws
42,158✔
2961
                        // Session object may have been destroyed at this point
2962
                        // (suicide).
2963
                        return;
×
2964
                    }
×
2965
                }
42,158✔
2966
            }
42,158✔
2967

2968
            OutputBuffer& out = m_connection.get_output_buffer();
42,158✔
2969
            protocol.make_download_message(
42,158✔
2970
                m_connection.get_client_protocol_version(), out, m_session_ident, download_progress.server_version,
42,158✔
2971
                download_progress.last_integrated_client_version, last_server_version.version,
42,158✔
2972
                last_server_version.salt, upload_progress.client_version,
42,158✔
2973
                upload_progress.last_integrated_server_version, downloadable_bytes, num_changesets, body,
42,158✔
2974
                uncompressed_body_size, compressed_body_size, body_is_compressed, logger); // Throws
42,158✔
2975

2976
            m_download_progress = download_progress;
42,158✔
2977
            logger.debug("Setting of m_download_progress.server_version = %1",
42,158✔
2978
                         m_download_progress.server_version); // Throws
42,158✔
2979
            send_download_message();
42,158✔
2980
            m_one_download_message_sent = true;
42,158✔
2981

2982
            enlist_to_send();
42,158✔
2983
        }
42,158✔
2984
        else if (m_download_completion_request) {
65,140✔
2985
            // Send a MARK message
2986
            request_ident_type request_ident = m_download_completion_request;
11,994✔
2987
            send_mark_message(request_ident);  // Throws
11,994✔
2988
            m_download_completion_request = 0; // Request handled
11,994✔
2989
            enlist_to_send();
11,994✔
2990
        }
11,994✔
2991
    }
107,298✔
2992

2993
    void send_ident_message()
2994
    {
1,344✔
2995
        // Protocol state must be SendIdent
2996
        REALM_ASSERT(!need_client_file_ident());
1,344✔
2997
        REALM_ASSERT(m_send_ident_message);
1,344✔
2998
        REALM_ASSERT(!ident_message_received());
1,344✔
2999
        REALM_ASSERT(!unbind_message_received());
1,344✔
3000
        REALM_ASSERT(!error_occurred());
1,344✔
3001
        REALM_ASSERT(!m_error_message_sent);
1,344✔
3002

3003
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
1,344✔
3004

3005
        file_ident_type client_file_ident = m_allocated_file_ident.ident;
1,344✔
3006
        salt_type client_file_ident_salt = m_allocated_file_ident.salt;
1,344✔
3007

3008
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident,
1,344✔
3009
                     client_file_ident_salt); // Throws
1,344✔
3010

3011
        ServerProtocol& protocol = get_server_protocol();
1,344✔
3012
        OutputBuffer& out = m_connection.get_output_buffer();
1,344✔
3013
        int protocol_version = m_connection.get_client_protocol_version();
1,344✔
3014
        protocol.make_ident_message(protocol_version, out, m_session_ident, client_file_ident,
1,344✔
3015
                                    client_file_ident_salt); // Throws
1,344✔
3016
        m_connection.initiate_write_output_buffer();         // Throws
1,344✔
3017

3018
        m_allocated_file_ident.ident = 0; // Consumed
1,344✔
3019
        m_send_ident_message = false;
1,344✔
3020
        // Protocol state is now WaitForStateRequest or WaitForIdent
3021
    }
1,344✔
3022

3023
    void send_download_message()
3024
    {
42,172✔
3025
        m_connection.initiate_write_output_buffer(); // Throws
42,172✔
3026
    }
42,172✔
3027

3028
    // Serialize a MARK message for `request_ident` into the connection's
    // output buffer, and initiate writing of that buffer.
    void send_mark_message(request_ident_type request_ident)
    {
        logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws

        ServerProtocol& protocol = get_server_protocol();
        OutputBuffer& output = m_connection.get_output_buffer();

        protocol.make_mark_message(output, m_session_ident, request_ident); // Throws
        m_connection.initiate_write_output_buffer();                        // Throws
    }
11,992✔
3037

3038
    void send_alloc_message()
3039
    {
×
3040
        // Protocol state must be WaitForUnbind
3041
        REALM_ASSERT(!m_send_ident_message);
×
3042
        REALM_ASSERT(ident_message_received());
×
3043
        REALM_ASSERT(!unbind_message_received());
×
3044
        REALM_ASSERT(!error_occurred());
×
3045
        REALM_ASSERT(!m_error_message_sent);
×
3046

3047
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
×
3048

3049
        // Relayed allocations are only allowed from protocol version 23 (old protocol).
3050
        REALM_ASSERT(false);
×
3051

3052
        file_ident_type file_ident = m_allocated_file_ident.ident;
×
3053

3054
        logger.debug("Sending: ALLOC(file_ident=%1)", file_ident); // Throws
×
3055

3056
        ServerProtocol& protocol = get_server_protocol();
×
3057
        OutputBuffer& out = m_connection.get_output_buffer();
×
3058
        protocol.make_alloc_message(out, m_session_ident, file_ident); // Throws
×
3059
        m_connection.initiate_write_output_buffer();                   // Throws
×
3060

3061
        m_allocated_file_ident.ident = 0; // Consumed
×
3062

3063
        // Other messages may be waiting to be sent.
3064
        enlist_to_send();
×
3065
    }
×
3066

3067
    void send_unbound_message()
3068
    {
2,342✔
3069
        // Protocol state must be SendUnbound
3070
        REALM_ASSERT(unbind_message_received());
2,342✔
3071
        REALM_ASSERT(!m_error_message_sent);
2,342✔
3072

3073
        logger.debug("Sending: UNBOUND"); // Throws
2,342✔
3074

3075
        ServerProtocol& protocol = get_server_protocol();
2,342✔
3076
        OutputBuffer& out = m_connection.get_output_buffer();
2,342✔
3077
        protocol.make_unbound_message(out, m_session_ident); // Throws
2,342✔
3078
        m_connection.initiate_write_output_buffer();         // Throws
2,342✔
3079
    }
2,342✔
3080

3081
    void send_error_message()
3082
    {
80✔
3083
        // Protocol state must be SendError
3084
        REALM_ASSERT(!unbind_message_received());
80✔
3085
        REALM_ASSERT(error_occurred());
80✔
3086
        REALM_ASSERT(!m_error_message_sent);
80✔
3087

3088
        REALM_ASSERT(is_session_level_error(m_error_code));
80✔
3089

3090
        ProtocolError error_code = m_error_code;
80✔
3091
        const char* message = get_protocol_error_message(int(error_code));
80✔
3092
        std::size_t message_size = std::strlen(message);
80✔
3093
        bool try_again = determine_try_again(error_code);
80✔
3094

3095
        logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3)", int(error_code), message_size,
80✔
3096
                      try_again); // Throws
80✔
3097

3098
        ServerProtocol& protocol = get_server_protocol();
80✔
3099
        OutputBuffer& out = m_connection.get_output_buffer();
80✔
3100
        int protocol_version = m_connection.get_client_protocol_version();
80✔
3101
        protocol.make_error_message(protocol_version, out, error_code, message, message_size, try_again,
80✔
3102
                                    m_session_ident); // Throws
80✔
3103
        m_connection.initiate_write_output_buffer();  // Throws
80✔
3104

3105
        m_error_message_sent = true;
80✔
3106
        // Protocol state is now WaitForUnbindErr
3107
    }
80✔
3108

3109
    // Deliver a log message on behalf of this session. If the client's
    // protocol version is at least SyncConnection::SERVER_LOG_PROTOCOL_VERSION,
    // the message is handed to the connection together with the session
    // identifier; otherwise it is only written to this session's logger.
    void send_log_message(util::Logger::Level level, const std::string&& message)
    {
        const bool hand_to_connection =
            !(m_connection.get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION);

        if (hand_to_connection) {
            // NOTE(review): `message` is a const rvalue reference, so
            // std::move() yields a const rvalue here; whether that results in
            // a copy depends on the callee's signature -- confirm.
            m_connection.send_log_message(level, std::move(message), m_session_ident);
            return;
        }

        logger.log(level, message.c_str());
    }
4,308✔
3117

3118
    // Idempotent
3119
    void detach_from_server_file() noexcept
3120
    {
7,778✔
3121
        if (!m_server_file)
7,778✔
3122
            return;
2,500✔
3123
        ServerFile& file = *m_server_file;
5,278✔
3124
        if (ident_message_received()) {
5,278✔
3125
            file.remove_identified_session(m_client_file_ident);
4,308✔
3126
        }
4,308✔
3127
        else {
970✔
3128
            file.remove_unidentified_session(this);
970✔
3129
        }
970✔
3130
        if (m_file_ident_request != 0)
5,278✔
3131
            file.cancel_file_ident_request(m_file_ident_request);
58✔
3132
        m_server_file.reset();
5,278✔
3133
    }
5,278✔
3134

3135
    friend class SessionQueue;
3136
};
3137

3138

3139
// ============================ SessionQueue implementation ============================
3140

3141
// Append `sess` at the back of the queue.
//
// The queue is a circular singly linked list threaded through
// Session::m_next: `m_back` points to the last element, and
// `m_back->m_next` is the first one. A session that is not in a queue must
// have a null `m_next`.
void SessionQueue::push_back(Session* sess) noexcept
{
    REALM_ASSERT(!sess->m_next);

    // In an empty queue the new element is its own successor; otherwise it
    // takes over the link from the old back to the front.
    Session* front = (m_back ? m_back->m_next : sess);
    sess->m_next = front;
    if (m_back)
        m_back->m_next = sess;

    m_back = sess;
}
111,254✔
3153

3154

3155
// Remove and return the session at the front of the queue, or return null if
// the queue is empty. The `m_next` link of the returned session is reset to
// null.
Session* SessionQueue::pop_front() noexcept
{
    if (!m_back)
        return nullptr;

    Session* front = m_back->m_next;
    if (front == m_back) {
        // That was the only element; the queue is now empty.
        m_back = nullptr;
    }
    else {
        // Unlink the front element by letting the back point past it.
        m_back->m_next = front->m_next;
    }

    front->m_next = nullptr;
    return front;
}
161,088✔
3170

3171

3172
void SessionQueue::clear() noexcept
3173
{
3,344✔
3174
    if (m_back) {
3,344✔
3175
        Session* sess = m_back;
118✔
3176
        for (;;) {
170✔
3177
            Session* next = sess->m_next;
170✔
3178
            sess->m_next = nullptr;
170✔
3179
            if (next == m_back)
170✔
3180
                break;
118✔
3181
            sess = next;
52✔
3182
        }
52✔
3183
        m_back = nullptr;
118✔
3184
    }
118✔
3185
}
3,344✔
3186

3187

3188
// ============================ ServerFile implementation ============================
3189

3190
ServerFile::ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path,
3191
                       std::string real_path, bool disable_sync_to_disk)
3192
    : logger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.logger_ptr}               // Throws
450✔
3193
    , wlogger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.get_worker().logger_ptr} // Throws
450✔
3194
    , m_server{server}
450✔
3195
    , m_file{cache, real_path, virt_path, false, disable_sync_to_disk} // Throws
450✔
3196
    , m_worker_file{server.get_worker().get_file_access_cache(), real_path, virt_path, true, disable_sync_to_disk}
450✔
3197
{
1,142✔
3198
}
1,142✔
3199

3200

3201
// At the time of destruction, no sessions may remain registered with the
// file, and m_file_ident_request must be zero.
ServerFile::~ServerFile() noexcept
{
    REALM_ASSERT(m_unidentified_sessions.empty());
    REALM_ASSERT(m_identified_sessions.empty());
    REALM_ASSERT(m_file_ident_request == 0);
}
1,144✔
3207

3208

3209
void ServerFile::initialize()
3210
{
1,142✔
3211
    const ServerHistory& history = access().history; // Throws
1,142✔
3212
    file_ident_type partial_file_ident = 0;
1,142✔
3213
    version_type partial_progress_reference_version = 0;
1,142✔
3214
    bool has_upstream_sync_status;
1,142✔
3215
    history.get_status(m_version_info, has_upstream_sync_status, partial_file_ident,
1,142✔
3216
                       partial_progress_reference_version); // Throws
1,142✔
3217
    REALM_ASSERT(!has_upstream_sync_status);
1,142✔
3218
    REALM_ASSERT(partial_file_ident == 0);
1,142✔
3219
}
1,142✔
3220

3221

3222
void ServerFile::activate()
{
    // Intentionally empty: there is nothing to do on activation.
}
1,144✔
3223

3224

3225
// This function must be called only after a completed invocation of
3226
// initialize(). Both functions must only ever be called by the network event
3227
// loop thread.
3228
void ServerFile::register_client_access(file_ident_type)
{
    // Intentionally empty: the client file identifier is ignored.
}
90,928✔
3229

3230

3231
auto ServerFile::request_file_ident(FileIdentReceiver& receiver, file_ident_type proxy_file,
3232
                                    ClientType client_type) -> file_ident_request_type
3233
{
1,400✔
3234
    auto request = ++m_last_file_ident_request;
1,400✔
3235
    m_file_ident_requests[request] = {&receiver, proxy_file, client_type}; // Throws
1,400✔
3236

3237
    on_work_added(); // Throws
1,400✔
3238
    return request;
1,400✔
3239
}
1,400✔
3240

3241

3242
// Cancel a request previously registered by request_file_ident(). The entry
// stays in m_file_ident_requests, but its receiver is cleared. The request
// must exist, and must not have been cancelled already.
void ServerFile::cancel_file_ident_request(file_ident_request_type request) noexcept
{
    const auto i = m_file_ident_requests.find(request);
    REALM_ASSERT(i != m_file_ident_requests.end());

    auto& info = i->second;
    REALM_ASSERT(info.receiver);
    info.receiver = nullptr;
}
58✔
3250

3251

3252
// Register `sess` in the set of unidentified sessions. The session must not
// already be in that set.
void ServerFile::add_unidentified_session(Session* sess)
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 0);

    m_unidentified_sessions.insert(sess); // Throws
}
5,274✔
3257

3258

3259
// Move `sess` from the set of unidentified sessions to the identified
// sessions, registered under `client_file_ident`. The session must currently
// be unidentified, and no session may already be registered under that
// identifier.
void ServerFile::identify_session(Session* sess, file_ident_type client_file_ident)
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 0);

    // Register under the identifier first, since that step can throw; only
    // then remove the session from the unidentified set.
    m_identified_sessions[client_file_ident] = sess; // Throws
    m_unidentified_sessions.erase(sess);
}
4,306✔
3267

3268

3269
// Removes `sess` from the set of unidentified sessions. The session must be
// present in that set.
void ServerFile::remove_unidentified_session(Session* sess) noexcept
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);

    m_unidentified_sessions.erase(sess);
}
970✔
3274

3275

3276
// Removes the session registered under `client_file_ident` from the identified
// map. Exactly one such session must be registered.
void ServerFile::remove_identified_session(file_ident_type client_file_ident) noexcept
{
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 1);

    m_identified_sessions.erase(client_file_ident);
}
4,306✔
3281

3282

3283
// Returns the session registered under `client_file_ident`, or null when no
// session is registered for that identifier.
Session* ServerFile::get_identified_session(file_ident_type client_file_ident) noexcept
{
    const auto pos = m_identified_sessions.find(client_file_ident);
    if (pos != m_identified_sessions.end())
        return pos->second;
    return nullptr;
}
4,304✔
3290

3291
bool ServerFile::can_add_changesets_from_downstream() const noexcept
3292
{
44,774✔
3293
    return (m_blocked_changesets_from_downstream_byte_size < m_server.get_max_upload_backlog());
44,774✔
3294
}
44,774✔
3295

3296

3297
void ServerFile::add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
3298
                                                version_type locked_server_version, const UploadChangeset* changesets,
3299
                                                std::size_t num_changesets)
3300
{
44,466✔
3301
    register_client_access(client_file_ident); // Throws
44,466✔
3302

3303
    bool dirty = false;
44,466✔
3304

3305
    IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident]; // Throws
44,466✔
3306
    std::size_t num_bytes = 0;
44,466✔
3307
    for (std::size_t i = 0; i < num_changesets; ++i) {
78,958✔
3308
        const UploadChangeset& uc = changesets[i];
34,492✔
3309
        auto& changesets = list.changesets;
34,492✔
3310
        changesets.emplace_back(client_file_ident, uc.origin_timestamp, uc.origin_file_ident, uc.upload_cursor,
34,492✔
3311
                                uc.changeset); // Throws
34,492✔
3312
        num_bytes += uc.changeset.size();
34,492✔
3313
        dirty = true;
34,492✔
3314
    }
34,492✔
3315

3316
    REALM_ASSERT(upload_progress.client_version >= list.upload_progress.client_version);
44,466✔
3317
    REALM_ASSERT(are_mutually_consistent(upload_progress, list.upload_progress));
44,466✔
3318
    if (upload_progress.client_version > list.upload_progress.client_version) {
44,468✔
3319
        list.upload_progress = upload_progress;
44,468✔
3320
        dirty = true;
44,468✔
3321
    }
44,468✔
3322

3323
    REALM_ASSERT(locked_server_version >= list.locked_server_version);
44,466✔
3324
    if (locked_server_version > list.locked_server_version) {
44,466✔
3325
        list.locked_server_version = locked_server_version;
39,214✔
3326
        dirty = true;
39,214✔
3327
    }
39,214✔
3328

3329
    if (REALM_LIKELY(dirty)) {
44,468✔
3330
        if (num_changesets > 0) {
44,468✔
3331
            on_changesets_from_downstream_added(num_changesets, num_bytes); // Throws
24,292✔
3332
        }
24,292✔
3333
        else {
20,176✔
3334
            on_work_added(); // Throws
20,176✔
3335
        }
20,176✔
3336
    }
44,468✔
3337
}
44,466✔
3338

3339

3340
// Bootstraps a client session against the server-side history. On success,
// `upload_progress` and `locked_server_version` are set by the history and then
// overridden by the values recorded for any changesets from the same client
// file that are still buffered (in the current work unit or in the blocked
// set). Returns the error reported by the history, or `bad_server_version` if
// the client refers to a server version beyond the current sync version.
BootstrapError ServerFile::bootstrap_client_session(SaltedFileIdent client_file_ident,
                                                    DownloadCursor download_progress, SaltedVersion server_version,
                                                    ClientType client_type, UploadCursor& upload_progress,
                                                    version_type& locked_server_version, Logger& logger)
{
    // The Realm file may contain a later snapshot than the one reflected by
    // `m_sync_version`, but if so, the client cannot "legally" know about it.
    if (server_version.version > m_version_info.sync_version.version)
        return BootstrapError::bad_server_version;

    const ServerHistory& hist = access().history; // Throws
    const BootstrapError error =
        hist.bootstrap_client_session(client_file_ident, download_progress, server_version, client_type,
                                      upload_progress, locked_server_version, logger); // Throws

    // FIXME: Rather than taking previously buffered changesets from the same
    // client file into account when determining the upload progress, and then
    // allowing for an error during the integration of those changesets to be
    // reported to, and terminate the new session, consider to instead postpone
    // the bootstrapping of the new session until all previously buffered
    // changesets from same client file have been fully processed.

    if (error != BootstrapError::no_error)
        return error;

    register_client_access(client_file_ident.ident); // Throws

    // If upload, or releasing of server versions progressed further during
    // previous sessions than the persisted points, take that into account.
    const auto adopt_buffered_progress = [&](const auto& buffered) {
        const auto pos = buffered.find(client_file_ident.ident);
        if (pos == buffered.end())
            return;
        const IntegratableChangesetList& list = pos->second;
        REALM_ASSERT(list.upload_progress.client_version >= upload_progress.client_version);
        upload_progress = list.upload_progress;
        REALM_ASSERT(list.locked_server_version >= locked_server_version);
        locked_server_version = list.locked_server_version;
    };
    // Work-unit entries first, then the blocked entries, matching the order in
    // which they were buffered.
    adopt_buffered_progress(m_work.changesets_from_downstream);
    adopt_buffered_progress(m_changesets_from_downstream);

    return error;
}
4,340✔
3387

3388
// NOTE: This function is executed by the worker thread
3389
void ServerFile::worker_process_work_unit(WorkerState& state)
3390
{
37,326✔
3391
    SteadyTimePoint start_time = steady_clock_now();
37,326✔
3392
    milliseconds_type parallel_time = 0;
37,326✔
3393

3394
    Work& work = m_work;
37,326✔
3395
    wlogger.debug("Work unit execution started"); // Throws
37,326✔
3396

3397
    if (work.has_primary_work) {
37,326✔
3398
        if (REALM_UNLIKELY(!m_work.file_ident_alloc_slots.empty()))
37,326✔
3399
            worker_allocate_file_identifiers(); // Throws
1,350✔
3400

3401
        if (!m_work.changesets_from_downstream.empty())
37,326✔
3402
            worker_integrate_changes_from_downstream(state); // Throws
35,982✔
3403
    }
37,326✔
3404

3405
    wlogger.debug("Work unit execution completed"); // Throws
37,326✔
3406

3407
    milliseconds_type time = steady_duration(start_time);
37,326✔
3408
    milliseconds_type seq_time = time - parallel_time;
37,326✔
3409
    m_server.m_seq_time.fetch_add(seq_time, std::memory_order_relaxed);
37,326✔
3410
    m_server.m_par_time.fetch_add(parallel_time, std::memory_order_relaxed);
37,326✔
3411

3412
    // Pass control back to the network event loop thread
3413
    network::Service& service = m_server.get_service();
37,326✔
3414
    service.post([this](Status) {
37,326✔
3415
        // FIXME: The safety of capturing `this` here, relies on the fact
3416
        // that ServerFile objects currently are not destroyed until the
3417
        // server object is destroyed.
3418
        group_postprocess_stage_1(); // Throws
37,018✔
3419
        // Suicide may have happened at this point
3420
    }); // Throws
37,018✔
3421
}
37,326✔
3422

3423

3424
void ServerFile::on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes)
3425
{
24,292✔
3426
    m_num_changesets_from_downstream += num_changesets;
24,292✔
3427

3428
    if (num_bytes > 0) {
24,292✔
3429
        m_blocked_changesets_from_downstream_byte_size += num_bytes;
24,292✔
3430
        get_server().inc_byte_size_for_pending_downstream_changesets(num_bytes); // Throws
24,292✔
3431
    }
24,292✔
3432

3433
    on_work_added(); // Throws
24,292✔
3434
}
24,292✔
3435

3436

3437
void ServerFile::on_work_added()
3438
{
45,870✔
3439
    if (m_has_blocked_work)
45,870✔
3440
        return;
8,462✔
3441
    m_has_blocked_work = true;
37,408✔
3442
    // Reference file
3443
    if (m_has_work_in_progress)
37,408✔
3444
        return;
12,526✔
3445
    group_unblock_work(); // Throws
24,882✔
3446
}
24,882✔
3447

3448

3449
void ServerFile::group_unblock_work()
3450
{
37,356✔
3451
    REALM_ASSERT(!m_has_work_in_progress);
37,356✔
3452
    if (REALM_LIKELY(!m_server.is_sync_stopped())) {
37,356✔
3453
        unblock_work(); // Throws
37,350✔
3454
        const Work& work = m_work;
37,350✔
3455
        if (REALM_LIKELY(work.has_primary_work)) {
37,350✔
3456
            logger.trace("Work unit unblocked"); // Throws
37,346✔
3457
            m_has_work_in_progress = true;
37,346✔
3458
            Worker& worker = m_server.get_worker();
37,346✔
3459
            worker.enqueue(this); // Throws
37,346✔
3460
        }
37,346✔
3461
    }
37,350✔
3462
}
37,356✔
3463

3464

3465
void ServerFile::unblock_work()
3466
{
37,354✔
3467
    REALM_ASSERT(m_has_blocked_work);
37,354✔
3468

3469
    m_work.reset();
37,354✔
3470

3471
    // Discard requests for file identifiers whose receiver is no longer
3472
    // waiting.
3473
    {
37,354✔
3474
        auto i = m_file_ident_requests.begin();
37,354✔
3475
        auto end = m_file_ident_requests.end();
37,354✔
3476
        while (i != end) {
38,754✔
3477
            auto j = i++;
1,400✔
3478
            const FileIdentRequestInfo& info = j->second;
1,400✔
3479
            if (!info.receiver)
1,400✔
3480
                m_file_ident_requests.erase(j);
4✔
3481
        }
1,400✔
3482
    }
37,354✔
3483
    std::size_t n = m_file_ident_requests.size();
37,354✔
3484
    if (n > 0) {
37,354✔
3485
        m_work.file_ident_alloc_slots.resize(n); // Throws
1,356✔
3486
        std::size_t i = 0;
1,356✔
3487
        for (const auto& pair : m_file_ident_requests) {
1,396✔
3488
            const FileIdentRequestInfo& info = pair.second;
1,396✔
3489
            FileIdentAllocSlot& slot = m_work.file_ident_alloc_slots[i];
1,396✔
3490
            slot.proxy_file = info.proxy_file;
1,396✔
3491
            slot.client_type = info.client_type;
1,396✔
3492
            ++i;
1,396✔
3493
        }
1,396✔
3494
        m_work.has_primary_work = true;
1,356✔
3495
    }
1,356✔
3496

3497
    // FIXME: `ServerFile::m_changesets_from_downstream` and
3498
    // `Work::changesets_from_downstream` should be renamed to something else,
3499
    // as it may contain kinds of data other than changesets.
3500

3501
    using std::swap;
37,354✔
3502
    swap(m_changesets_from_downstream, m_work.changesets_from_downstream);
37,354✔
3503
    m_work.have_changesets_from_downstream = (m_num_changesets_from_downstream > 0);
37,354✔
3504
    bool has_changesets = !m_work.changesets_from_downstream.empty();
37,354✔
3505
    if (has_changesets) {
37,354✔
3506
        m_work.has_primary_work = true;
35,994✔
3507
    }
35,994✔
3508

3509
    // Keep track of the size of pending changesets
3510
    REALM_ASSERT(m_unblocked_changesets_from_downstream_byte_size == 0);
37,354✔
3511
    m_unblocked_changesets_from_downstream_byte_size = m_blocked_changesets_from_downstream_byte_size;
37,354✔
3512
    m_blocked_changesets_from_downstream_byte_size = 0;
37,354✔
3513

3514
    m_num_changesets_from_downstream = 0;
37,354✔
3515
    m_has_blocked_work = false;
37,354✔
3516
}
37,354✔
3517

3518

3519
void ServerFile::resume_download() noexcept
3520
{
23,322✔
3521
    for (const auto& entry : m_identified_sessions) {
37,968✔
3522
        Session& sess = *entry.second;
37,968✔
3523
        sess.ensure_enlisted_to_send();
37,968✔
3524
    }
37,968✔
3525
}
23,322✔
3526

3527

3528
void ServerFile::recognize_external_change()
3529
{
4,800✔
3530
    VersionInfo prev_version_info = m_version_info;
4,800✔
3531
    const ServerHistory& history = access().history;       // Throws
4,800✔
3532
    bool has_upstream_status;                              // Dummy
4,800✔
3533
    sync::file_ident_type partial_file_ident;              // Dummy
4,800✔
3534
    sync::version_type partial_progress_reference_version; // Dummy
4,800✔
3535
    history.get_status(m_version_info, has_upstream_status, partial_file_ident,
4,800✔
3536
                       partial_progress_reference_version); // Throws
4,800✔
3537

3538
    REALM_ASSERT(m_version_info.realm_version >= prev_version_info.realm_version);
4,800✔
3539
    REALM_ASSERT(m_version_info.sync_version.version >= prev_version_info.sync_version.version);
4,800✔
3540
    if (m_version_info.sync_version.version > prev_version_info.sync_version.version) {
4,800✔
3541
        REALM_ASSERT(m_version_info.realm_version > prev_version_info.realm_version);
4,798✔
3542
        resume_download();
4,798✔
3543
    }
4,798✔
3544
}
4,800✔
3545

3546

3547
// NOTE: This function is executed by the worker thread
3548
void ServerFile::worker_allocate_file_identifiers()
3549
{
1,350✔
3550
    Work& work = m_work;
1,350✔
3551
    REALM_ASSERT(!work.file_ident_alloc_slots.empty());
1,350✔
3552
    ServerHistory& hist = worker_access().history;                                      // Throws
1,350✔
3553
    hist.allocate_file_identifiers(m_work.file_ident_alloc_slots, m_work.version_info); // Throws
1,350✔
3554
    m_work.produced_new_realm_version = true;
1,350✔
3555
}
1,350✔
3556

3557

3558
// Returns true when, and only when this function produces a new sync version
3559
// (adds a new entry to the sync history).
3560
//
3561
// NOTE: This function is executed by the worker thread
3562
bool ServerFile::worker_integrate_changes_from_downstream(WorkerState& state)
3563
{
35,982✔
3564
    REALM_ASSERT(!m_work.changesets_from_downstream.empty());
35,982✔
3565

3566
    std::unique_ptr<ServerHistory> hist_ptr;
35,982✔
3567
    DBRef sg_ptr;
35,982✔
3568
    ServerHistory& hist = get_client_file_history(state, hist_ptr, sg_ptr);
35,982✔
3569
    bool backup_whole_realm = false;
35,982✔
3570
    bool produced_new_realm_version = hist.integrate_client_changesets(
35,982✔
3571
        m_work.changesets_from_downstream, m_work.version_info, backup_whole_realm, m_work.integration_result,
35,982✔
3572
        wlogger); // Throws
35,982✔
3573
    bool produced_new_sync_version = !m_work.integration_result.integrated_changesets.empty();
35,982✔
3574
    REALM_ASSERT(!produced_new_sync_version || produced_new_realm_version);
35,982✔
3575
    if (produced_new_realm_version) {
35,982✔
3576
        m_work.produced_new_realm_version = true;
35,954✔
3577
        if (produced_new_sync_version) {
35,954✔
3578
            m_work.produced_new_sync_version = true;
18,536✔
3579
        }
18,536✔
3580
    }
35,954✔
3581
    return produced_new_sync_version;
35,982✔
3582
}
35,982✔
3583

3584
// Returns the history to use for changeset integration. With the file cache
// enabled, this is the history of the cached worker file access. Otherwise a
// fresh history and DB are created for the worker file's Realm path and stored
// in `hist_ptr` and `sg_ptr`, which then own them.
ServerHistory& ServerFile::get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
                                                   DBRef& sg_ptr)
{
    if (!state.use_file_cache) {
        const std::string& realm_path = m_worker_file.realm_path;
        hist_ptr = m_server.make_history_for_path();                      // Throws
        DBOptions db_options = m_worker_file.make_shared_group_options(); // Throws
        sg_ptr = DB::create(*hist_ptr, realm_path, db_options);           // Throws
        sg_ptr->claim_sync_agent();                                       // Throws
        return *hist_ptr;
    }

    return worker_access().history; // Throws
}
35,980✔
3596

3597

3598
// When worker thread finishes work unit.
3599
void ServerFile::group_postprocess_stage_1()
3600
{
37,018✔
3601
    REALM_ASSERT(m_has_work_in_progress);
37,018✔
3602

3603
    group_finalize_work_stage_1(); // Throws
37,018✔
3604
    group_finalize_work_stage_2(); // Throws
37,018✔
3605
    group_postprocess_stage_2();   // Throws
37,018✔
3606
}
37,018✔
3607

3608

3609
void ServerFile::group_postprocess_stage_2()
3610
{
37,020✔
3611
    REALM_ASSERT(m_has_work_in_progress);
37,020✔
3612
    group_postprocess_stage_3(); // Throws
37,020✔
3613
    // Suicide may have happened at this point
3614
}
37,020✔
3615

3616

3617
// When all files, including the reference file, have been backed up.
3618
void ServerFile::group_postprocess_stage_3()
3619
{
37,018✔
3620
    REALM_ASSERT(m_has_work_in_progress);
37,018✔
3621
    m_has_work_in_progress = false;
37,018✔
3622

3623
    logger.trace("Work unit postprocessing complete"); // Throws
37,018✔
3624
    if (m_has_blocked_work)
37,018✔
3625
        group_unblock_work(); // Throws
12,472✔
3626
}
37,018✔
3627

3628

3629
void ServerFile::finalize_work_stage_1()
3630
{
37,020✔
3631
    if (m_unblocked_changesets_from_downstream_byte_size > 0) {
37,020✔
3632
        // Report the byte size of completed downstream changesets.
3633
        std::size_t byte_size = m_unblocked_changesets_from_downstream_byte_size;
18,544✔
3634
        get_server().dec_byte_size_for_pending_downstream_changesets(byte_size); // Throws
18,544✔
3635
        m_unblocked_changesets_from_downstream_byte_size = 0;
18,544✔
3636
    }
18,544✔
3637

3638
    // Deal with errors (bad changesets) pertaining to downstream clients
3639
    std::size_t num_changesets_removed = 0;
37,020✔
3640
    std::size_t num_bytes_removed = 0;
37,020✔
3641
    for (const auto& entry : m_work.integration_result.excluded_client_files) {
37,020✔
3642
        file_ident_type client_file_ident = entry.first;
20✔
3643
        ExtendedIntegrationError error = entry.second;
20✔
3644
        ProtocolError error_2 = ProtocolError::other_session_error;
20✔
3645
        switch (error) {
20✔
3646
            case ExtendedIntegrationError::client_file_expired:
✔
3647
                logger.debug("Changeset integration failed: Client file entry "
×
3648
                             "expired during session"); // Throws
×
3649
                error_2 = ProtocolError::client_file_expired;
×
3650
                break;
×
3651
            case ExtendedIntegrationError::bad_origin_file_ident:
✔
3652
                error_2 = ProtocolError::bad_origin_file_ident;
×
3653
                break;
×
3654
            case ExtendedIntegrationError::bad_changeset:
20✔
3655
                error_2 = ProtocolError::bad_changeset;
20✔
3656
                break;
20✔
3657
        }
20✔
3658
        auto i = m_identified_sessions.find(client_file_ident);
20✔
3659
        if (i != m_identified_sessions.end()) {
20✔
3660
            Session& sess = *i->second;
20✔
3661
            SyncConnection& conn = sess.get_connection();
20✔
3662
            conn.protocol_error(error_2, &sess); // Throws
20✔
3663
        }
20✔
3664
        const IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident];
20✔
3665
        std::size_t num_changesets = list.changesets.size();
20✔
3666
        std::size_t num_bytes = 0;
20✔
3667
        for (const IntegratableChangeset& ic : list.changesets)
20✔
3668
            num_bytes += ic.changeset.size();
×
3669
        logger.info("Excluded %1 changesets of combined byte size %2 for client file %3", num_changesets, num_bytes,
20✔
3670
                    client_file_ident); // Throws
20✔
3671
        num_changesets_removed += num_changesets;
20✔
3672
        num_bytes_removed += num_bytes;
20✔
3673
        m_changesets_from_downstream.erase(client_file_ident);
20✔
3674
    }
20✔
3675

3676
    REALM_ASSERT(num_changesets_removed <= m_num_changesets_from_downstream);
37,020✔
3677
    REALM_ASSERT(num_bytes_removed <= m_blocked_changesets_from_downstream_byte_size);
37,020✔
3678

3679
    if (num_changesets_removed == 0)
37,020✔
3680
        return;
37,020✔
3681

3682
    m_num_changesets_from_downstream -= num_changesets_removed;
×
3683

3684
    // The byte size of the blocked changesets must be decremented.
3685
    if (num_bytes_removed > 0) {
×
3686
        m_blocked_changesets_from_downstream_byte_size -= num_bytes_removed;
×
3687
        get_server().dec_byte_size_for_pending_downstream_changesets(num_bytes_removed); // Throws
×
3688
    }
×
3689
}
×
3690

3691

3692
void ServerFile::finalize_work_stage_2()
3693
{
37,020✔
3694
    // Expose new snapshot to remote peers
3695
    REALM_ASSERT(m_work.produced_new_realm_version || m_work.version_info.realm_version == 0);
37,020✔
3696
    if (m_work.version_info.realm_version > m_version_info.realm_version) {
37,020✔
3697
        REALM_ASSERT(m_work.version_info.sync_version.version >= m_version_info.sync_version.version);
37,000✔
3698
        m_version_info = m_work.version_info;
37,000✔
3699
    }
37,000✔
3700

3701
    bool resume_download_and_upload = m_work.produced_new_sync_version;
37,020✔
3702

3703
    // Deliver allocated file identifiers to requesters
3704
    REALM_ASSERT(m_file_ident_requests.size() >= m_work.file_ident_alloc_slots.size());
37,020✔
3705
    auto begin = m_file_ident_requests.begin();
37,020✔
3706
    auto i = begin;
37,020✔
3707
    for (const FileIdentAllocSlot& slot : m_work.file_ident_alloc_slots) {
37,020✔
3708
        FileIdentRequestInfo& info = i->second;
1,376✔
3709
        REALM_ASSERT(info.proxy_file == slot.proxy_file);
1,376✔
3710
        REALM_ASSERT(info.client_type == slot.client_type);
1,376✔
3711
        if (FileIdentReceiver* receiver = info.receiver) {
1,376✔
3712
            info.receiver = nullptr;
1,344✔
3713
            receiver->receive_file_ident(slot.file_ident); // Throws
1,344✔
3714
        }
1,344✔
3715
        ++i;
1,376✔
3716
    }
1,376✔
3717
    m_file_ident_requests.erase(begin, i);
37,020✔
3718

3719
    // Resume download to downstream clients
3720
    if (resume_download_and_upload) {
37,020✔
3721
        resume_download();
18,524✔
3722
    }
18,524✔
3723
}
37,020✔
3724

3725
// ============================ Worker implementation ============================
3726

3727
Worker::Worker(ServerImpl& server)
3728
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, "Worker: ", server.logger_ptr)}
552✔
3729
    // Throws
3730
    , logger(*logger_ptr)
552✔
3731
    , m_server{server}
552✔
3732
    , m_file_access_cache{server.get_config().max_open_files, logger, *this, server.get_config().encryption_key}
552✔
3733
{
1,208✔
3734
    util::seed_prng_nondeterministically(m_random); // Throws
1,208✔
3735
}
1,208✔
3736

3737

3738
// Appends `file` to the worker's queue under the queue mutex and notifies all
// waiters on the condition variable.
void Worker::enqueue(ServerFile* file)
{
    util::LockGuard guard{m_mutex};

    m_queue.push_back(file); // Throws
    m_cond.notify_all();
}
37,354✔
3744

3745

3746
std::mt19937_64& Worker::server_history_get_random() noexcept
3747
{
2,428✔
3748
    return m_random;
2,428✔
3749
}
2,428✔
3750

3751

3752
void Worker::run()
3753
{
1,152✔
3754
    for (;;) {
38,478✔
3755
        ServerFile* file = nullptr;
38,478✔
3756
        {
38,478✔
3757
            util::LockGuard lock{m_mutex};
38,478✔
3758
            for (;;) {
76,404✔
3759
                if (REALM_UNLIKELY(m_stop))
76,404✔
3760
                    return;
1,152✔
3761
                if (!m_queue.empty()) {
75,252✔
3762
                    file = m_queue.front();
37,326✔
3763
                    m_queue.pop_front();
37,326✔
3764
                    break;
37,326✔
3765
                }
37,326✔
3766
                m_cond.wait(lock);
37,926✔
3767
            }
37,926✔
3768
        }
38,478✔
3769
        file->worker_process_work_unit(m_state); // Throws
37,326✔
3770
    }
37,326✔
3771
}
1,152✔
3772

3773

3774
void Worker::stop() noexcept
3775
{
1,152✔
3776
    util::LockGuard lock{m_mutex};
1,152✔
3777
    m_stop = true;
1,152✔
3778
    m_cond.notify_all();
1,152✔
3779
}
1,152✔
3780

3781

3782
// ============================ ServerImpl implementation ============================
3783

3784
ServerImpl::ServerImpl(const std::string& root_dir, util::Optional<sync::PKey> pkey, Server::Config config)
3785
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::server, std::move(config.logger))}
552✔
3786
    , logger{*logger_ptr}
552✔
3787
    , m_config{std::move(config)}
552✔
3788
    , m_max_upload_backlog{determine_max_upload_backlog(config)}
552✔
3789
    , m_root_dir{root_dir} // Throws
552✔
3790
    , m_access_control{std::move(pkey)}
552✔
3791
    , m_protocol_version_range{determine_protocol_version_range(config)}                 // Throws
552✔
3792
    , m_file_access_cache{m_config.max_open_files, logger, *this, config.encryption_key} // Throws
552✔
3793
    , m_worker{*this}                                                                    // Throws
552✔
3794
    , m_acceptor{get_service()}
552✔
3795
    , m_server_protocol{}       // Throws
552✔
3796
    , m_compress_memory_arena{} // Throws
552✔
3797
{
1,208✔
3798
    if (m_config.ssl) {
1,208✔
3799
        m_ssl_context = std::make_unique<network::ssl::Context>();                // Throws
24✔
3800
        m_ssl_context->use_certificate_chain_file(m_config.ssl_certificate_path); // Throws
24✔
3801
        m_ssl_context->use_private_key_file(m_config.ssl_certificate_key_path);   // Throws
24✔
3802
    }
24✔
3803
}
1,208✔
3804

3805

3806
// Destroying the server while `m_running` is still set is a fatal error.
ServerImpl::~ServerImpl() noexcept
{
    // The descriptive variable name is what shows up in the assertion text.
    const bool server_destroyed_while_still_running = m_running;
    REALM_ASSERT_RELEASE(!server_destroyed_while_still_running);
}
1,208✔
3811

3812

3813
void ServerImpl::start()
3814
{
1,208✔
3815
    logger.info("Realm sync server started (%1)", REALM_VER_CHUNK); // Throws
1,208✔
3816
    logger.info("Supported protocol versions: %1-%2 (%3-%4 configured)",
1,208✔
3817
                ServerImplBase::get_oldest_supported_protocol_version(), get_current_protocol_version(),
1,208✔
3818
                m_protocol_version_range.first,
1,208✔
3819
                m_protocol_version_range.second); // Throws
1,208✔
3820
    logger.info("Platform: %1", util::get_platform_info());
1,208✔
3821
    bool is_debug_build = false;
1,208✔
3822
#if REALM_DEBUG
1,208✔
3823
    is_debug_build = true;
1,208✔
3824
#endif
1,208✔
3825
    {
1,208✔
3826
        const char* lead_text = "Build mode";
1,208✔
3827
        if (is_debug_build) {
1,208✔
3828
            logger.info("%1: Debug", lead_text); // Throws
1,208✔
3829
        }
1,208✔
3830
        else {
×
3831
            logger.info("%1: Release", lead_text); // Throws
×
3832
        }
×
3833
    }
1,208✔
3834
    if (is_debug_build) {
1,208✔
3835
        logger.warn("Build mode is Debug! CAN SEVERELY IMPACT PERFORMANCE - "
1,208✔
3836
                    "NOT RECOMMENDED FOR PRODUCTION"); // Throws
1,208✔
3837
    }
1,208✔
3838
    logger.info("Directory holding persistent state: %1", m_root_dir);        // Throws
1,208✔
3839
    logger.info("Maximum number of open files: %1", m_config.max_open_files); // Throws
1,208✔
3840
    {
1,208✔
3841
        const char* lead_text = "Encryption";
1,208✔
3842
        if (m_config.encryption_key) {
1,208✔
3843
            logger.info("%1: Yes", lead_text); // Throws
4✔
3844
        }
4✔
3845
        else {
1,204✔
3846
            logger.info("%1: No", lead_text); // Throws
1,204✔
3847
        }
1,204✔
3848
    }
1,208✔
3849
    logger.info("Log level: %1", logger.get_level_threshold()); // Throws
1,208✔
3850
    {
1,208✔
3851
        const char* lead_text = "Disable sync to disk";
1,208✔
3852
        if (m_config.disable_sync_to_disk) {
1,208✔
3853
            logger.info("%1: All files", lead_text); // Throws
504✔
3854
        }
504✔
3855
        else {
704✔
3856
            logger.info("%1: No", lead_text); // Throws
704✔
3857
        }
704✔
3858
    }
1,208✔
3859
    if (m_config.disable_sync_to_disk) {
1,208✔
3860
        logger.warn("Testing/debugging feature 'disable sync to disk' enabled - "
504✔
3861
                    "never do this in production!"); // Throws
504✔
3862
    }
504✔
3863
    logger.info("Download bootstrap caching: %1",
1,208✔
3864
                (m_config.enable_download_bootstrap_cache ? "Yes" : "No"));                // Throws
1,208✔
3865
    logger.info("Max download size: %1 bytes", m_config.max_download_size);                // Throws
1,208✔
3866
    logger.info("Max upload backlog: %1 bytes", m_max_upload_backlog);                     // Throws
1,208✔
3867
    logger.info("HTTP request timeout: %1 ms", m_config.http_request_timeout);             // Throws
1,208✔
3868
    logger.info("HTTP response timeout: %1 ms", m_config.http_response_timeout);           // Throws
1,208✔
3869
    logger.info("Connection reaper timeout: %1 ms", m_config.connection_reaper_timeout);   // Throws
1,208✔
3870
    logger.info("Connection reaper interval: %1 ms", m_config.connection_reaper_interval); // Throws
1,208✔
3871
    logger.info("Connection soft close timeout: %1 ms", m_config.soft_close_timeout);      // Throws
1,208✔
3872
    logger.debug("Authorization header name: %1", m_config.authorization_header_name);     // Throws
1,208✔
3873

3874
    m_realm_names = _impl::find_realm_files(m_root_dir); // Throws
1,208✔
3875

3876
    initiate_connection_reaper_timer(m_config.connection_reaper_interval); // Throws
1,208✔
3877

3878
    listen(); // Throws
1,208✔
3879
}
1,208✔
3880

3881

3882
void ServerImpl::run()
3883
{
1,152✔
3884
    auto ta = util::make_temp_assign(m_running, true);
1,152✔
3885

3886
    {
1,152✔
3887
        auto worker_thread = util::make_thread_exec_guard(m_worker, *this); // Throws
1,152✔
3888
        std::string name;
1,152✔
3889
        if (util::Thread::get_name(name)) {
1,152✔
3890
            name += "-worker";
628✔
3891
            worker_thread.start_with_signals_blocked(name); // Throws
628✔
3892
        }
628✔
3893
        else {
524✔
3894
            worker_thread.start_with_signals_blocked(); // Throws
524✔
3895
        }
524✔
3896

3897
        m_service.run(); // Throws
1,152✔
3898

3899
        worker_thread.stop_and_rethrow(); // Throws
1,152✔
3900
    }
1,152✔
3901

3902
    logger.info("Realm sync server stopped");
1,152✔
3903
}
1,152✔
3904

3905

3906
void ServerImpl::stop() noexcept
3907
{
2,062✔
3908
    util::LockGuard lock{m_mutex};
2,062✔
3909
    if (m_stopped)
2,062✔
3910
        return;
854✔
3911
    m_stopped = true;
1,208✔
3912
    m_wait_or_service_stopped_cond.notify_all();
1,208✔
3913
    m_service.stop();
1,208✔
3914
}
1,208✔
3915

3916

3917
void ServerImpl::inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3918
{
24,292✔
3919
    m_pending_changesets_from_downstream_byte_size += byte_size;
24,292✔
3920
    logger.debug("Byte size for pending downstream changesets incremented by "
24,292✔
3921
                 "%1 to reach a total of %2",
24,292✔
3922
                 byte_size,
24,292✔
3923
                 m_pending_changesets_from_downstream_byte_size); // Throws
24,292✔
3924
}
24,292✔
3925

3926

3927
void ServerImpl::dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3928
{
18,544✔
3929
    REALM_ASSERT(byte_size <= m_pending_changesets_from_downstream_byte_size);
18,544✔
3930
    m_pending_changesets_from_downstream_byte_size -= byte_size;
18,544✔
3931
    logger.debug("Byte size for pending downstream changesets decremented by "
18,544✔
3932
                 "%1 to reach a total of %2",
18,544✔
3933
                 byte_size,
18,544✔
3934
                 m_pending_changesets_from_downstream_byte_size); // Throws
18,544✔
3935
}
18,544✔
3936

3937

3938
std::mt19937_64& ServerImpl::server_history_get_random() noexcept
3939
{
1,142✔
3940
    return get_random();
1,142✔
3941
}
1,142✔
3942

3943

3944
void ServerImpl::listen()
3945
{
1,208✔
3946
    network::Resolver resolver{get_service()};
1,208✔
3947
    network::Resolver::Query query(m_config.listen_address, m_config.listen_port,
1,208✔
3948
                                   network::Resolver::Query::passive | network::Resolver::Query::address_configured);
1,208✔
3949
    network::Endpoint::List endpoints = resolver.resolve(query); // Throws
1,208✔
3950

3951
    auto i = endpoints.begin();
1,208✔
3952
    auto end = endpoints.end();
1,208✔
3953
    for (;;) {
1,208✔
3954
        std::error_code ec;
1,208✔
3955
        m_acceptor.open(i->protocol(), ec);
1,208✔
3956
        if (!ec) {
1,208✔
3957
            using SocketBase = network::SocketBase;
1,208✔
3958
            m_acceptor.set_option(SocketBase::reuse_address(m_config.reuse_address), ec);
1,208✔
3959
            if (!ec) {
1,208✔
3960
                m_acceptor.bind(*i, ec);
1,208✔
3961
                if (!ec)
1,208✔
3962
                    break;
1,208✔
3963
            }
1,208✔
3964
            m_acceptor.close();
×
3965
        }
×
3966
        if (i + 1 == end) {
×
3967
            for (auto i2 = endpoints.begin(); i2 != i; ++i2) {
×
3968
                // FIXME: We don't have the error code for previous attempts, so
3969
                // can't print a nice message.
3970
                logger.error("Failed to bind to %1:%2", i2->address(),
×
3971
                             i2->port()); // Throws
×
3972
            }
×
3973
            logger.error("Failed to bind to %1:%2: %3", i->address(), i->port(),
×
3974
                         ec.message()); // Throws
×
3975
            throw std::runtime_error("Could not create a listening socket: All endpoints failed");
×
3976
        }
×
3977
    }
×
3978

3979
    m_acceptor.listen(m_config.listen_backlog);
1,208✔
3980

3981
    network::Endpoint local_endpoint = m_acceptor.local_endpoint();
1,208✔
3982
    const char* ssl_mode = (m_ssl_context ? "TLS" : "non-TLS");
1,208✔
3983
    logger.info("Listening on %1:%2 (max backlog is %3, %4)", local_endpoint.address(), local_endpoint.port(),
1,208✔
3984
                m_config.listen_backlog, ssl_mode); // Throws
1,208✔
3985

3986
    initiate_accept();
1,208✔
3987
}
1,208✔
3988

3989

3990
void ServerImpl::initiate_accept()
3991
{
3,326✔
3992
    auto handler = [this](std::error_code ec) {
3,326✔
3993
        if (ec != util::error::operation_aborted)
2,118✔
3994
            handle_accept(ec);
2,116✔
3995
    };
2,118✔
3996
    bool is_ssl = bool(m_ssl_context);
3,326✔
3997
    m_next_http_conn.reset(new HTTPConnection(*this, ++m_next_conn_id, is_ssl));                            // Throws
3,326✔
3998
    m_acceptor.async_accept(m_next_http_conn->get_socket(), m_next_http_conn_endpoint, std::move(handler)); // Throws
3,326✔
3999
}
3,326✔
4000

4001

4002
void ServerImpl::handle_accept(std::error_code ec)
4003
{
2,116✔
4004
    if (ec) {
2,116✔
4005
        if (ec != util::error::connection_aborted) {
×
4006
            REALM_ASSERT(ec != util::error::operation_aborted);
×
4007

4008
            // We close the reserved files to get a few extra file descriptors.
4009
            for (size_t i = 0; i < sizeof(m_reserved_files) / sizeof(m_reserved_files[0]); ++i) {
×
4010
                m_reserved_files[i].reset();
×
4011
            }
×
4012

4013
            // FIXME: There are probably errors that need to be treated
4014
            // specially, and not cause the server to "crash".
4015

4016
            if (ec == make_basic_system_error_code(EMFILE)) {
×
4017
                logger.error("Failed to accept a connection due to the file descriptor limit, "
×
4018
                             "consider increasing the limit in your system config"); // Throws
×
4019
                throw OutOfFilesError(ec);
×
4020
            }
×
4021
            else {
×
4022
                throw std::system_error(ec);
×
4023
            }
×
4024
        }
×
4025
        logger.debug("Skipping aborted connection"); // Throws
×
4026
    }
×
4027
    else {
2,116✔
4028
        HTTPConnection& conn = *m_next_http_conn;
2,116✔
4029
        if (m_config.tcp_no_delay)
2,116✔
4030
            conn.get_socket().set_option(network::SocketBase::no_delay(true));  // Throws
1,760✔
4031
        m_http_connections.emplace(conn.get_id(), std::move(m_next_http_conn)); // Throws
2,116✔
4032
        Formatter& formatter = m_misc_buffers.formatter;
2,116✔
4033
        formatter.reset();
2,116✔
4034
        formatter << "[" << m_next_http_conn_endpoint.address() << "]:" << m_next_http_conn_endpoint.port(); // Throws
2,116✔
4035
        std::string remote_endpoint = {formatter.data(), formatter.size()};                                  // Throws
2,116✔
4036
        conn.initiate(std::move(remote_endpoint));                                                           // Throws
2,116✔
4037
    }
2,116✔
4038
    initiate_accept(); // Throws
2,116✔
4039
}
2,116✔
4040

4041

4042
void ServerImpl::remove_http_connection(std::int_fast64_t conn_id) noexcept
4043
{
2,118✔
4044
    m_http_connections.erase(conn_id);
2,118✔
4045
}
2,118✔
4046

4047

4048
void ServerImpl::add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn)
4049
{
2,076✔
4050
    m_sync_connections.emplace(connection_id, std::move(sync_conn));
2,076✔
4051
}
2,076✔
4052

4053

4054
void ServerImpl::remove_sync_connection(int_fast64_t connection_id)
4055
{
1,236✔
4056
    m_sync_connections.erase(connection_id);
1,236✔
4057
}
1,236✔
4058

4059

4060
// Schedules an update of `m_config.connection_reaper_timeout` to `timeout`.
// The assignment is not done here; it is posted to the event loop service
// and performed when the posted handler runs.
void ServerImpl::set_connection_reaper_timeout(milliseconds_type timeout)
{
    auto apply_timeout = [this, timeout](Status) {
        m_config.connection_reaper_timeout = timeout;
    };
    get_service().post(std::move(apply_timeout));
}
4✔
4066

4067

4068
void ServerImpl::close_connections()
4069
{
16✔
4070
    get_service().post([this](Status) {
16✔
4071
        do_close_connections(); // Throws
16✔
4072
    });
16✔
4073
}
16✔
4074

4075

4076
bool ServerImpl::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4077
{
72✔
4078
    return _impl::map_virt_to_real_realm_path(m_root_dir, virt_path, real_path); // Throws
72✔
4079
}
72✔
4080

4081

4082
void ServerImpl::recognize_external_change(const std::string& virt_path)
4083
{
4,800✔
4084
    std::string virt_path_2 = virt_path; // Throws (copy)
4,800✔
4085
    get_service().post([this, virt_path = std::move(virt_path_2)](Status) {
4,800✔
4086
        do_recognize_external_change(virt_path); // Throws
4,800✔
4087
    });                                          // Throws
4,800✔
4088
}
4,800✔
4089

4090

4091
void ServerImpl::stop_sync_and_wait_for_backup_completion(
4092
    util::UniqueFunction<void(bool did_backup)> completion_handler, milliseconds_type timeout)
4093
{
×
4094
    logger.info("stop_sync_and_wait_for_backup_completion() called with "
×
4095
                "timeout = %1",
×
4096
                timeout); // Throws
×
4097

4098
    get_service().post([this, completion_handler = std::move(completion_handler), timeout](Status) mutable {
×
4099
        do_stop_sync_and_wait_for_backup_completion(std::move(completion_handler),
×
4100
                                                    timeout); // Throws
×
4101
    });
×
4102
}
×
4103

4104

4105
// (Re)creates the connection reaper timer and arms it to fire after
// `timeout` milliseconds. When the wait completes without having been
// aborted, dead connections are reaped and the timer is armed again with the
// same timeout, so reaping repeats periodically.
void ServerImpl::initiate_connection_reaper_timer(milliseconds_type timeout)
{
    auto on_expiry = [this, timeout](Status status) {
        const bool should_reap = (status != ErrorCodes::OperationAborted);
        if (should_reap) {
            reap_connections();                        // Throws
            initiate_connection_reaper_timer(timeout); // Throws
        }
    };

    m_connection_reaper_timer.emplace(get_service());
    m_connection_reaper_timer->async_wait(std::chrono::milliseconds(timeout), std::move(on_expiry)); // Throws
}
1,350✔
4115

4116

4117
void ServerImpl::reap_connections()
4118
{
142✔
4119
    logger.debug("Discarding dead connections"); // Throws
142✔
4120
    SteadyTimePoint now = steady_clock_now();
142✔
4121
    {
142✔
4122
        auto end = m_http_connections.end();
142✔
4123
        auto i = m_http_connections.begin();
142✔
4124
        while (i != end) {
144✔
4125
            HTTPConnection& conn = *i->second;
2✔
4126
            ++i;
2✔
4127
            // Suicide
4128
            conn.terminate_if_dead(now); // Throws
2✔
4129
        }
2✔
4130
    }
142✔
4131
    {
142✔
4132
        auto end = m_sync_connections.end();
142✔
4133
        auto i = m_sync_connections.begin();
142✔
4134
        while (i != end) {
280✔
4135
            SyncConnection& conn = *i->second;
138✔
4136
            ++i;
138✔
4137
            // Suicide
4138
            conn.terminate_if_dead(now); // Throws
138✔
4139
        }
138✔
4140
    }
142✔
4141
}
142✔
4142

4143

4144
void ServerImpl::do_close_connections()
4145
{
16✔
4146
    for (auto& entry : m_sync_connections) {
16✔
4147
        SyncConnection& conn = *entry.second;
16✔
4148
        conn.initiate_soft_close(); // Throws
16✔
4149
    }
16✔
4150
}
16✔
4151

4152

4153
void ServerImpl::do_recognize_external_change(const std::string& virt_path)
4154
{
4,800✔
4155
    auto i = m_files.find(virt_path);
4,800✔
4156
    if (i == m_files.end())
4,800✔
4157
        return;
×
4158
    ServerFile& file = *i->second;
4,800✔
4159
    file.recognize_external_change();
4,800✔
4160
}
4,800✔
4161

4162

4163
void ServerImpl::do_stop_sync_and_wait_for_backup_completion(
4164
    util::UniqueFunction<void(bool did_complete)> completion_handler, milliseconds_type timeout)
4165
{
×
4166
    static_cast<void>(timeout);
×
4167
    if (m_sync_stopped)
×
4168
        return;
×
4169
    do_close_connections(); // Throws
×
4170
    m_sync_stopped = true;
×
4171
    bool completion_reached = false;
×
4172
    completion_handler(completion_reached); // Throws
×
4173
}
×
4174

4175

4176
// ============================ SyncConnection implementation ============================
4177

4178
// Explicitly empties the send queue before the session table, so that the
// queue is cleared ahead of the sessions themselves regardless of member
// declaration order.
SyncConnection::~SyncConnection() noexcept
{
    // The queue refers to sessions by pointer (see `enlist_to_send()`), so it
    // is emptied first.
    m_sessions_enlisted_to_send.clear();
    // Then drop the sessions themselves.
    m_sessions.clear();
}
2,076✔
4183

4184

4185
void SyncConnection::initiate()
4186
{
2,076✔
4187
    m_last_activity_at = steady_clock_now();
2,076✔
4188
    logger.debug("Sync Connection initiated");
2,076✔
4189
    m_websocket.initiate_server_websocket_after_handshake();
2,076✔
4190
    send_log_message(util::Logger::Level::info, "Client connection established with server", 0,
2,076✔
4191
                     m_appservices_request_id);
2,076✔
4192
}
2,076✔
4193

4194

4195
template <class... Params>
4196
void SyncConnection::terminate(Logger::Level log_level, const char* log_message, Params... log_params)
4197
{
1,236✔
4198
    terminate_sessions();                              // Throws
1,236✔
4199
    logger.log(log_level, log_message, log_params...); // Throws
1,236✔
4200
    m_websocket.stop();
1,236✔
4201
    m_ssl_stream.reset();
1,236✔
4202
    m_socket.reset();
1,236✔
4203
    // Suicide
4204
    m_server.remove_sync_connection(m_id);
1,236✔
4205
}
1,236✔
4206

4207

4208
// Terminates this connection if it has been inactive for too long, measured
// from `m_last_activity_at` to `now`.
//
// While a soft close is in progress the limit is `soft_close_timeout`;
// otherwise it is `connection_reaper_timeout`. When the connection is
// terminated, this object is destroyed before the function returns.
void SyncConnection::terminate_if_dead(SteadyTimePoint now)
{
    const milliseconds_type idle_time = steady_duration(m_last_activity_at, now);
    const Server::Config& config = m_server.get_config();

    if (m_is_closing) {
        if (idle_time >= config.soft_close_timeout) {
            // Destroys `*this`.
            terminate(Logger::Level::detail,
                      "Sync connection closed (timeout during soft close)"); // Throws
        }
        return;
    }

    if (idle_time >= config.connection_reaper_timeout) {
        // Destroys `*this`.
        terminate(Logger::Level::detail,
                  "Sync connection closed (no heartbeat)"); // Throws
    }
}
138✔
4227

4228

4229
// Appends `sess` to the queue of sessions that want to send a message and
// fires the send trigger.
//
// Preconditions (asserted): the send trigger exists, the connection is not
// closing, and the session is not already enlisted.
void SyncConnection::enlist_to_send(Session* sess) noexcept
{
    REALM_ASSERT(m_send_trigger);
    REALM_ASSERT(!m_is_closing);
    REALM_ASSERT(!sess->is_enlisted_to_send());

    auto& send_queue = m_sessions_enlisted_to_send;
    send_queue.push_back(sess);
    m_send_trigger->trigger();
}
111,256✔
4237

4238

4239
// Logs `status` as an error and reports the corresponding protocol error on
// this connection:
//   SyncProtocolInvariantFailed -> bad_syntax
//   LimitExceeded               -> limits_exceeded
//   anything else               -> other_error
void SyncConnection::handle_protocol_error(Status status)
{
    logger.error("%1", status);

    ProtocolError reported = ProtocolError::other_error;
    switch (status.code()) {
        case ErrorCodes::SyncProtocolInvariantFailed:
            reported = ProtocolError::bad_syntax;
            break;
        case ErrorCodes::LimitExceeded:
            reported = ProtocolError::limits_exceeded;
            break;
        default:
            break;
    }
    protocol_error(reported); // Throws
}
×
4254

4255
void SyncConnection::receive_bind_message(session_ident_type session_ident, std::string path,
4256
                                          std::string signed_user_token, bool need_client_file_ident,
4257
                                          bool is_subserver)
4258
{
5,304✔
4259
    auto p = m_sessions.emplace(session_ident, nullptr); // Throws
5,304✔
4260
    bool was_inserted = p.second;
5,304✔
4261
    if (REALM_UNLIKELY(!was_inserted)) {
5,304✔
4262
        logger.error("Overlapping reuse of session identifier %1 in BIND message",
×
4263
                     session_ident);                           // Throws
×
4264
        protocol_error(ProtocolError::reuse_of_session_ident); // Throws
×
4265
        return;
×
4266
    }
×
4267
    try {
5,304✔
4268
        p.first->second.reset(new Session(*this, session_ident)); // Throws
5,304✔
4269
    }
5,304✔
4270
    catch (...) {
5,304✔
4271
        m_sessions.erase(p.first);
×
4272
        throw;
×
4273
    }
×
4274

4275
    Session& sess = *p.first->second;
5,300✔
4276
    sess.initiate(); // Throws
5,300✔
4277
    ProtocolError error;
5,300✔
4278
    bool success =
5,300✔
4279
        sess.receive_bind_message(std::move(path), std::move(signed_user_token), need_client_file_ident, is_subserver,
5,300✔
4280
                                  error); // Throws
5,300✔
4281
    if (REALM_UNLIKELY(!success))         // Throws
5,300✔
4282
        protocol_error(error, &sess);     // Throws
28✔
4283
}
5,300✔
4284

4285

4286
void SyncConnection::receive_ident_message(session_ident_type session_ident, file_ident_type client_file_ident,
4287
                                           salt_type client_file_ident_salt, version_type scan_server_version,
4288
                                           version_type scan_client_version, version_type latest_server_version,
4289
                                           salt_type latest_server_version_salt)
4290
{
4,356✔
4291
    auto i = m_sessions.find(session_ident);
4,356✔
4292
    if (REALM_UNLIKELY(i == m_sessions.end())) {
4,356✔
4293
        bad_session_ident("IDENT", session_ident); // Throws
×
4294
        return;
×
4295
    }
×
4296
    Session& sess = *i->second;
4,356✔
4297
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
4,356✔
4298
        message_after_unbind("IDENT", session_ident); // Throws
×
4299
        return;
×
4300
    }
×
4301
    if (REALM_UNLIKELY(sess.error_occurred())) {
4,356✔
4302
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4303
        // messages, other than UNBIND, must be ignored.
4304
        return;
16✔
4305
    }
16✔
4306
    if (REALM_UNLIKELY(sess.must_send_ident_message())) {
4,340✔
4307
        logger.error("Received IDENT message before IDENT message was sent"); // Throws
×
4308
        protocol_error(ProtocolError::bad_message_order);                     // Throws
×
4309
        return;
×
4310
    }
×
4311
    if (REALM_UNLIKELY(sess.ident_message_received())) {
4,340✔
4312
        logger.error("Received second IDENT message for session"); // Throws
×
4313
        protocol_error(ProtocolError::bad_message_order);          // Throws
×
4314
        return;
×
4315
    }
×
4316

4317
    ProtocolError error = {};
4,340✔
4318
    bool success = sess.receive_ident_message(client_file_ident, client_file_ident_salt, scan_server_version,
4,340✔
4319
                                              scan_client_version, latest_server_version, latest_server_version_salt,
4,340✔
4320
                                              error); // Throws
4,340✔
4321
    if (REALM_UNLIKELY(!success))                     // Throws
4,340✔
4322
        protocol_error(error, &sess);                 // Throws
32✔
4323
}
4,340✔
4324

4325
void SyncConnection::receive_upload_message(session_ident_type session_ident, version_type progress_client_version,
4326
                                            version_type progress_server_version, version_type locked_server_version,
4327
                                            const UploadChangesets& upload_changesets)
4328
{
44,774✔
4329
    auto i = m_sessions.find(session_ident);
44,774✔
4330
    if (REALM_UNLIKELY(i == m_sessions.end())) {
44,774✔
4331
        bad_session_ident("UPLOAD", session_ident); // Throws
×
4332
        return;
×
4333
    }
×
4334
    Session& sess = *i->second;
44,774✔
4335
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
44,774✔
4336
        message_after_unbind("UPLOAD", session_ident); // Throws
×
4337
        return;
×
4338
    }
×
4339
    if (REALM_UNLIKELY(sess.error_occurred())) {
44,774✔
4340
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4341
        // messages, other than UNBIND, must be ignored.
4342
        return;
×
4343
    }
×
4344
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
44,774✔
4345
        message_before_ident("UPLOAD", session_ident); // Throws
×
4346
        return;
×
4347
    }
×
4348

4349
    ProtocolError error = {};
44,774✔
4350
    bool success = sess.receive_upload_message(progress_client_version, progress_server_version,
44,774✔
4351
                                               locked_server_version, upload_changesets, error); // Throws
44,774✔
4352
    if (REALM_UNLIKELY(!success))                                                                // Throws
44,774✔
4353
        protocol_error(error, &sess);                                                            // Throws
×
4354
}
44,774✔
4355

4356

4357
// Handles a MARK message for the session identified by `session_ident`.
//
// The message is rejected with a connection-level protocol error if the
// session is unknown, if UNBIND was already received, or if no IDENT message
// has been received yet. It is silently ignored if the session is in an error
// state. Otherwise it is forwarded to the session, and a failure reported by
// the session is raised as a protocol error for that session.
void SyncConnection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    auto i = m_sessions.find(session_ident);
    if (REALM_UNLIKELY(i == m_sessions.end())) {
        bad_session_ident("MARK", session_ident); // Throws
        return;
    }
    Session& sess = *i->second;
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
        message_after_unbind("MARK", session_ident); // Throws
        return;
    }
    if (REALM_UNLIKELY(sess.error_occurred())) {
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
        // messages, other than UNBIND, must be ignored.
        return;
    }
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
        message_before_ident("MARK", session_ident); // Throws
        return;
    }

    // Value-initialized (as in the IDENT and UPLOAD handlers) so that a
    // defined value is passed on even if the session fails without assigning
    // an error.
    ProtocolError error = {};
    bool success = sess.receive_mark_message(request_ident, error); // Throws
    if (REALM_UNLIKELY(!success))                                   // Throws
        protocol_error(error, &sess);                               // Throws
}
12,014✔
4384

4385

4386
// Handles an UNBIND message for the session identified by `session_ident`.
//
// An unknown session identifier or a repeated UNBIND is reported as a
// connection-level protocol error. Otherwise the message is forwarded to the
// session.
void SyncConnection::receive_unbind_message(session_ident_type session_ident)
{
    const auto entry = m_sessions.find(session_ident); // Throws
    if (REALM_UNLIKELY(entry == m_sessions.end())) {
        bad_session_ident("UNBIND", session_ident); // Throws
        return;
    }

    Session& session = *entry->second;
    if (REALM_UNLIKELY(session.unbind_message_received())) {
        message_after_unbind("UNBIND", session_ident); // Throws
        return;
    }

    // NOTE: The session might have gotten destroyed once this call returns,
    // so `session` must not be used afterwards.
    session.receive_unbind_message(); // Throws
}
2,394✔
4402

4403

4404
// Handles a PING message: remembers that a PONG carrying `timestamp` must be
// sent, and starts the send machinery right away unless a write is already in
// progress (in which case the PONG is picked up when that write completes).
void SyncConnection::receive_ping(milliseconds_type timestamp, milliseconds_type rtt)
{
    logger.debug("Received: PING(timestamp=%1, rtt=%2)", timestamp, rtt); // Throws

    m_last_ping_timestamp = timestamp;
    m_send_pong = true;

    if (m_is_sending)
        return;
    send_next_message();
}
192✔
4412

4413

4414
void SyncConnection::receive_error_message(session_ident_type session_ident, int error_code,
4415
                                           std::string_view error_body)
4416
{
×
4417
    logger.debug("Received: ERROR(error_code=%1, message_size=%2, session_ident=%3)", error_code, error_body.size(),
×
4418
                 session_ident); // Throws
×
4419
    auto i = m_sessions.find(session_ident);
×
4420
    if (REALM_UNLIKELY(i == m_sessions.end())) {
×
4421
        bad_session_ident("ERROR", session_ident);
×
4422
        return;
×
4423
    }
×
4424
    Session& sess = *i->second;
×
4425
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
×
4426
        message_after_unbind("ERROR", session_ident); // Throws
×
4427
        return;
×
4428
    }
×
4429

4430
    sess.receive_error_message(session_ident, error_code, error_body); // Throws
×
4431
}
×
4432

4433
void SyncConnection::send_log_message(util::Logger::Level level, const std::string&& message,
4434
                                      session_ident_type sess_ident, std::optional<std::string> co_id)
4435
{
6,382✔
4436
    if (get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
6,382✔
4437
        return logger.log(level, message.c_str());
×
4438
    }
×
4439

4440
    LogMessage log_msg{sess_ident, level, std::move(message), std::move(co_id)};
6,382✔
4441
    {
6,382✔
4442
        std::lock_guard lock(m_log_mutex);
6,382✔
4443
        m_log_messages.push(std::move(log_msg));
6,382✔
4444
    }
6,382✔
4445
    m_send_trigger->trigger();
6,382✔
4446
}
6,382✔
4447

4448

4449
void SyncConnection::bad_session_ident(const char* message_type, session_ident_type session_ident)
4450
{
×
4451
    logger.error("Bad session identifier in %1 message, session_ident = %2", message_type,
×
4452
                 session_ident);                      // Throws
×
4453
    protocol_error(ProtocolError::bad_session_ident); // Throws
×
4454
}
×
4455

4456

4457
void SyncConnection::message_after_unbind(const char* message_type, session_ident_type session_ident)
4458
{
×
4459
    logger.error("Received %1 message after UNBIND message, session_ident = %2", message_type,
×
4460
                 session_ident);                      // Throws
×
4461
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4462
}
×
4463

4464

4465
void SyncConnection::message_before_ident(const char* message_type, session_ident_type session_ident)
4466
{
×
4467
    logger.error("Received %1 message before IDENT message, session_ident = %2", message_type,
×
4468
                 session_ident);                      // Throws
×
4469
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4470
}
×
4471

4472

4473
void SyncConnection::handle_message_received(const char* data, size_t size)
4474
{
69,078✔
4475
    // parse_message_received() parses the message and calls the
4476
    // proper handler on the SyncConnection object (this).
4477
    get_server_protocol().parse_message_received<SyncConnection>(*this, std::string_view(data, size));
69,078✔
4478
    return;
69,078✔
4479
}
69,078✔
4480

4481

4482
void SyncConnection::handle_ping_received(const char* data, size_t size)
4483
{
×
4484
    // parse_message_received() parses the message and calls the
4485
    // proper handler on the SyncConnection object (this).
4486
    get_server_protocol().parse_ping_received<SyncConnection>(*this, std::string_view(data, size));
×
4487
    return;
×
4488
}
×
4489

4490

4491
void SyncConnection::send_next_message()
4492
{
108,122✔
4493
    REALM_ASSERT(!m_is_sending);
108,122✔
4494
    REALM_ASSERT(!m_sending_pong);
108,122✔
4495
    if (m_send_pong) {
108,122✔
4496
        send_pong(m_last_ping_timestamp);
192✔
4497
        if (m_sending_pong)
192✔
4498
            return;
192✔
4499
    }
192✔
4500
    for (;;) {
161,084✔
4501
        Session* sess = m_sessions_enlisted_to_send.pop_front();
161,084✔
4502
        if (!sess) {
161,084✔
4503
            // No sessions were enlisted to send
4504
            if (REALM_LIKELY(!m_is_closing))
50,016✔
4505
                break; // Check to see if there are any log messages to go out
50,000✔
4506
            // Send a connection level ERROR
4507
            REALM_ASSERT(!is_session_level_error(m_error_code));
16✔
4508
            initiate_write_error(m_error_code, m_error_session_ident); // Throws
16✔
4509
            return;
16✔
4510
        }
50,016✔
4511
        sess->send_message(); // Throws
111,068✔
4512
        // NOTE: The session might have gotten destroyed at this time!
4513

4514
        // At this point, `m_is_sending` is true if, and only if the session
4515
        // chose to send a message. If it chose to not send a message, we must
4516
        // loop back and give the next session in `m_sessions_enlisted_to_send`
4517
        // a chance.
4518
        if (m_is_sending)
111,068✔
4519
            return;
57,928✔
4520
    }
111,068✔
4521
    {
49,986✔
4522
        std::lock_guard lock(m_log_mutex);
49,986✔
4523
        if (!m_log_messages.empty()) {
49,986✔
4524
            send_log_message(m_log_messages.front());
6,240✔
4525
            m_log_messages.pop();
6,240✔
4526
        }
6,240✔
4527
    }
49,986✔
4528
    // Otherwise, nothing to do
4529
}
49,986✔
4530

4531

4532
void SyncConnection::initiate_write_output_buffer()
4533
{
64,178✔
4534
    auto handler = [this](std::error_code ec, size_t) {
64,178✔
4535
        if (!ec) {
64,154✔
4536
            handle_write_output_buffer();
64,032✔
4537
        }
64,032✔
4538
    };
64,154✔
4539

4540
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
64,178✔
4541
                                   std::move(handler)); // Throws
64,178✔
4542
    m_is_sending = true;
64,178✔
4543
}
64,178✔
4544

4545

4546
void SyncConnection::initiate_pong_output_buffer()
4547
{
192✔
4548
    auto handler = [this](std::error_code ec, size_t) {
192✔
4549
        if (!ec) {
192✔
4550
            handle_pong_output_buffer();
192✔
4551
        }
192✔
4552
    };
192✔
4553

4554
    REALM_ASSERT(!m_is_sending);
192✔
4555
    REALM_ASSERT(!m_sending_pong);
192✔
4556
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
192✔
4557
                                   std::move(handler)); // Throws
192✔
4558

4559
    m_is_sending = true;
192✔
4560
    m_sending_pong = true;
192✔
4561
}
192✔
4562

4563

4564
// Builds a PONG message carrying `timestamp` in the output buffer and starts
// writing it. Clears the "PONG pending" flag; a PONG must be pending and none
// may currently be in flight (both asserted).
void SyncConnection::send_pong(milliseconds_type timestamp)
{
    REALM_ASSERT(m_send_pong);
    REALM_ASSERT(!m_sending_pong);
    m_send_pong = false;

    logger.debug("Sending: PONG(timestamp=%1)", timestamp); // Throws

    OutputBuffer& buffer = get_output_buffer();
    get_server_protocol().make_pong(buffer, timestamp); // Throws
    initiate_pong_output_buffer();                      // Throws
}
192✔
4576

4577
void SyncConnection::send_log_message(const LogMessage& log_msg)
4578
{
6,244✔
4579
    OutputBuffer& out = get_output_buffer();
6,244✔
4580
    get_server_protocol().make_log_message(out, log_msg.level, log_msg.message, log_msg.sess_ident,
6,244✔
4581
                                           log_msg.co_id); // Throws
6,244✔
4582

4583
    initiate_write_output_buffer(); // Throws
6,244✔
4584
}
6,244✔
4585

4586

4587
void SyncConnection::handle_write_output_buffer()
4588
{
64,034✔
4589
    release_output_buffer();
64,034✔
4590
    m_is_sending = false;
64,034✔
4591
    send_next_message(); // Throws
64,034✔
4592
}
64,034✔
4593

4594

4595
void SyncConnection::handle_pong_output_buffer()
4596
{
192✔
4597
    release_output_buffer();
192✔
4598
    REALM_ASSERT(m_is_sending);
192✔
4599
    REALM_ASSERT(m_sending_pong);
192✔
4600
    m_is_sending = false;
192✔
4601
    m_sending_pong = false;
192✔
4602
    send_next_message(); // Throws
192✔
4603
}
192✔
4604

4605

4606
// Builds an ERROR message for `error_code` / `session_ident` in the output
// buffer and starts writing it asynchronously. The completion handler always
// calls `handle_write_error()`, passing along the outcome of the write.
void SyncConnection::initiate_write_error(ProtocolError error_code, session_ident_type session_ident)
{
    const int error_value = int(error_code);
    const char* text = get_protocol_error_message(error_value);
    const std::size_t text_size = std::strlen(text);
    const bool try_again = determine_try_again(error_code);

    logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3, session_ident=%4)", error_value,
                  text_size, try_again, session_ident); // Throws

    OutputBuffer& buffer = get_output_buffer();
    const int client_protocol_version = get_client_protocol_version();
    get_server_protocol().make_error_message(client_protocol_version, buffer, error_code, text, text_size, try_again,
                                             session_ident); // Throws

    auto on_written = [this](std::error_code ec, size_t) {
        handle_write_error(ec); // Throws
    };
    m_websocket.async_write_binary(buffer.data(), buffer.size(), std::move(on_written));
    m_is_sending = true;
}
16✔
4626

4627

4628
void SyncConnection::handle_write_error(std::error_code ec)
4629
{
16✔
4630
    m_is_sending = false;
16✔
4631
    REALM_ASSERT(m_is_closing);
16✔
4632
    if (!m_ssl_stream) {
16✔
4633
        m_socket->shutdown(network::Socket::shutdown_send, ec);
16✔
4634
        if (ec && ec != make_basic_system_error_code(ENOTCONN))
16!
4635
            throw std::system_error(ec);
×
4636
    }
16✔
4637
}
16✔
4638

4639

4640
// For connection level errors, `sess` is ignored. For session level errors, a
4641
// session must be specified.
4642
//
4643
// If a session is specified, that session object will have been detached from
4644
// the ServerFile object (and possibly destroyed) upon return from
4645
// protocol_error().
4646
//
4647
// If a session is specified for a protocol level error, that session object
4648
// will have been destroyed upon return from protocol_error(). For session level
4649
// errors, the specified session will have been destroyed upon return from
4650
// protocol_error() if, and only if the negotiated protocol version is less than
4651
// 23.
4652
void SyncConnection::protocol_error(ProtocolError error_code, Session* sess)
{
    REALM_ASSERT(!m_is_closing);
    const bool session_level = is_session_level_error(error_code);
    // A session level error requires a session, and any session passed in must
    // be registered with this connection under its own identifier.
    REALM_ASSERT(!session_level || sess);
    REALM_ASSERT(!sess || m_sessions.count(sess->get_session_ident()) == 1);

    if (logger.would_log(util::Logger::Level::debug)) {
        const char* message = get_protocol_error_message(int(error_code));
        // Session level errors are reported through the session's own logger,
        // connection level errors through the connection's logger.
        Logger& logger_2 = (session_level ? sess->logger : logger);
        logger_2.debug("Protocol error: %1 (error_code=%2)", message, int(error_code)); // Throws
    }

    if (session_level) {
        // Only the affected session is deactivated; the connection itself is
        // not soft-closed on this path.
        sess->initiate_deactivation(error_code); // Throws
        return;
    }

    // Connection level error: `sess` is ignored and the connection as a whole
    // is soft-closed. The error is not tied to any session, so the session
    // identifier passed along is always zero.
    const session_ident_type session_ident = 0; // Not session specific
    do_initiate_soft_close(error_code, session_ident); // Throws
}
×
4670

4671

4672
// Moves the connection into the closing state for the given error: records the
// error code and session identifier, drops everything queued for sending,
// terminates all sessions, and finally fires the send trigger.
//
// Preconditions (all asserted): `error_code` has a protocol error message, it
// is not a session level error, `session_ident` is zero, a send trigger exists,
// and the connection is not already closing.
void SyncConnection::do_initiate_soft_close(ProtocolError error_code, session_ident_type session_ident)
{
    REALM_ASSERT(get_protocol_error_message(int(error_code)));

    // From protocol version 23 onwards this function is only ever called for
    // connection level errors. Earlier protocol versions could also route
    // session specific errors through here, which is why a session identifier
    // is still part of the signature.
    REALM_ASSERT(is_session_level_error(error_code) == (session_ident != 0));
    REALM_ASSERT(!is_session_level_error(error_code));

    REALM_ASSERT(m_send_trigger);
    REALM_ASSERT(!m_is_closing);

    // Remember what is being reported, and that closing has begun.
    m_error_session_ident = session_ident;
    m_error_code = error_code;
    m_is_closing = true;

    // No other message is worth sending once the connection is closing.
    m_sessions_enlisted_to_send.clear();
    m_send_pong = false;

    m_receiving_session = nullptr;

    terminate_sessions(); // Throws

    m_send_trigger->trigger();
}
16✔
4701

4702

4703
void SyncConnection::close_due_to_close_by_client(std::error_code ec)
4704
{
762✔
4705
    auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
762✔
4706
    // Suicide
4707
    terminate(log_level, "Sync connection closed by client: %1", ec.message()); // Throws
762✔
4708
}
762✔
4709

4710

4711
// Terminates this connection because of an error, logging the message of `ec`
// at `error` level.
void SyncConnection::close_due_to_error(std::error_code ec)
4712
{
470✔
4713
    // Suicide
    // NOTE(review): "suicide" presumably means terminate() destroys this
    // connection object -- confirm before touching members after this call.
4714
    terminate(Logger::Level::error, "Sync connection closed due to error: %1",
470✔
4715
              ec.message()); // Throws
470✔
4716
}
470✔
4717

4718

4719
void SyncConnection::terminate_sessions()
4720
{
1,252✔
4721
    for (auto& entry : m_sessions) {
1,954✔
4722
        Session& sess = *entry.second;
1,954✔
4723
        sess.terminate(); // Throws
1,954✔
4724
    }
1,954✔
4725
    m_sessions_enlisted_to_send.clear();
1,252✔
4726
    m_sessions.clear();
1,252✔
4727
}
1,252✔
4728

4729

4730
void SyncConnection::initiate_soft_close()
4731
{
16✔
4732
    if (!m_is_closing) {
16✔
4733
        session_ident_type session_ident = 0;                                    // Not session specific
16✔
4734
        do_initiate_soft_close(ProtocolError::connection_closed, session_ident); // Throws
16✔
4735
    }
16✔
4736
}
16✔
4737

4738

4739
// Removes the entry keyed by `session_ident` from `m_sessions`. Declared
// `noexcept`.
void SyncConnection::discard_session(session_ident_type session_ident) noexcept
4740
{
2,372✔
4741
    m_sessions.erase(session_ident);
2,372✔
4742
}
2,372✔
4743

4744
} // anonymous namespace
4745

4746

4747
// ============================ sync::Server implementation ============================
4748

4749
// Implementation type held by `sync::Server` through `m_impl`. It adds nothing
// to `ServerImpl` beyond a constructor that forwards its arguments to the base
// class and a virtual destructor.
class Server::Implementation : public ServerImpl {
public:
    // Passes `root_dir` through unchanged and moves `pkey` and `config` into
    // the `ServerImpl` base.
    Implementation(const std::string& root_dir, util::Optional<PKey> pkey, Server::Config config)
        : ServerImpl{root_dir, std::move(pkey), std::move(config)} // Throws
    {
    }

    virtual ~Implementation() = default;
};
4757

4758

4759
// Constructs the server by allocating its `Implementation` object. `root_dir`
// is passed through unchanged; `pkey` and `config` are moved into the
// implementation's constructor.
Server::Server(const std::string& root_dir, util::Optional<sync::PKey> pkey, Config config)
4760
    : m_impl{new Implementation{root_dir, std::move(pkey), std::move(config)}} // Throws
552✔
4761
{
1,208✔
4762
}
1,208✔
4763

4764

4765
// Move constructor: takes over the implementation object of `serv` by moving
// its `m_impl`.
// NOTE(review): `serv` presumably no longer has an implementation afterwards,
// so calling the forwarding members on it would be invalid -- confirm against
// the declared type of `m_impl`.
Server::Server(Server&& serv) noexcept
4766
    : m_impl{std::move(serv.m_impl)}
4767
{
×
4768
}
×
4769

4770

4771
// Empty destructor, defined in this file after the definition of
// `Implementation`.
// NOTE(review): presumably placed here because destroying `m_impl` needs the
// complete `Implementation` type -- confirm against the header.
Server::~Server() noexcept {}
1,208✔
4772

4773

4774
// Forwards to the implementation's argument-less `start()`. May throw.
void Server::start()
4775
{
524✔
4776
    m_impl->start(); // Throws
524✔
4777
}
524✔
4778

4779

4780
// Forwards to the implementation's `start()` overload taking an explicit
// listen address, listen port and address-reuse flag. May throw.
void Server::start(const std::string& listen_address, const std::string& listen_port, bool reuse_address)
4781
{
684✔
4782
    m_impl->start(listen_address, listen_port, reuse_address); // Throws
684✔
4783
}
684✔
4784

4785

4786
// Returns the endpoint reported by the implementation's `listen_endpoint()`.
// May throw.
network::Endpoint Server::listen_endpoint() const
4787
{
1,262✔
4788
    return m_impl->listen_endpoint(); // Throws
1,262✔
4789
}
1,262✔
4790

4791

4792
// Forwards to the implementation's `run()`. May throw.
void Server::run()
4793
{
1,152✔
4794
    m_impl->run(); // Throws
1,152✔
4795
}
1,152✔
4796

4797

4798
// Forwards to the implementation's `stop()`. Declared `noexcept`.
void Server::stop() noexcept
4799
{
2,062✔
4800
    m_impl->stop();
2,062✔
4801
}
2,062✔
4802

4803

4804
// Returns the current value of the implementation's `errors_seen` member.
uint_fast64_t Server::errors_seen() const noexcept
4805
{
684✔
4806
    return m_impl->errors_seen;
684✔
4807
}
684✔
4808

4809

4810
// Forwards to the implementation's function of the same name.
// `completion_handler` is moved into the call, so the caller's handler object
// is consumed; `timeout` is passed through unchanged. May throw.
void Server::stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
4811
                                                      milliseconds_type timeout)
4812
{
×
4813
    m_impl->stop_sync_and_wait_for_backup_completion(std::move(completion_handler), timeout); // Throws
×
4814
}
×
4815

4816

4817
// Forwards `timeout` to the implementation's
// `set_connection_reaper_timeout()`.
// NOTE(review): the parameter type suggests the value is in milliseconds --
// confirm against the declaration of `milliseconds_type`.
void Server::set_connection_reaper_timeout(milliseconds_type timeout)
4818
{
4✔
4819
    m_impl->set_connection_reaper_timeout(timeout);
4✔
4820
}
4✔
4821

4822

4823
// Forwards to the implementation's `close_connections()`.
void Server::close_connections()
4824
{
16✔
4825
    m_impl->close_connections();
16✔
4826
}
16✔
4827

4828

4829
// Forwards to the implementation's `map_virtual_to_real_path()` and returns
// its result. May throw.
// NOTE(review): `real_path` is taken by non-const reference, so it is
// presumably the output and the return value a success flag -- confirm against
// the implementation.
bool Server::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4830
{
72✔
4831
    return m_impl->map_virtual_to_real_path(virt_path, real_path); // Throws
72✔
4832
}
72✔
4833

4834

4835
// Forwards `virt_path` to the implementation's `recognize_external_change()`.
// May throw.
void Server::recognize_external_change(const std::string& virt_path)
4836
{
4,800✔
4837
    m_impl->recognize_external_change(virt_path); // Throws
4,800✔
4838
}
4,800✔
4839

4840

4841
// Forwards both arguments to the implementation's `get_workunit_timers()`.
// NOTE(review): both parameters are non-const references, so they are
// presumably outputs filled in by the implementation -- confirm.
void Server::get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
4842
{
×
4843
    m_impl->get_workunit_timers(parallel_section, sequential_section);
×
4844
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc