• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_484

05 Aug 2024 04:20PM UTC coverage: 91.097% (-0.01%) from 91.108%
thomas.goyne_484

Pull #7912

Evergreen

tgoyne
Extract some duplicated code for sync triggers and timers
Pull Request #7912: Extract some duplicated code for sync triggers and timers

102644 of 181486 branches covered (56.56%)

62 of 71 new or added lines in 6 files covered. (87.32%)

87 existing lines in 14 files now uncovered.

216695 of 237872 relevant lines covered (91.1%)

5798402.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.05
/src/realm/sync/noinst/server/server.cpp
1
#include <realm/sync/noinst/server/server.hpp>
2

3
#include <realm/binary_data.hpp>
4
#include <realm/impl/simulated_failure.hpp>
5
#include <realm/object_id.hpp>
6
#include <realm/string_data.hpp>
7
#include <realm/sync/changeset.hpp>
8
#include <realm/sync/trigger.hpp>
9
#include <realm/sync/impl/clamped_hex_dump.hpp>
10
#include <realm/sync/impl/clock.hpp>
11
#include <realm/sync/network/http.hpp>
12
#include <realm/sync/network/network_ssl.hpp>
13
#include <realm/sync/network/websocket.hpp>
14
#include <realm/sync/noinst/client_history_impl.hpp>
15
#include <realm/sync/noinst/protocol_codec.hpp>
16
#include <realm/sync/noinst/server/access_control.hpp>
17
#include <realm/sync/noinst/server/server_dir.hpp>
18
#include <realm/sync/noinst/server/server_file_access_cache.hpp>
19
#include <realm/sync/noinst/server/server_impl_base.hpp>
20
#include <realm/sync/transform.hpp>
21
#include <realm/util/base64.hpp>
22
#include <realm/util/bind_ptr.hpp>
23
#include <realm/util/buffer_stream.hpp>
24
#include <realm/util/circular_buffer.hpp>
25
#include <realm/util/compression.hpp>
26
#include <realm/util/file.hpp>
27
#include <realm/util/json_parser.hpp>
28
#include <realm/util/memory_stream.hpp>
29
#include <realm/util/optional.hpp>
30
#include <realm/util/platform_info.hpp>
31
#include <realm/util/random.hpp>
32
#include <realm/util/safe_int_ops.hpp>
33
#include <realm/util/scope_exit.hpp>
34
#include <realm/util/scratch_allocator.hpp>
35
#include <realm/util/thread.hpp>
36
#include <realm/util/thread_exec_guard.hpp>
37
#include <realm/util/value_reset_guard.hpp>
38
#include <realm/version.hpp>
39

40
#include <algorithm>
41
#include <atomic>
42
#include <cctype>
43
#include <chrono>
44
#include <cmath>
45
#include <condition_variable>
46
#include <cstdint>
47
#include <cstdio>
48
#include <cstring>
49
#include <functional>
50
#include <locale>
51
#include <map>
52
#include <memory>
53
#include <queue>
54
#include <sstream>
55
#include <stdexcept>
56
#include <thread>
57
#include <vector>
58

59
// NOTE: The protocol specification is in `/doc/protocol.md`
60

61

62
// FIXME: Verify that session identifier spoofing cannot be used to get access
63
// to sessions belonging to other network conections in any way.
64
// FIXME: Seems that server must close connection with zero sessions after a
65
// certain timeout.
66

67

68
using namespace realm;
69
using namespace realm::sync;
70
using namespace realm::util;
71

72
// clang-format off
73
using ServerHistory         = _impl::ServerHistory;
74
using ServerProtocol        = _impl::ServerProtocol;
75
using ServerFileAccessCache = _impl::ServerFileAccessCache;
76
using ServerImplBase = _impl::ServerImplBase;
77

78
using IntegratableChangeset = ServerHistory::IntegratableChangeset;
79
using IntegratableChangesetList = ServerHistory::IntegratableChangesetList;
80
using IntegratableChangesets = ServerHistory::IntegratableChangesets;
81
using IntegrationResult = ServerHistory::IntegrationResult;
82
using BootstrapError = ServerHistory::BootstrapError;
83
using ExtendedIntegrationError = ServerHistory::ExtendedIntegrationError;
84
using ClientType = ServerHistory::ClientType;
85
using FileIdentAllocSlot = ServerHistory::FileIdentAllocSlot;
86
using FileIdentAllocSlots = ServerHistory::FileIdentAllocSlots;
87

88
using UploadChangeset = ServerProtocol::UploadChangeset;
89
// clang-format on
90

91

92
using UploadChangesets = std::vector<UploadChangeset>;
93

94
using EventLoopMetricsHandler = network::Service::EventLoopMetricsHandler;
95

96

97
static_assert(std::numeric_limits<session_ident_type>::digits >= 63, "Bad session identifier type");
98
static_assert(std::numeric_limits<file_ident_type>::digits >= 63, "Bad file identifier type");
99
static_assert(std::numeric_limits<version_type>::digits >= 63, "Bad version type");
100
static_assert(std::numeric_limits<timestamp_type>::digits >= 63, "Bad timestamp type");
101

102

103
namespace {
104

105
enum class SchedStatus { done = 0, pending, in_progress };
106

107
// Only used by the Sync Server to support older pbs sync clients (prior to protocol v8)
108
constexpr std::string_view get_old_pbs_websocket_protocol_prefix() noexcept
109
{
×
110
    return "com.mongodb.realm-sync/";
×
111
}
×
112

113
std::string short_token_fmt(const std::string& str, size_t cutoff = 30)
114
{
392✔
115
    if (str.size() > cutoff) {
392✔
116
        return "..." + str.substr(str.size() - cutoff);
×
117
    }
×
118
    else {
392✔
119
        return str;
392✔
120
    }
392✔
121
}
392✔
122

123

124
class HttpListHeaderValueParser {
125
public:
126
    HttpListHeaderValueParser(std::string_view string) noexcept
127
        : m_string{string}
874✔
128
    {
2,060✔
129
    }
2,060✔
130
    bool next(std::string_view& elem) noexcept
131
    {
28,840✔
132
        while (m_pos < m_string.size()) {
28,840✔
133
            size_type i = m_pos;
26,780✔
134
            size_type j = m_string.find(',', i);
26,780✔
135
            if (j != std::string_view::npos) {
26,780✔
136
                m_pos = j + 1;
24,720✔
137
            }
24,720✔
138
            else {
2,060✔
139
                j = m_string.size();
2,060✔
140
                m_pos = j;
2,060✔
141
            }
2,060✔
142

143
            // Exclude leading and trailing white space
144
            while (i < j && is_http_lws(m_string[i]))
51,500✔
145
                ++i;
24,720✔
146
            while (j > i && is_http_lws(m_string[j - 1]))
26,780✔
147
                --j;
×
148

149
            if (i != j) {
26,780✔
150
                elem = m_string.substr(i, j - i);
26,780✔
151
                return true;
26,780✔
152
            }
26,780✔
153
        }
26,780✔
154
        return false;
2,060✔
155
    }
28,840✔
156

157
private:
158
    using size_type = std::string_view::size_type;
159
    const std::string_view m_string;
160
    size_type m_pos = 0;
161
    static bool is_http_lws(char ch) noexcept
162
    {
78,280✔
163
        return (ch == '\t' || ch == '\n' || ch == '\r' || ch == ' ');
78,280✔
164
    }
78,280✔
165
};
166

167

168
using SteadyClock = std::conditional<std::chrono::high_resolution_clock::is_steady,
169
                                     std::chrono::high_resolution_clock, std::chrono::steady_clock>::type;
170
using SteadyTimePoint = SteadyClock::time_point;
171

172
SteadyTimePoint steady_clock_now() noexcept
173
{
149,010✔
174
    return SteadyClock::now();
149,010✔
175
}
149,010✔
176

177
milliseconds_type steady_duration(SteadyTimePoint start_time, SteadyTimePoint end_time = steady_clock_now()) noexcept
178
{
37,284✔
179
    auto duration = end_time - start_time;
37,284✔
180
    auto millis_duration = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
37,284✔
181
    return milliseconds_type(millis_duration);
37,284✔
182
}
37,284✔
183

184

185
bool determine_try_again(ProtocolError error_code) noexcept
186
{
92✔
187
    return (error_code == ProtocolError::connection_closed);
92✔
188
}
92✔
189

190

191
class ServerFile;
192
class ServerImpl;
193
class HTTPConnection;
194
class SyncConnection;
195
class Session;
196

197

198
using Formatter = util::ResettableExpandableBufferOutputStream;
199
using OutputBuffer = util::ResettableExpandableBufferOutputStream;
200

201
using ProtocolVersionRange = std::pair<int, int>;
202

203
class MiscBuffers {
204
public:
205
    Formatter formatter;
206
    OutputBuffer download_message;
207

208
    using ProtocolVersionRanges = std::vector<ProtocolVersionRange>;
209
    ProtocolVersionRanges protocol_version_ranges;
210

211
    std::vector<char> compress;
212

213
    MiscBuffers()
214
    {
1,200✔
215
        formatter.imbue(std::locale::classic());
1,200✔
216
        download_message.imbue(std::locale::classic());
1,200✔
217
    }
1,200✔
218
};
219

220

221
struct DownloadCache {
222
    std::unique_ptr<char[]> body;
223
    std::size_t uncompressed_body_size;
224
    std::size_t compressed_body_size;
225
    bool body_is_compressed;
226
    version_type end_version;
227
    DownloadCursor download_progress;
228
    std::uint_fast64_t downloadable_bytes;
229
    std::size_t num_changesets;
230
    std::size_t accum_original_size;
231
    std::size_t accum_compacted_size;
232
};
233

234

235
// An unblocked work unit is comprised of one Work object for each of the files
236
// that contribute work to the work unit, generally one reference file and a
237
// number of partial files.
238
class Work {
239
public:
240
    // In general, primary work is all forms of modifying work, including file
241
    // deletion.
242
    bool has_primary_work = false;
243

244
    // Only for reference files
245
    bool might_produce_new_sync_version = false;
246

247
    bool produced_new_realm_version = false;
248
    bool produced_new_sync_version = false;
249
    bool expired_reference_version = false;
250

251
    // True if, and only if changesets_from_downstream contains at least one
252
    // changeset.
253
    bool have_changesets_from_downstream = false;
254

255
    FileIdentAllocSlots file_ident_alloc_slots;
256
    std::vector<std::unique_ptr<char[]>> changeset_buffers;
257
    IntegratableChangesets changesets_from_downstream;
258

259
    VersionInfo version_info;
260

261
    // Result of integration of changesets from downstream clients
262
    IntegrationResult integration_result;
263

264
    void reset() noexcept
265
    {
37,180✔
266
        has_primary_work = false;
37,180✔
267

268
        might_produce_new_sync_version = false;
37,180✔
269

270
        produced_new_realm_version = false;
37,180✔
271
        produced_new_sync_version = false;
37,180✔
272
        expired_reference_version = false;
37,180✔
273
        have_changesets_from_downstream = false;
37,180✔
274

275
        file_ident_alloc_slots.clear();
37,180✔
276
        changeset_buffers.clear();
37,180✔
277
        changesets_from_downstream.clear();
37,180✔
278

279
        version_info = {};
37,180✔
280
        integration_result = {};
37,180✔
281
    }
37,180✔
282
};
283

284

285
class WorkerState {
286
public:
287
    FileIdentAllocSlots file_ident_alloc_slots;
288
    util::ScratchMemory scratch_memory;
289
    bool use_file_cache = true;
290
    std::unique_ptr<ServerHistory> reference_hist;
291
    DBRef reference_sg;
292
};
293

294

295
// ============================ SessionQueue ============================
296

297
class SessionQueue {
298
public:
299
    void push_back(Session*) noexcept;
300
    Session* pop_front() noexcept;
301
    void clear() noexcept;
302

303
private:
304
    Session* m_back = nullptr;
305
};
306

307

308
// ============================ FileIdentReceiver ============================
309

310
class FileIdentReceiver {
311
public:
312
    virtual void receive_file_ident(SaltedFileIdent) = 0;
313

314
protected:
315
    ~FileIdentReceiver() {}
5,126✔
316
};
317

318

319
// ============================ WorkerBox =============================
320

321
class WorkerBox {
322
public:
323
    using JobType = util::UniqueFunction<void(WorkerState&)>;
324
    void add_work(WorkerState& state, JobType job)
325
    {
×
326
        std::unique_lock<std::mutex> lock(m_mutex);
×
327
        if (m_jobs.size() >= m_queue_limit) {
×
328
            // Once we have many queued jobs, it is better to use this thread to run a new job
×
329
            // than to queue it.
×
330
            run_a_job(lock, state, job);
×
331
        }
×
332
        else {
×
333
            // Create worker threads on demand (if all existing threads are active):
×
334
            if (m_threads.size() < m_max_num_threads && m_active >= m_threads.size()) {
×
335
                m_threads.emplace_back([this]() {
×
336
                    WorkerState state;
×
337
                    state.use_file_cache = false;
×
338
                    JobType the_job;
×
339
                    std::unique_lock<std::mutex> lock(m_mutex);
×
340
                    for (;;) {
×
341
                        while (m_jobs.empty() && !m_finish_up)
×
342
                            m_changes.wait(lock);
×
343
                        if (m_finish_up)
×
344
                            break; // terminate thread
×
345
                        the_job = std::move(m_jobs.back());
×
346
                        m_jobs.pop_back();
×
347
                        run_a_job(lock, state, the_job);
×
348
                        m_changes.notify_all();
×
349
                    }
×
350
                });
×
351
            }
×
352

×
353
            // Submit the job for execution:
×
354
            m_jobs.emplace_back(std::move(job));
×
355
            m_changes.notify_all();
×
356
        }
×
357
    }
×
358

359
    // You should call wait_completion() before trying to destroy a WorkerBox to get proper
360
    // propagation of exceptions.
361
    void wait_completion(WorkerState& state)
362
    {
×
363
        std::unique_lock<std::mutex> lock(m_mutex);
×
364
        while (!m_jobs.empty() || m_active > 0) {
×
365
            if (!m_jobs.empty()) { // if possible, make this thread participate in running m_jobs
×
366
                JobType the_job = std::move(m_jobs.back());
×
367
                m_jobs.pop_back();
×
368
                run_a_job(lock, state, the_job);
×
369
            }
×
370
            else {
×
371
                m_changes.wait(lock);
×
372
            }
×
373
        }
×
374
        if (m_epr) {
×
375
            std::rethrow_exception(m_epr);
×
376
        }
×
377
    }
×
378

379
    WorkerBox(unsigned int num_threads)
380
    {
×
381
        m_queue_limit = num_threads * 10; // fudge factor for job size variation
×
382
        m_max_num_threads = num_threads;
×
383
    }
×
384

385
    ~WorkerBox()
386
    {
×
387
        {
×
388
            std::unique_lock<std::mutex> lock(m_mutex);
×
389
            m_finish_up = true;
×
390
            m_changes.notify_all();
×
391
        }
×
392
        for (auto& e : m_threads)
×
393
            e.join();
×
394
    }
×
395

396
private:
397
    std::mutex m_mutex;
398
    std::condition_variable m_changes;
399
    std::vector<std::thread> m_threads;
400
    std::vector<JobType> m_jobs;
401
    unsigned int m_active = 0;
402
    bool m_finish_up = false;
403
    unsigned int m_queue_limit = 0;
404
    unsigned int m_max_num_threads = 0;
405
    std::exception_ptr m_epr;
406

407
    void run_a_job(std::unique_lock<std::mutex>& lock, WorkerState& state, JobType& job)
408
    {
×
409
        ++m_active;
×
410
        lock.unlock();
×
411
        try {
×
412
            job(state);
×
413
            lock.lock();
×
414
        }
×
415
        catch (...) {
×
416
            lock.lock();
×
417
            if (!m_epr)
×
418
                m_epr = std::current_exception();
×
419
        }
×
420
        --m_active;
×
421
    }
×
422
};
423

424

425
// ============================ ServerFile ============================
426

427
class ServerFile : public util::RefCountBase {
428
public:
429
    util::PrefixLogger logger;
430

431
    // Logger to be used by the worker thread
432
    util::PrefixLogger wlogger;
433

434
    ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path, std::string real_path,
435
               bool disable_sync_to_disk);
436
    ~ServerFile() noexcept;
437

438
    void initialize();
439
    void activate();
440

441
    ServerImpl& get_server() noexcept
442
    {
42,418✔
443
        return m_server;
42,418✔
444
    }
42,418✔
445

446
    const std::string& get_real_path() const noexcept
447
    {
×
448
        return m_file.realm_path;
×
449
    }
×
450

451
    const std::string& get_virt_path() const noexcept
452
    {
×
453
        return m_file.virt_path;
×
454
    }
×
455

456
    ServerFileAccessCache::File& access()
457
    {
52,204✔
458
        return m_file.access(); // Throws
52,204✔
459
    }
52,204✔
460

461
    ServerFileAccessCache::File& worker_access()
462
    {
37,158✔
463
        return m_worker_file.access(); // Throws
37,158✔
464
    }
37,158✔
465

466
    version_type get_realm_version() const noexcept
467
    {
×
468
        return m_version_info.realm_version;
×
469
    }
×
470

471
    version_type get_sync_version() const noexcept
472
    {
×
473
        return m_version_info.sync_version.version;
×
474
    }
×
475

476
    SaltedVersion get_salted_sync_version() const noexcept
477
    {
106,950✔
478
        return m_version_info.sync_version;
106,950✔
479
    }
106,950✔
480

481
    DownloadCache& get_download_cache() noexcept;
482

483
    void register_client_access(file_ident_type client_file_ident);
484

485
    using file_ident_request_type = std::int_fast64_t;
486

487
    // Initiate a request for a new client file identifier.
488
    //
489
    // Unless the request is cancelled, the identifier will be delivered to the
490
    // receiver by way of an invocation of
491
    // FileIdentReceiver::receive_file_ident().
492
    //
493
    // FileIdentReceiver::receive_file_ident() is guaranteed to not be called
494
    // until after request_file_ident() has returned (no callback reentrance).
495
    //
496
    // New client file identifiers will be delivered to receivers in the order
497
    // that they were requested.
498
    //
499
    // The returned value is a nonzero integer that can be used to cancel the
500
    // request before the file identifier is delivered using
501
    // cancel_file_ident_request().
502
    auto request_file_ident(FileIdentReceiver&, file_ident_type proxy_file, ClientType) -> file_ident_request_type;
503

504
    // Cancel the specified file identifier request.
505
    //
506
    // It is an error to call this function after the identifier has been
507
    // delivered.
508
    void cancel_file_ident_request(file_ident_request_type) noexcept;
509

510
    void add_unidentified_session(Session*);
511
    void identify_session(Session*, file_ident_type client_file_ident);
512

513
    void remove_unidentified_session(Session*) noexcept;
514
    void remove_identified_session(file_ident_type client_file_ident) noexcept;
515

516
    Session* get_identified_session(file_ident_type client_file_ident) noexcept;
517

518
    bool can_add_changesets_from_downstream() const noexcept;
519
    void add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
520
                                        version_type locked_server_version, const UploadChangeset*,
521
                                        std::size_t num_changesets);
522

523
    // bootstrap_client_session calls the function of same name in server_history
524
    // but corrects the upload_progress with information from pending
525
    // integratable changesets. A situation can occur where a client terminates
526
    // a session and starts a new session and re-uploads changesets that are known
527
    // by the ServerFile object but not by the ServerHistory.
528
    BootstrapError bootstrap_client_session(SaltedFileIdent client_file_ident, DownloadCursor download_progress,
529
                                            SaltedVersion server_version, ClientType client_type,
530
                                            UploadCursor& upload_progress, version_type& locked_server_version,
531
                                            Logger&);
532

533
    // NOTE: This function is executed by the worker thread
534
    void worker_process_work_unit(WorkerState&);
535

536
    void recognize_external_change();
537

538
private:
539
    ServerImpl& m_server;
540
    ServerFileAccessCache::Slot m_file;
541

542
    // In general, `m_version_info` refers to the last snapshot of the Realm
543
    // file that is supposed to be visible to remote peers engaging in regular
544
    // Realm file synchronization.
545
    VersionInfo m_version_info;
546

547
    file_ident_request_type m_last_file_ident_request = 0;
548

549
    // The set of sessions whose client file identifier is not yet known, i.e.,
550
    // those for which an IDENT message has not yet been received,
551
    std::set<Session*> m_unidentified_sessions;
552

553
    // A map of the sessions whose client file identifier is known, i.e, those
554
    // for which an IDENT message has been received.
555
    std::map<file_ident_type, Session*> m_identified_sessions;
556

557
    // Used when a file used as partial view wants to allocate a client file
558
    // identifier from the reference Realm.
559
    file_ident_request_type m_file_ident_request = 0;
560

561
    struct FileIdentRequestInfo {
562
        FileIdentReceiver* receiver;
563
        file_ident_type proxy_file;
564
        ClientType client_type;
565
    };
566

567
    // When nonempty, it counts towards outstanding blocked work (see
568
    // `m_has_blocked_work`).
569
    std::map<file_ident_request_type, FileIdentRequestInfo> m_file_ident_requests;
570

571
    // Changesets received from the downstream clients, and waiting to be
572
    // integrated, as well as information about the clients progress in terms of
573
    // integrating changesets received from the server. When nonempty, it counts
574
    // towards outstanding blocked work (see `m_has_blocked_work`).
575
    //
576
    // At any given time, the set of changesets from a particular client-side
577
    // file may be comprised of changesets received via distinct sessions.
578
    //
579
    // See also `m_num_changesets_from_downstream`.
580
    IntegratableChangesets m_changesets_from_downstream;
581

582
    // Keeps track of the number of changesets in `m_changesets_from_downstream`.
583
    //
584
    // Its purpose is also to initialize
585
    // `Work::have_changesets_from_downstream`.
586
    std::size_t m_num_changesets_from_downstream = 0;
587

588
    // The total size, in bytes, of the changesets that were received from
589
    // clients, are targeting this file, and are currently part of the blocked
590
    // work unit.
591
    //
592
    // Together with `m_unblocked_changesets_from_downstream_byte_size`, its
593
    // purpose is to allow the server to keep track of the accumulated size of
594
    // changesets being processed, or waiting to be processed (metric
595
    // `upload.pending.bytes`) (see
596
    // ServerImpl::inc_byte_size_for_pending_downstream_changesets()).
597
    //
598
    // Its purpose is also to enable the "very poor man's" backpressure solution
599
    // (see can_add_changesets_from_downstream()).
600
    std::size_t m_blocked_changesets_from_downstream_byte_size = 0;
601

602
    // Same as `m_blocked_changesets_from_downstream_byte_size` but for the
603
    // currently unblocked work unit.
604
    std::size_t m_unblocked_changesets_from_downstream_byte_size = 0;
605

606
    // When nonempty, it counts towards outstanding blocked work (see
607
    // `m_has_blocked_work`).
608
    std::vector<std::string> m_permission_changes;
609

610
    // True iff this file, or any of its associated partial files (when
611
    // applicable), has a nonzero amount of outstanding work that is currently
612
    // held back from being passed to the worker thread because a previously
613
    // accumulated chunk of work related to this file is currently in progress.
614
    bool m_has_blocked_work = false;
615

616
    // A file, that is not a partial file, is considered *exposed to the worker
617
    // thread* from the point in time where it is submitted to the worker
618
    // (Worker::enqueue()) and up until the point in time where
619
    // group_postprocess_stage_1() starts to execute. A partial file is
620
    // considered *exposed to the worker thread* precisely when the associated
621
    // reference file is exposed to the worker thread, but only if it was in
622
    // `m_reference_file->m_work.partial_files` at the point in time where the
623
    // reference file was passed to the worker.
624
    //
625
    // While this file is exposed to the worker thread, all members of `m_work`
626
    // other than `changesets_from_downstream` may be accessed and modified by
627
    // the worker thread only.
628
    //
629
    // While this file is exposed to the worker thread,
630
    // `m_work.changesets_from_downstream` may be accessed by all threads, but
631
    // must not be modified by any thread. This special status of
632
    // `m_work.changesets_from_downstream` is required to allow
633
    // ServerFile::bootstrap_client_session() to read from it at any time.
634
    Work m_work;
635

636
    // For reference files, set to true when work is unblocked, and reset back
637
    // to false when the work finalization process completes
638
    // (group_postprocess_stage_3()). Always zero for partial files.
639
    bool m_has_work_in_progress = 0;
640

641
    // This one must only be accessed by the worker thread.
642
    //
643
    // More specifically, `m_worker_file.access()` must only be called by the
644
    // worker thread, and if it was ever called, it must be closed by the worker
645
    // thread before the ServerFile object is destroyed, if destruction happens
646
    // before the destruction of the server object itself.
647
    ServerFileAccessCache::Slot m_worker_file;
648

649
    std::vector<std::int_fast64_t> m_deleting_connections;
650

651
    DownloadCache m_download_cache;
652

653
    void on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes);
654
    void on_work_added();
655
    void group_unblock_work();
656
    void unblock_work();
657

658
    /// Resume history scanning in all sessions bound to this file. To be called
659
    /// after a successfull integration of a changeset.
660
    void resume_download() noexcept;
661

662
    // NOTE: These functions are executed by the worker thread
663
    void worker_allocate_file_identifiers();
664
    bool worker_integrate_changes_from_downstream(WorkerState&);
665
    ServerHistory& get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
666
                                           DBRef& sg_ptr);
667
    ServerHistory& get_reference_file_history(WorkerState& state);
668
    void group_postprocess_stage_1();
669
    void group_postprocess_stage_2();
670
    void group_postprocess_stage_3();
671
    void group_finalize_work_stage_1();
672
    void group_finalize_work_stage_2();
673
    void finalize_work_stage_1();
674
    void finalize_work_stage_2();
675
};
676

677

678
inline DownloadCache& ServerFile::get_download_cache() noexcept
679
{
42,010✔
680
    return m_download_cache;
42,010✔
681
}
42,010✔
682

683
inline void ServerFile::group_finalize_work_stage_1()
684
{
36,860✔
685
    finalize_work_stage_1(); // Throws
36,860✔
686
}
36,860✔
687

688
inline void ServerFile::group_finalize_work_stage_2()
689
{
36,860✔
690
    finalize_work_stage_2(); // Throws
36,860✔
691
}
36,860✔
692

693

694
// ============================ Worker ============================
695

696
// All write transaction on server-side Realm files performed on behalf of the
697
// server, must be performed by the worker thread, not the network event loop
698
// thread. This is to ensure that the network event loop thread never gets
699
// blocked waiting for the worker thread to end a long running write
700
// transaction.
701
//
702
// FIXME: Currently, the event loop thread does perform a number of write
703
// transactions, but only on subtier nodes of a star topology server cluster.
704
class Worker : public ServerHistory::Context {
705
public:
706
    std::shared_ptr<util::Logger> logger_ptr;
707
    util::Logger& logger;
708

709
    explicit Worker(ServerImpl&);
710

711
    ServerFileAccessCache& get_file_access_cache() noexcept;
712

713
    void enqueue(ServerFile*);
714

715
    // Overriding members of ServerHistory::Context
716
    std::mt19937_64& server_history_get_random() noexcept override final;
717

718
private:
719
    ServerImpl& m_server;
720
    std::mt19937_64 m_random;
721
    ServerFileAccessCache m_file_access_cache;
722

723
    util::Mutex m_mutex;
724
    util::CondVar m_cond; // Protected by `m_mutex`
725

726
    bool m_stop = false; // Protected by `m_mutex`
727

728
    util::CircularBuffer<ServerFile*> m_queue; // Protected by `m_mutex`
729

730
    WorkerState m_state;
731

732
    void run();
733
    void stop() noexcept;
734

735
    friend class util::ThreadExecGuardWithParent<Worker, ServerImpl>;
736
};
737

738

739
inline ServerFileAccessCache& Worker::get_file_access_cache() noexcept
740
{
1,138✔
741
    return m_file_access_cache;
1,138✔
742
}
1,138✔
743

744

745
// ============================ ServerImpl ============================
746

747
class ServerImpl : public ServerImplBase, public ServerHistory::Context {
748
public:
749
    std::uint_fast64_t errors_seen = 0;
750

751
    std::atomic<milliseconds_type> m_par_time;
752
    std::atomic<milliseconds_type> m_seq_time;
753

754
    util::Mutex last_client_accesses_mutex;
755

756
    const std::shared_ptr<util::Logger> logger_ptr;
757
    util::Logger& logger;
758

759
    network::Service& get_service() noexcept
760
    {
51,070✔
761
        return m_service;
51,070✔
762
    }
51,070✔
763

764
    const network::Service& get_service() const noexcept
765
    {
×
766
        return m_service;
×
767
    }
×
768

769
    std::mt19937_64& get_random() noexcept
770
    {
65,082✔
771
        return m_random;
65,082✔
772
    }
65,082✔
773

774
    const Server::Config& get_config() const noexcept
775
    {
113,728✔
776
        return m_config;
113,728✔
777
    }
113,728✔
778

779
    std::size_t get_max_upload_backlog() const noexcept
780
    {
44,464✔
781
        return m_max_upload_backlog;
44,464✔
782
    }
44,464✔
783

784
    const std::string& get_root_dir() const noexcept
785
    {
5,126✔
786
        return m_root_dir;
5,126✔
787
    }
5,126✔
788

789
    network::ssl::Context& get_ssl_context() noexcept
790
    {
48✔
791
        return *m_ssl_context;
48✔
792
    }
48✔
793

794
    const AccessControl& get_access_control() const noexcept
795
    {
×
796
        return m_access_control;
×
797
    }
×
798

799
    ProtocolVersionRange get_protocol_version_range() const noexcept
800
    {
2,060✔
801
        return m_protocol_version_range;
2,060✔
802
    }
2,060✔
803

804
    ServerProtocol& get_server_protocol() noexcept
805
    {
132,278✔
806
        return m_server_protocol;
132,278✔
807
    }
132,278✔
808

809
    compression::CompressMemoryArena& get_compress_memory_arena() noexcept
810
    {
4,182✔
811
        return m_compress_memory_arena;
4,182✔
812
    }
4,182✔
813

814
    MiscBuffers& get_misc_buffers() noexcept
815
    {
48,252✔
816
        return m_misc_buffers;
48,252✔
817
    }
48,252✔
818

819
    int_fast64_t get_current_server_session_ident() const noexcept
820
    {
×
821
        return m_current_server_session_ident;
×
822
    }
×
823

824
    util::ScratchMemory& get_scratch_memory() noexcept
825
    {
×
826
        return m_scratch_memory;
×
827
    }
×
828

829
    Worker& get_worker() noexcept
830
    {
39,456✔
831
        return m_worker;
39,456✔
832
    }
39,456✔
833

834
    void get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
835
    {
×
836
        parallel_section = m_par_time;
×
837
        sequential_section = m_seq_time;
×
838
    }
×
839

840
    ServerImpl(const std::string& root_dir, util::Optional<sync::PKey>, Server::Config);
841
    ~ServerImpl() noexcept;
842

843
    void start();
844

845
    void start(std::string listen_address, std::string listen_port, bool reuse_address)
846
    {
684✔
847
        m_config.listen_address = listen_address;
684✔
848
        m_config.listen_port = listen_port;
684✔
849
        m_config.reuse_address = reuse_address;
684✔
850

851
        start(); // Throws
684✔
852
    }
684✔
853

854
    network::Endpoint listen_endpoint() const
855
    {
1,254✔
856
        return m_acceptor.local_endpoint();
1,254✔
857
    }
1,254✔
858

859
    void run();
860
    void stop() noexcept;
861

862
    void remove_http_connection(std::int_fast64_t conn_id) noexcept;
863

864
    void add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn);
865
    void remove_sync_connection(int_fast64_t connection_id);
866

867
    size_t get_number_of_http_connections()
868
    {
×
869
        return m_http_connections.size();
×
870
    }
×
871

872
    size_t get_number_of_sync_connections()
873
    {
×
874
        return m_sync_connections.size();
×
875
    }
×
876

877
    bool is_sync_stopped()
878
    {
39,240✔
879
        return m_sync_stopped;
39,240✔
880
    }
39,240✔
881

882
    const std::set<std::string>& get_realm_names() const noexcept
883
    {
×
884
        return m_realm_names;
×
885
    }
×
886

887
    // virt_path must be valid when get_or_create_file() is called.
888
    util::bind_ptr<ServerFile> get_or_create_file(const std::string& virt_path)
889
    {
5,098✔
890
        util::bind_ptr<ServerFile> file = get_file(virt_path);
5,098✔
891
        if (REALM_LIKELY(file))
5,098✔
892
            return file;
3,960✔
893

894
        _impl::VirtualPathComponents virt_path_components =
1,138✔
895
            _impl::parse_virtual_path(m_root_dir, virt_path); // Throws
1,138✔
896
        REALM_ASSERT(virt_path_components.is_valid);
1,138✔
897

898
        _impl::make_dirs(m_root_dir, virt_path); // Throws
1,138✔
899
        m_realm_names.insert(virt_path);         // Throws
1,138✔
900
        {
1,138✔
901
            bool disable_sync_to_disk = m_config.disable_sync_to_disk;
1,138✔
902
            file.reset(new ServerFile(*this, m_file_access_cache, virt_path, virt_path_components.real_realm_path,
1,138✔
903
                                      disable_sync_to_disk)); // Throws
1,138✔
904
        }
1,138✔
905

906
        file->initialize();
1,138✔
907
        m_files[virt_path] = file; // Throws
1,138✔
908
        file->activate();          // Throws
1,138✔
909
        return file;
1,138✔
910
    }
5,098✔
911

912
    std::unique_ptr<ServerHistory> make_history_for_path()
913
    {
×
914
        return std::make_unique<ServerHistory>(*this);
×
915
    }
×
916

917
    util::bind_ptr<ServerFile> get_file(const std::string& virt_path) noexcept
918
    {
5,098✔
919
        auto i = m_files.find(virt_path);
5,098✔
920
        if (REALM_LIKELY(i != m_files.end()))
5,098✔
921
            return i->second;
3,960✔
922
        return {};
1,138✔
923
    }
5,098✔
924

925
    // Returns the number of seconds since the Epoch of
926
    // std::chrono::system_clock.
927
    std::chrono::system_clock::time_point token_expiration_clock_now() const noexcept
928
    {
×
929
        if (REALM_UNLIKELY(m_config.token_expiration_clock))
×
930
            return m_config.token_expiration_clock->now();
×
931
        return std::chrono::system_clock::now();
×
932
    }
×
933

934
    void set_connection_reaper_timeout(milliseconds_type);
935

936
    void close_connections();
937
    bool map_virtual_to_real_path(const std::string& virt_path, std::string& real_path);
938

939
    void recognize_external_change(const std::string& virt_path);
940

941
    void stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
942
                                                  milliseconds_type timeout);
943

944
    // Server global outputbuffers that can be reused.
945
    // The server is single threaded, so there are no
946
    // synchronization issues.
947
    // output_buffers_count is equal to the
948
    // maximum number of buffers needed at any point.
949
    static constexpr int output_buffers_count = 1;
950
    OutputBuffer output_buffers[output_buffers_count];
951

952
    bool is_load_balancing_allowed() const
953
    {
×
954
        return m_allow_load_balancing;
×
955
    }
×
956

957
    // inc_byte_size_for_pending_downstream_changesets() must be called by
958
    // ServerFile objects when changesets from downstream clients have been
959
    // received.
960
    //
961
    // dec_byte_size_for_pending_downstream_changesets() must be called by
962
    // ServerFile objects when changesets from downstream clients have been
963
    // processed or discarded.
964
    //
965
    // ServerImpl uses this information to keep a running tally (metric
966
    // `upload.pending.bytes`) of the total byte size of pending changesets from
967
    // downstream clients.
968
    //
969
    // These functions must be called on the network thread.
970
    void inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
971
    void dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
972

973
    // Overriding member functions in _impl::ServerHistory::Context
974
    std::mt19937_64& server_history_get_random() noexcept override final;
975

976
private:
977
    Server::Config m_config;
978
    network::Service m_service;
979
    std::mt19937_64 m_random;
980
    const std::size_t m_max_upload_backlog;
981
    const std::string m_root_dir;
982
    const AccessControl m_access_control;
983
    const ProtocolVersionRange m_protocol_version_range;
984

985
    // The reserved files will be closed in situations where the server
986
    // runs out of file descriptors.
987
    std::unique_ptr<File> m_reserved_files[5];
988

989
    // The set of all Realm files known to this server, represented by their
990
    // virtual path.
991
    //
992
    // INVARIANT: If a Realm file is in the servers directory (i.e., it would be
993
    // reported by an invocation of _impl::get_realm_names()), then the
994
    // corresponding virtual path is in `m_realm_names`, assuming no external
995
    // file-system level intervention.
996
    std::set<std::string> m_realm_names;
997

998
    std::unique_ptr<network::ssl::Context> m_ssl_context;
999
    ServerFileAccessCache m_file_access_cache;
1000
    Worker m_worker;
1001
    std::map<std::string, util::bind_ptr<ServerFile>> m_files; // Key is virtual path
1002
    network::Acceptor m_acceptor;
1003
    std::int_fast64_t m_next_conn_id = 0;
1004
    std::unique_ptr<HTTPConnection> m_next_http_conn;
1005
    network::Endpoint m_next_http_conn_endpoint;
1006
    std::map<std::int_fast64_t, std::unique_ptr<HTTPConnection>> m_http_connections;
1007
    std::map<std::int_fast64_t, std::unique_ptr<SyncConnection>> m_sync_connections;
1008
    ServerProtocol m_server_protocol;
1009
    compression::CompressMemoryArena m_compress_memory_arena;
1010
    MiscBuffers m_misc_buffers;
1011
    int_fast64_t m_current_server_session_ident;
1012
    Optional<network::DeadlineTimer> m_connection_reaper_timer;
1013
    bool m_allow_load_balancing = false;
1014

1015
    util::Mutex m_mutex;
1016

1017
    bool m_stopped = false; // Protected by `m_mutex`
1018

1019
    // m_sync_stopped is used by stop_sync_and_wait_for_backup_completion().
1020
    // When m_sync_stopped is true, the server does not perform any sync.
1021
    bool m_sync_stopped = false;
1022

1023
    std::atomic<bool> m_running{false}; // Debugging facility
1024

1025
    std::size_t m_pending_changesets_from_downstream_byte_size = 0;
1026

1027
    util::CondVar m_wait_or_service_stopped_cond; // Protected by `m_mutex`
1028

1029
    util::ScratchMemory m_scratch_memory;
1030

1031
    void listen();
1032
    void initiate_accept();
1033
    void handle_accept(std::error_code);
1034

1035
    void reap_connections();
1036
    void initiate_connection_reaper_timer(milliseconds_type timeout);
1037
    void do_close_connections();
1038

1039
    static std::size_t determine_max_upload_backlog(Server::Config& config) noexcept
1040
    {
1,200✔
1041
        if (config.max_upload_backlog == 0)
1,200✔
1042
            return 4294967295; // 4GiB - 1 (largest allowable number on a 32-bit platform)
1,198✔
1043
        return config.max_upload_backlog;
2✔
1044
    }
1,200✔
1045

1046
    static ProtocolVersionRange determine_protocol_version_range(Server::Config& config)
1047
    {
1,200✔
1048
        const int actual_min = ServerImplBase::get_oldest_supported_protocol_version();
1,200✔
1049
        const int actual_max = get_current_protocol_version();
1,200✔
1050
        static_assert(actual_min <= actual_max, "");
1,200✔
1051
        int min = actual_min;
1,200✔
1052
        int max = actual_max;
1,200✔
1053
        if (config.max_protocol_version != 0 && config.max_protocol_version < max) {
1,200!
1054
            if (config.max_protocol_version < min)
×
1055
                throw Server::NoSupportedProtocolVersions();
×
1056
            max = config.max_protocol_version;
×
1057
        }
×
1058
        return {min, max};
1,200✔
1059
    }
1,200✔
1060

1061
    void do_recognize_external_change(const std::string& virt_path);
1062

1063
    void do_stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_complete)> completion_handler,
1064
                                                     milliseconds_type timeout);
1065
};
1066

1067
// ============================ SyncConnection ============================
1068

1069
class SyncConnection : public websocket::Config {
1070
public:
1071
    const std::shared_ptr<util::Logger> logger_ptr;
1072
    util::Logger& logger;
1073

1074
    // Clients with sync protocol version 8 or greater support pbs->flx migration
1075
    static constexpr int PBS_FLX_MIGRATION_PROTOCOL_VERSION = 8;
1076
    // Clients with sync protocol version less than 10 do not support log messages
1077
    static constexpr int SERVER_LOG_PROTOCOL_VERSION = 10;
1078

1079
    SyncConnection(ServerImpl& serv, std::int_fast64_t id, std::unique_ptr<network::Socket>&& socket,
1080
                   std::unique_ptr<network::ssl::Stream>&& ssl_stream,
1081
                   std::unique_ptr<network::ReadAheadBuffer>&& read_ahead_buffer, int client_protocol_version,
1082
                   std::string client_user_agent, std::string remote_endpoint, std::string appservices_request_id)
1083
        : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
874✔
1084
                                                          serv.logger_ptr)} // Throws
874✔
1085
        , logger{*logger_ptr}
874✔
1086
        , m_server{serv}
874✔
1087
        , m_id{id}
874✔
1088
        , m_socket{std::move(socket)}
874✔
1089
        , m_ssl_stream{std::move(ssl_stream)}
874✔
1090
        , m_read_ahead_buffer{std::move(read_ahead_buffer)}
874✔
1091
        , m_websocket{*this}
874✔
1092
        , m_client_protocol_version{client_protocol_version}
874✔
1093
        , m_client_user_agent{std::move(client_user_agent)}
874✔
1094
        , m_remote_endpoint{std::move(remote_endpoint)}
874✔
1095
        , m_appservices_request_id{std::move(appservices_request_id)}
874✔
1096
        , m_send_trigger{m_server.get_service(), &SyncConnection::send_next_message, this}
874✔
1097
    {
2,060✔
1098
        // Make the output buffer stream throw std::bad_alloc if it fails to
1099
        // expand the buffer
1100
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
2,060✔
1101
    }
2,060✔
1102

1103
    ~SyncConnection() noexcept;
1104

1105
    ServerImpl& get_server() noexcept
1106
    {
116,314✔
1107
        return m_server;
116,314✔
1108
    }
116,314✔
1109

1110
    ServerProtocol& get_server_protocol() noexcept
1111
    {
132,288✔
1112
        return m_server.get_server_protocol();
132,288✔
1113
    }
132,288✔
1114

1115
    int get_client_protocol_version()
1116
    {
103,576✔
1117
        return m_client_protocol_version;
103,576✔
1118
    }
103,576✔
1119

1120
    const std::string& get_client_user_agent() const noexcept
1121
    {
5,098✔
1122
        return m_client_user_agent;
5,098✔
1123
    }
5,098✔
1124

1125
    const std::string& get_remote_endpoint() const noexcept
1126
    {
5,098✔
1127
        return m_remote_endpoint;
5,098✔
1128
    }
5,098✔
1129

1130
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept final
1131
    {
2,060✔
1132
        return logger_ptr;
2,060✔
1133
    }
2,060✔
1134

1135
    std::mt19937_64& websocket_get_random() noexcept final override
1136
    {
63,946✔
1137
        return m_server.get_random();
63,946✔
1138
    }
63,946✔
1139

1140
    bool websocket_binary_message_received(const char* data, size_t size) final override
1141
    {
68,870✔
1142
        using sf = _impl::SimulatedFailure;
68,870✔
1143
        if (sf::check_trigger(sf::sync_server__read_head)) {
68,870✔
1144
            // Suicide
1145
            read_error(sf::sync_server__read_head);
536✔
1146
            return false;
536✔
1147
        }
536✔
1148
        // After a connection level error has occurred, all incoming messages
1149
        // will be ignored. By continuing to read until end of input, the server
1150
        // is able to know when the client closes the connection, which in
1151
        // general means that is has received the ERROR message.
1152
        if (REALM_LIKELY(!m_is_closing)) {
68,334✔
1153
            m_last_activity_at = steady_clock_now();
68,332✔
1154
            handle_message_received(data, size);
68,332✔
1155
        }
68,332✔
1156
        return true;
68,334✔
1157
    }
68,870✔
1158

1159
    bool websocket_ping_message_received(const char* data, size_t size) final override
1160
    {
×
1161
        if (REALM_LIKELY(!m_is_closing)) {
×
1162
            m_last_activity_at = steady_clock_now();
×
1163
            handle_ping_received(data, size);
×
1164
        }
×
1165
        return true;
×
1166
    }
×
1167

1168
    void async_write(const char* data, size_t size, websocket::WriteCompletionHandler handler) final override
1169
    {
63,946✔
1170
        if (m_ssl_stream) {
63,946✔
1171
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
70✔
1172
        }
70✔
1173
        else {
63,876✔
1174
            m_socket->async_write(data, size, std::move(handler)); // Throws
63,876✔
1175
        }
63,876✔
1176
    }
63,946✔
1177

1178
    void async_read(char* buffer, size_t size, websocket::ReadCompletionHandler handler) final override
1179
    {
208,150✔
1180
        if (m_ssl_stream) {
208,150✔
1181
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
140✔
1182
        }
140✔
1183
        else {
208,010✔
1184
            m_socket->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
208,010✔
1185
        }
208,010✔
1186
    }
208,150✔
1187

1188
    void async_read_until(char* buffer, size_t size, char delim,
1189
                          websocket::ReadCompletionHandler handler) final override
1190
    {
×
1191
        if (m_ssl_stream) {
×
1192
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1193
                                           std::move(handler)); // Throws
×
1194
        }
×
1195
        else {
×
1196
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1197
                                       std::move(handler)); // Throws
×
1198
        }
×
1199
    }
×
1200

1201
    void websocket_read_error_handler(std::error_code ec) final override
1202
    {
672✔
1203
        read_error(ec);
672✔
1204
    }
672✔
1205

1206
    void websocket_write_error_handler(std::error_code ec) final override
1207
    {
×
1208
        write_error(ec);
×
1209
    }
×
1210

1211
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*, std::string_view) final override
1212
    {
×
1213
        // WebSocket class has already logged a message for this error
1214
        close_due_to_error(ec); // Throws
×
1215
    }
×
1216

1217
    void websocket_protocol_error_handler(std::error_code ec) final override
1218
    {
×
1219
        logger.error("WebSocket protocol error (%1): %2", ec, ec.message()); // Throws
×
1220
        close_due_to_error(ec);                                              // Throws
×
1221
    }
×
1222

1223
    void websocket_handshake_completion_handler(const HTTPHeaders&) final override
1224
    {
×
1225
        // This is not called since we handle HTTP request in handle_request_for_sync()
1226
        REALM_TERMINATE("websocket_handshake_completion_handler should not have been called");
1227
    }
×
1228

1229
    int_fast64_t get_id() const noexcept
1230
    {
×
1231
        return m_id;
×
1232
    }
×
1233

1234
    network::Socket& get_socket() noexcept
1235
    {
×
1236
        return *m_socket;
×
1237
    }
×
1238

1239
    void initiate();
1240

1241
    // Commits suicide
1242
    template <class... Params>
1243
    void terminate(Logger::Level, const char* log_message, Params... log_params);
1244

1245
    // Commits suicide
1246
    void terminate_if_dead(SteadyTimePoint now);
1247

1248
    void enlist_to_send(Session*) noexcept;
1249

1250
    // Sessions should get the output_buffer and insert a message, after which
1251
    // they call initiate_write_output_buffer().
1252
    OutputBuffer& get_output_buffer()
1253
    {
63,944✔
1254
        m_output_buffer.reset();
63,944✔
1255
        return m_output_buffer;
63,944✔
1256
    }
63,944✔
1257

1258
    // More advanced memory strategies can be implemented if needed.
1259
    void release_output_buffer() {}
63,790✔
1260

1261
    // When this function is called, the connection will initiate a write with
1262
    // its output_buffer. Sessions use this method.
1263
    void initiate_write_output_buffer();
1264

1265
    void initiate_pong_output_buffer();
1266

1267
    void handle_protocol_error(Status status);
1268

1269
    void receive_bind_message(session_ident_type, std::string path, std::string signed_user_token,
1270
                              bool need_client_file_ident, bool is_subserver);
1271

1272
    void receive_ident_message(session_ident_type, file_ident_type client_file_ident,
1273
                               salt_type client_file_ident_salt, version_type scan_server_version,
1274
                               version_type scan_client_version, version_type latest_server_version,
1275
                               salt_type latest_server_version_salt);
1276

1277
    void receive_upload_message(session_ident_type, version_type progress_client_version,
1278
                                version_type progress_server_version, version_type locked_server_version,
1279
                                const UploadChangesets&);
1280

1281
    void receive_mark_message(session_ident_type, request_ident_type);
1282

1283
    void receive_unbind_message(session_ident_type);
1284

1285
    void receive_ping(milliseconds_type timestamp, milliseconds_type rtt);
1286

1287
    void receive_error_message(session_ident_type, int error_code, std::string_view error_body);
1288

1289
    void protocol_error(ProtocolError, Session* = nullptr);
1290

1291
    void initiate_soft_close();
1292

1293
    void discard_session(session_ident_type) noexcept;
1294

1295
    void send_log_message(util::Logger::Level level, const std::string&& message, session_ident_type sess_ident = 0,
1296
                          std::optional<std::string> co_id = std::nullopt);
1297

1298
private:
1299
    ServerImpl& m_server;
1300
    const int_fast64_t m_id;
1301
    std::unique_ptr<network::Socket> m_socket;
1302
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1303
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1304

1305
    websocket::Socket m_websocket;
1306
    std::unique_ptr<char[]> m_input_body_buffer;
1307
    OutputBuffer m_output_buffer;
1308
    std::map<session_ident_type, std::unique_ptr<Session>> m_sessions;
1309

1310
    // The protocol version in use by the connected client.
1311
    const int m_client_protocol_version;
1312

1313
    // The user agent description passed by the client.
1314
    const std::string m_client_user_agent;
1315

1316
    const std::string m_remote_endpoint;
1317

1318
    const std::string m_appservices_request_id;
1319

1320
    // A queue of sessions that have enlisted for an opportunity to send a
1321
    // message. Sessions will be served in the order that they enlist. A session
1322
    // can only occur once in this queue (linked list). If the queue is not
1323
    // empty, and no message is currently being written to the socket, the first
1324
    // session is taken out of the queue, and then granted an opportunity to
1325
    // send a message.
1326
    //
1327
    // Sessions will never be destroyed while in this queue. This is ensured
1328
    // because the connection owns the sessions that are associated with it, and
1329
    // the connection only removes a session from m_sessions at points in time
1330
    // where that session is guaranteed to not be in m_sessions_enlisted_to_send
1331
    // (Connection::send_next_message() and Connection::~Connection()).
1332
    SessionQueue m_sessions_enlisted_to_send;
1333

1334
    Session* m_receiving_session = nullptr;
1335

1336
    bool m_is_sending = false;
1337
    bool m_is_closing = false;
1338

1339
    bool m_send_pong = false;
1340
    bool m_sending_pong = false;
1341

1342
    Trigger<network::Service> m_send_trigger;
1343

1344
    milliseconds_type m_last_ping_timestamp = 0;
1345

1346
    // If `m_is_closing` is true, this is the time at which `m_is_closing` was
1347
    // set to true (initiation of soft close). Otherwise, if no messages have
1348
    // been received from the client, this is the time at which the connection
1349
    // object was initiated (completion of WebSocket handshake). Otherwise this
1350
    // is the time at which the last message was received from the client.
1351
    SteadyTimePoint m_last_activity_at;
1352

1353
    // These are initialized by do_initiate_soft_close().
1354
    //
1355
    // With recent versions of the protocol (when the version is greater than,
1356
    // or equal to 23), `m_error_session_ident` is always zero.
1357
    ProtocolError m_error_code = {};
1358
    session_ident_type m_error_session_ident = 0;
1359

1360
    struct LogMessage {
1361
        session_ident_type sess_ident;
1362
        util::Logger::Level level;
1363
        std::string message;
1364
        std::optional<std::string> co_id;
1365
    };
1366

1367
    std::mutex m_log_mutex;
1368
    std::queue<LogMessage> m_log_messages;
1369

1370
    static std::string make_logger_prefix(int_fast64_t id)
1371
    {
2,060✔
1372
        std::ostringstream out;
2,060✔
1373
        out.imbue(std::locale::classic());
2,060✔
1374
        out << "Sync Connection[" << id << "]: "; // Throws
2,060✔
1375
        return out.str();                         // Throws
2,060✔
1376
    }
2,060✔
1377

1378
    // The return value of handle_message_received() designates whether
1379
    // message processing should continue. If the connection object is
1380
    // destroyed during execution of handle_message_received(), the return
1381
    // value must be false.
1382
    void handle_message_received(const char* data, size_t size);
1383

1384
    void handle_ping_received(const char* data, size_t size);
1385

1386
    void send_next_message();
1387
    void send_pong(milliseconds_type timestamp);
1388
    void send_log_message(const LogMessage& log_msg);
1389

1390
    void handle_write_output_buffer();
1391
    void handle_pong_output_buffer();
1392

1393
    void initiate_write_error(ProtocolError, session_ident_type);
1394
    void handle_write_error(std::error_code ec);
1395

1396
    void do_initiate_soft_close(ProtocolError, session_ident_type);
1397
    void read_error(std::error_code);
1398
    void write_error(std::error_code);
1399

1400
    void close_due_to_close_by_client(std::error_code);
1401
    void close_due_to_error(std::error_code);
1402

1403
    void terminate_sessions();
1404

1405
    void bad_session_ident(const char* message_type, session_ident_type);
1406
    void message_after_unbind(const char* message_type, session_ident_type);
1407
    void message_before_ident(const char* message_type, session_ident_type);
1408
};
1409

1410

1411
inline void SyncConnection::read_error(std::error_code ec)
1412
{
1,208✔
1413
    REALM_ASSERT(ec != util::error::operation_aborted);
1,208✔
1414
    if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
1,208✔
1415
        // Suicide
1416
        close_due_to_close_by_client(ec); // Throws
672✔
1417
        return;
672✔
1418
    }
672✔
1419
    if (ec == util::MiscExtErrors::delim_not_found) {
536✔
1420
        logger.error("Input message head delimited not found"); // Throws
×
1421
        protocol_error(ProtocolError::limits_exceeded);         // Throws
×
1422
        return;
×
1423
    }
×
1424

1425
    logger.error("Reading failed: %1", ec.message()); // Throws
536✔
1426

1427
    // Suicide
1428
    close_due_to_error(ec); // Throws
536✔
1429
}
536✔
1430

1431
inline void SyncConnection::write_error(std::error_code ec)
1432
{
×
1433
    REALM_ASSERT(ec != util::error::operation_aborted);
×
1434
    if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1435
        // Suicide
1436
        close_due_to_close_by_client(ec); // Throws
×
1437
        return;
×
1438
    }
×
1439
    logger.error("Writing failed: %1", ec.message()); // Throws
×
1440

1441
    // Suicide
1442
    close_due_to_error(ec); // Throws
×
1443
}
×
1444

1445

1446
// ============================ HTTPConnection ============================
1447

1448
std::string g_user_agent = "User-Agent";
1449

1450
class HTTPConnection {
1451
public:
1452
    const std::shared_ptr<Logger> logger_ptr;
1453
    util::Logger& logger;
1454

1455
    HTTPConnection(ServerImpl& serv, int_fast64_t id, bool is_ssl)
1456
        : logger_ptr{std::make_shared<PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
1,444✔
1457
                                                    serv.logger_ptr)} // Throws
1,444✔
1458
        , logger{*logger_ptr}
1,444✔
1459
        , m_server{serv}
1,444✔
1460
        , m_id{id}
1,444✔
1461
        , m_socket{new network::Socket{serv.get_service()}} // Throws
1,444✔
1462
        , m_read_ahead_buffer{new network::ReadAheadBuffer} // Throws
1,444✔
1463
        , m_http_server{*this, logger_ptr}
1,444✔
1464
    {
3,302✔
1465
        // Make the output buffer stream throw std::bad_alloc if it fails to
1466
        // expand the buffer
1467
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
3,302✔
1468

1469
        if (is_ssl) {
3,302✔
1470
            using namespace network::ssl;
48✔
1471
            Context& ssl_context = serv.get_ssl_context();
48✔
1472
            m_ssl_stream = std::make_unique<Stream>(*m_socket, ssl_context,
48✔
1473
                                                    Stream::server); // Throws
48✔
1474
        }
48✔
1475
    }
3,302✔
1476

1477
    ServerImpl& get_server() noexcept
1478
    {
×
1479
        return m_server;
×
1480
    }
×
1481

1482
    int_fast64_t get_id() const noexcept
1483
    {
2,102✔
1484
        return m_id;
2,102✔
1485
    }
2,102✔
1486

1487
    network::Socket& get_socket() noexcept
1488
    {
5,056✔
1489
        return *m_socket;
5,056✔
1490
    }
5,056✔
1491

1492
    template <class H>
1493
    void async_write(const char* data, size_t size, H handler)
1494
    {
2,080✔
1495
        if (m_ssl_stream) {
2,080✔
1496
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
14✔
1497
        }
14✔
1498
        else {
2,066✔
1499
            m_socket->async_write(data, size, std::move(handler)); // Throws
2,066✔
1500
        }
2,066✔
1501
    }
2,080✔
1502

1503
    template <class H>
1504
    void async_read(char* buffer, size_t size, H handler)
1505
    {
8✔
1506
        if (m_ssl_stream) {
8✔
1507
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer,
×
1508
                                     std::move(handler)); // Throws
×
1509
        }
×
1510
        else {
8✔
1511
            m_socket->async_read(buffer, size, *m_read_ahead_buffer,
8✔
1512
                                 std::move(handler)); // Throws
8✔
1513
        }
8✔
1514
    }
8✔
1515

1516
    template <class H>
1517
    void async_read_until(char* buffer, size_t size, char delim, H handler)
1518
    {
18,612✔
1519
        if (m_ssl_stream) {
18,612✔
1520
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
126✔
1521
                                           std::move(handler)); // Throws
126✔
1522
        }
126✔
1523
        else {
18,486✔
1524
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
18,486✔
1525
                                       std::move(handler)); // Throws
18,486✔
1526
        }
18,486✔
1527
    }
18,612✔
1528

1529
    void initiate(std::string remote_endpoint)
1530
    {
2,102✔
1531
        m_last_activity_at = steady_clock_now();
2,102✔
1532
        m_remote_endpoint = std::move(remote_endpoint);
2,102✔
1533

1534
        logger.detail("Connection from %1", m_remote_endpoint); // Throws
2,102✔
1535

1536
        if (m_ssl_stream) {
2,102✔
1537
            initiate_ssl_handshake(); // Throws
24✔
1538
        }
24✔
1539
        else {
2,078✔
1540
            initiate_http(); // Throws
2,078✔
1541
        }
2,078✔
1542
    }
2,102✔
1543

1544
    void respond_200_ok()
1545
    {
×
1546
        handle_text_response(HTTPStatus::Ok, "OK"); // Throws
×
1547
    }
×
1548

1549
    void respond_404_not_found()
1550
    {
×
1551
        handle_text_response(HTTPStatus::NotFound, "Not found"); // Throws
×
1552
    }
×
1553

1554
    void respond_503_service_unavailable()
1555
    {
×
1556
        handle_text_response(HTTPStatus::ServiceUnavailable, "Service unavailable"); // Throws
×
1557
    }
×
1558

1559
    // Commits suicide
1560
    template <class... Params>
1561
    void terminate(Logger::Level log_level, const char* log_message, Params... log_params)
1562
    {
42✔
1563
        logger.log(log_level, log_message, log_params...); // Throws
42✔
1564
        m_ssl_stream.reset();
42✔
1565
        m_socket.reset();
42✔
1566
        m_server.remove_http_connection(m_id); // Suicide
42✔
1567
    }
42✔
1568

1569
    // Commits suicide
1570
    void terminate_if_dead(SteadyTimePoint now)
1571
    {
2✔
1572
        milliseconds_type time = steady_duration(m_last_activity_at, now);
2✔
1573
        const Server::Config& config = m_server.get_config();
2✔
1574
        if (m_is_sending) {
2✔
1575
            if (time >= config.http_response_timeout) {
×
1576
                // Suicide
1577
                terminate(Logger::Level::detail,
×
1578
                          "HTTP connection closed (request timeout)"); // Throws
×
1579
            }
×
1580
        }
×
1581
        else {
2✔
1582
            if (time >= config.http_request_timeout) {
2✔
1583
                // Suicide
1584
                terminate(Logger::Level::detail,
×
1585
                          "HTTP connection closed (response timeout)"); // Throws
×
1586
            }
×
1587
        }
2✔
1588
    }
2✔
1589

1590
    std::string get_appservices_request_id() const
1591
    {
2,080✔
1592
        return m_appservices_request_id.to_string();
2,080✔
1593
    }
2,080✔
1594

1595
private:
1596
    ServerImpl& m_server;
1597
    const int_fast64_t m_id;
1598
    const ObjectId m_appservices_request_id = ObjectId::gen();
1599
    std::unique_ptr<network::Socket> m_socket;
1600
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1601
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1602
    HTTPServer<HTTPConnection> m_http_server;
1603
    OutputBuffer m_output_buffer;
1604
    bool m_is_sending = false;
1605
    SteadyTimePoint m_last_activity_at;
1606
    std::string m_remote_endpoint;
1607
    int m_negotiated_protocol_version = 0;
1608

1609
    void initiate_ssl_handshake()
1610
    {
24✔
1611
        auto handler = [this](std::error_code ec) {
24✔
1612
            if (ec != util::error::operation_aborted)
24✔
1613
                handle_ssl_handshake(ec); // Throws
24✔
1614
        };
24✔
1615
        m_ssl_stream->async_handshake(std::move(handler)); // Throws
24✔
1616
    }
24✔
1617

1618
    void handle_ssl_handshake(std::error_code ec)
1619
    {
24✔
1620
        if (ec) {
24✔
1621
            logger.error("SSL handshake error (%1): %2", ec, ec.message()); // Throws
10✔
1622
            close_due_to_error(ec);                                         // Throws
10✔
1623
            return;
10✔
1624
        }
10✔
1625
        initiate_http(); // Throws
14✔
1626
    }
14✔
1627

1628
    void initiate_http()
1629
    {
2,092✔
1630
        logger.debug("Connection initiates HTTP receipt");
2,092✔
1631

1632
        auto handler = [this](HTTPRequest request, std::error_code ec) {
2,092✔
1633
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
2,092✔
1634
                return;
×
1635
            if (REALM_UNLIKELY(ec == HTTPParserError::MalformedRequest)) {
2,092✔
1636
                logger.error("Malformed HTTP request");
×
1637
                close_due_to_error(ec); // Throws
×
1638
                return;
×
1639
            }
×
1640
            if (REALM_UNLIKELY(ec == HTTPParserError::BadRequest)) {
2,092✔
1641
                logger.error("Bad HTTP request");
8✔
1642
                const char* body = "The HTTP request was corrupted";
8✔
1643
                handle_400_bad_request(body); // Throws
8✔
1644
                return;
8✔
1645
            }
8✔
1646
            if (REALM_UNLIKELY(ec)) {
2,084✔
1647
                read_error(ec); // Throws
12✔
1648
                return;
12✔
1649
            }
12✔
1650
            handle_http_request(std::move(request)); // Throws
2,072✔
1651
        };
2,072✔
1652
        m_http_server.async_receive_request(std::move(handler)); // Throws
2,092✔
1653
    }
2,092✔
1654

1655
    void handle_http_request(const HTTPRequest& request)
1656
    {
2,072✔
1657
        StringData path = request.path;
2,072✔
1658

1659
        logger.debug("HTTP request received, request = %1", request);
2,072✔
1660

1661
        m_is_sending = true;
2,072✔
1662
        m_last_activity_at = steady_clock_now();
2,072✔
1663

1664
        // FIXME: When thinking of this function as a switching device, it seem
1665
        // wrong that it requires a `%2F` after `/realm-sync/`. If `%2F` is
1666
        // supposed to be mandatory, then that check ought to be delegated to
1667
        // handle_request_for_sync(), as that will yield a sharper separation of
1668
        // concerns.
1669
        if (path == "/realm-sync" || path.begins_with("/realm-sync?") || path.begins_with("/realm-sync/%2F")) {
2,072✔
1670
            handle_request_for_sync(request); // Throws
2,060✔
1671
        }
2,060✔
1672
        else {
12✔
1673
            handle_404_not_found(request); // Throws
12✔
1674
        }
12✔
1675
    }
2,072✔
1676

1677
    void handle_request_for_sync(const HTTPRequest& request)
1678
    {
2,060✔
1679
        if (m_server.is_sync_stopped()) {
2,060✔
1680
            logger.debug("Attempt to create a sync connection to a server that has been "
×
1681
                         "stopped"); // Throws
×
1682
            handle_503_service_unavailable(request, "The server does not accept sync "
×
1683
                                                    "connections"); // Throws
×
1684
            return;
×
1685
        }
×
1686

1687
        util::Optional<std::string> sec_websocket_protocol = websocket::read_sec_websocket_protocol(request);
2,060✔
1688

1689
        // Figure out whether there are any protocol versions supported by both
1690
        // the client and the server, and if so, choose the newest one of them.
1691
        MiscBuffers& misc_buffers = m_server.get_misc_buffers();
2,060✔
1692
        using ProtocolVersionRanges = MiscBuffers::ProtocolVersionRanges;
2,060✔
1693
        ProtocolVersionRanges& protocol_version_ranges = misc_buffers.protocol_version_ranges;
2,060✔
1694
        {
2,060✔
1695
            protocol_version_ranges.clear();
2,060✔
1696
            util::MemoryInputStream in;
2,060✔
1697
            in.imbue(std::locale::classic());
2,060✔
1698
            in.unsetf(std::ios_base::skipws);
2,060✔
1699
            std::string_view value;
2,060✔
1700
            if (sec_websocket_protocol)
2,060✔
1701
                value = *sec_websocket_protocol;
2,060✔
1702
            HttpListHeaderValueParser parser{value};
2,060✔
1703
            std::string_view elem;
2,060✔
1704
            while (parser.next(elem)) {
28,840✔
1705
                // FIXME: Use std::string_view::begins_with() in C++20.
1706
                const StringData protocol{elem};
26,780✔
1707
                std::string_view prefix;
26,780✔
1708
                if (protocol.begins_with(get_pbs_websocket_protocol_prefix()))
26,780✔
1709
                    prefix = get_pbs_websocket_protocol_prefix();
26,780✔
1710
                else if (protocol.begins_with(get_old_pbs_websocket_protocol_prefix()))
×
1711
                    prefix = get_old_pbs_websocket_protocol_prefix();
×
1712
                if (!prefix.empty()) {
26,780✔
1713
                    auto parse_version = [&](std::string_view str) {
26,780✔
1714
                        in.set_buffer(str.data(), str.data() + str.size());
26,780✔
1715
                        int version = 0;
26,780✔
1716
                        in >> version;
26,780✔
1717
                        if (REALM_LIKELY(in && in.eof() && version >= 0))
26,780✔
1718
                            return version;
26,780✔
1719
                        return -1;
×
1720
                    };
26,780✔
1721
                    int min, max;
26,780✔
1722
                    std::string_view range = elem.substr(prefix.size());
26,780✔
1723
                    auto i = range.find('-');
26,780✔
1724
                    if (i != std::string_view::npos) {
26,780✔
1725
                        min = parse_version(range.substr(0, i));
×
1726
                        max = parse_version(range.substr(i + 1));
×
1727
                    }
×
1728
                    else {
26,780✔
1729
                        min = parse_version(range);
26,780✔
1730
                        max = min;
26,780✔
1731
                    }
26,780✔
1732
                    if (REALM_LIKELY(min >= 0 && max >= 0 && min <= max)) {
26,780✔
1733
                        protocol_version_ranges.emplace_back(min, max); // Throws
26,780✔
1734
                        continue;
26,780✔
1735
                    }
26,780✔
1736
                    logger.error("Protocol version negotiation failed: Client sent malformed "
×
1737
                                 "specification of supported protocol versions: '%1'",
×
1738
                                 elem); // Throws
×
1739
                    handle_400_bad_request("Protocol version negotiation failed: Malformed "
×
1740
                                           "specification of supported protocol "
×
1741
                                           "versions\n"); // Throws
×
1742
                    return;
×
1743
                }
26,780✔
1744
                logger.warn("Unrecognized protocol token in HTTP response header "
×
1745
                            "Sec-WebSocket-Protocol: '%1'",
×
1746
                            elem); // Throws
×
1747
            }
×
1748
            if (protocol_version_ranges.empty()) {
2,060✔
1749
                logger.error("Protocol version negotiation failed: Client did not send a "
×
1750
                             "specification of supported protocol versions"); // Throws
×
1751
                handle_400_bad_request("Protocol version negotiation failed: Missing specification "
×
1752
                                       "of supported protocol versions\n"); // Throws
×
1753
                return;
×
1754
            }
×
1755
        }
2,060✔
1756
        {
2,060✔
1757
            ProtocolVersionRange server_range = m_server.get_protocol_version_range();
2,060✔
1758
            int server_min = server_range.first;
2,060✔
1759
            int server_max = server_range.second;
2,060✔
1760
            int best_match = 0;
2,060✔
1761
            int overall_client_min = std::numeric_limits<int>::max();
2,060✔
1762
            int overall_client_max = std::numeric_limits<int>::min();
2,060✔
1763
            for (const auto& range : protocol_version_ranges) {
26,780✔
1764
                int client_min = range.first;
26,780✔
1765
                int client_max = range.second;
26,780✔
1766
                if (client_max >= server_min && client_min <= server_max) {
26,780✔
1767
                    // Overlap
1768
                    int version = std::min(client_max, server_max);
26,780✔
1769
                    if (version > best_match) {
26,780✔
1770
                        best_match = version;
2,060✔
1771
                    }
2,060✔
1772
                }
26,780✔
1773
                if (client_min < overall_client_min)
26,780✔
1774
                    overall_client_min = client_min;
26,780✔
1775
                if (client_max > overall_client_max)
26,780✔
1776
                    overall_client_max = client_max;
2,060✔
1777
            }
26,780✔
1778
            Formatter& formatter = misc_buffers.formatter;
2,060✔
1779
            if (REALM_UNLIKELY(best_match == 0)) {
2,060✔
1780
                const char* elaboration = "No version supported by both client and server";
×
1781
                auto format_ranges = [&](const auto& list) {
×
1782
                    bool nonfirst = false;
×
1783
                    for (auto range : list) {
×
1784
                        if (nonfirst)
×
1785
                            formatter << ", "; // Throws
×
1786
                        int min = range.first, max = range.second;
×
1787
                        REALM_ASSERT(min <= max);
×
1788
                        formatter << min;
×
1789
                        if (max != min)
×
1790
                            formatter << "-" << max;
×
1791
                        nonfirst = true;
×
1792
                    }
×
1793
                };
×
1794
                using Range = ProtocolVersionRange;
×
1795
                formatter.reset();
×
1796
                format_ranges(protocol_version_ranges); // Throws
×
1797
                logger.error("Protocol version negotiation failed: %1 "
×
1798
                             "(client supports: %2)",
×
1799
                             elaboration, std::string_view(formatter.data(), formatter.size())); // Throws
×
1800
                formatter.reset();
×
1801
                formatter << "Protocol version negotiation failed: "
×
1802
                             ""
×
1803
                          << elaboration << ".\n\n";                                   // Throws
×
1804
                formatter << "Server supports: ";                                      // Throws
×
1805
                format_ranges(std::initializer_list<Range>{{server_min, server_max}}); // Throws
×
1806
                formatter << "\n";                                                     // Throws
×
1807
                formatter << "Client supports: ";                                      // Throws
×
1808
                format_ranges(protocol_version_ranges);                                // Throws
×
1809
                formatter << "\n";                                                     // Throws
×
1810
                handle_400_bad_request({formatter.data(), formatter.size()});          // Throws
×
1811
                return;
×
1812
            }
×
1813
            m_negotiated_protocol_version = best_match;
2,060✔
1814
            logger.debug("Received: Sync HTTP request (negotiated_protocol_version=%1)",
2,060✔
1815
                         m_negotiated_protocol_version); // Throws
2,060✔
1816
            formatter.reset();
2,060✔
1817
        }
2,060✔
1818

1819
        std::string sec_websocket_protocol_2;
×
1820
        {
2,060✔
1821
            std::string_view prefix =
2,060✔
1822
                m_negotiated_protocol_version < SyncConnection::PBS_FLX_MIGRATION_PROTOCOL_VERSION
2,060✔
1823
                    ? get_old_pbs_websocket_protocol_prefix()
2,060✔
1824
                    : get_pbs_websocket_protocol_prefix();
2,060✔
1825
            std::ostringstream out;
2,060✔
1826
            out.imbue(std::locale::classic());
2,060✔
1827
            out << prefix << m_negotiated_protocol_version; // Throws
2,060✔
1828
            sec_websocket_protocol_2 = std::move(out).str();
2,060✔
1829
        }
2,060✔
1830

1831
        std::error_code ec;
2,060✔
1832
        util::Optional<HTTPResponse> response =
2,060✔
1833
            websocket::make_http_response(request, sec_websocket_protocol_2, ec); // Throws
2,060✔
1834

1835
        if (ec) {
2,060✔
1836
            if (ec == websocket::HttpError::bad_request_header_upgrade) {
×
1837
                logger.error("There must be a header of the form 'Upgrade: websocket'");
×
1838
            }
×
1839
            else if (ec == websocket::HttpError::bad_request_header_connection) {
×
1840
                logger.error("There must be a header of the form 'Connection: Upgrade'");
×
1841
            }
×
1842
            else if (ec == websocket::HttpError::bad_request_header_websocket_version) {
×
1843
                logger.error("There must be a header of the form 'Sec-WebSocket-Version: 13'");
×
1844
            }
×
1845
            else if (ec == websocket::HttpError::bad_request_header_websocket_key) {
×
1846
                logger.error("The header Sec-WebSocket-Key is missing");
×
1847
            }
×
1848

1849
            logger.error("The HTTP request with the error is:\n%1", request);
×
1850
            logger.error("Check the proxy configuration and make sure that the "
×
1851
                         "HTTP request is a valid Websocket request.");
×
1852
            close_due_to_error(ec);
×
1853
            return;
×
1854
        }
×
1855
        REALM_ASSERT(response);
2,060✔
1856
        add_common_http_response_headers(*response);
2,060✔
1857

1858
        std::string user_agent;
2,060✔
1859
        {
2,060✔
1860
            auto i = request.headers.find(g_user_agent);
2,060✔
1861
            if (i != request.headers.end())
2,060✔
1862
                user_agent = i->second; // Throws (copy)
2,060✔
1863
        }
2,060✔
1864

1865
        auto handler = [protocol_version = m_negotiated_protocol_version, user_agent = std::move(user_agent),
2,060✔
1866
                        this](std::error_code ec) {
2,060✔
1867
            // If the operation is aborted, the socket object may have been destroyed.
1868
            if (ec != util::error::operation_aborted) {
2,060✔
1869
                if (ec) {
2,060✔
1870
                    write_error(ec);
×
1871
                    return;
×
1872
                }
×
1873

1874
                std::unique_ptr<SyncConnection> sync_conn = std::make_unique<SyncConnection>(
2,060✔
1875
                    m_server, m_id, std::move(m_socket), std::move(m_ssl_stream), std::move(m_read_ahead_buffer),
2,060✔
1876
                    protocol_version, std::move(user_agent), std::move(m_remote_endpoint),
2,060✔
1877
                    get_appservices_request_id()); // Throws
2,060✔
1878
                SyncConnection& sync_conn_ref = *sync_conn;
2,060✔
1879
                m_server.add_sync_connection(m_id, std::move(sync_conn));
2,060✔
1880
                m_server.remove_http_connection(m_id);
2,060✔
1881
                sync_conn_ref.initiate();
2,060✔
1882
            }
2,060✔
1883
        };
2,060✔
1884
        m_http_server.async_send_response(*response, std::move(handler));
2,060✔
1885
    }
2,060✔
1886

1887
    void handle_text_response(HTTPStatus http_status, std::string_view body)
1888
    {
20✔
1889
        std::string body_2 = std::string(body); // Throws
20✔
1890

1891
        HTTPResponse response;
20✔
1892
        response.status = http_status;
20✔
1893
        add_common_http_response_headers(response);
20✔
1894
        response.headers["Connection"] = "close";
20✔
1895

1896
        if (!body_2.empty()) {
20✔
1897
            response.headers["Content-Length"] = util::to_string(body_2.size());
20✔
1898
            response.body = std::move(body_2);
20✔
1899
        }
20✔
1900

1901
        auto handler = [this](std::error_code ec) {
20✔
1902
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
20✔
1903
                return;
×
1904
            if (REALM_UNLIKELY(ec)) {
20✔
1905
                write_error(ec);
×
1906
                return;
×
1907
            }
×
1908
            terminate(Logger::Level::detail, "HTTP connection closed"); // Throws
20✔
1909
        };
20✔
1910
        m_http_server.async_send_response(response, std::move(handler));
20✔
1911
    }
20✔
1912

1913
    void handle_400_bad_request(std::string_view body)
1914
    {
8✔
1915
        logger.detail("400 Bad Request");
8✔
1916
        handle_text_response(HTTPStatus::BadRequest, body); // Throws
8✔
1917
    }
8✔
1918

1919
    void handle_404_not_found(const HTTPRequest&)
1920
    {
12✔
1921
        logger.detail("404 Not Found"); // Throws
12✔
1922
        handle_text_response(HTTPStatus::NotFound,
12✔
1923
                             "Realm sync server\n\nPage not found\n"); // Throws
12✔
1924
    }
12✔
1925

1926
    void handle_503_service_unavailable(const HTTPRequest&, std::string_view message)
1927
    {
×
1928
        logger.debug("503 Service Unavailable");                       // Throws
×
1929
        handle_text_response(HTTPStatus::ServiceUnavailable, message); // Throws
×
1930
    }
×
1931

1932
    void add_common_http_response_headers(HTTPResponse& response)
1933
    {
2,080✔
1934
        response.headers["Server"] = "RealmSync/" REALM_VERSION_STRING; // Throws
2,080✔
1935
        if (m_negotiated_protocol_version < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
2,080✔
1936
            // This isn't a real X-Appservices-Request-Id, but it should be enough to test with
1937
            response.headers["X-Appservices-Request-Id"] = get_appservices_request_id();
20✔
1938
        }
20✔
1939
    }
2,080✔
1940

1941
    void read_error(std::error_code ec)
1942
    {
12✔
1943
        REALM_ASSERT(ec != util::error::operation_aborted);
12✔
1944
        if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
12!
1945
            // Suicide
1946
            close_due_to_close_by_client(ec); // Throws
12✔
1947
            return;
12✔
1948
        }
12✔
1949
        if (ec == util::MiscExtErrors::delim_not_found) {
×
1950
            logger.error("Input message head delimited not found"); // Throws
×
1951
            close_due_to_error(ec);                                 // Throws
×
1952
            return;
×
1953
        }
×
1954

1955
        logger.error("Reading failed: %1", ec.message()); // Throws
×
1956

1957
        // Suicide
1958
        close_due_to_error(ec); // Throws
×
1959
    }
×
1960

1961
    void write_error(std::error_code ec)
1962
    {
×
1963
        REALM_ASSERT(ec != util::error::operation_aborted);
×
1964
        if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1965
            // Suicide
1966
            close_due_to_close_by_client(ec); // Throws
×
1967
            return;
×
1968
        }
×
1969
        logger.error("Writing failed: %1", ec.message()); // Throws
×
1970

1971
        // Suicide
1972
        close_due_to_error(ec); // Throws
×
1973
    }
×
1974

1975
    void close_due_to_close_by_client(std::error_code ec)
1976
    {
12✔
1977
        auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
12✔
1978
        // Suicide
1979
        terminate(log_level, "HTTP connection closed by client: %1", ec.message()); // Throws
12✔
1980
    }
12✔
1981

1982
    void close_due_to_error(std::error_code ec)
1983
    {
10✔
1984
        // Suicide
1985
        terminate(Logger::Level::error, "HTTP connection closed due to error: %1",
10✔
1986
                  ec.message()); // Throws
10✔
1987
    }
10✔
1988

1989
    static std::string make_logger_prefix(int_fast64_t id)
1990
    {
3,302✔
1991
        std::ostringstream out;
3,302✔
1992
        out.imbue(std::locale::classic());
3,302✔
1993
        out << "HTTP Connection[" << id << "]: "; // Throws
3,302✔
1994
        return out.str();                         // Throws
3,302✔
1995
    }
3,302✔
1996
};
1997

1998

1999
class DownloadHistoryEntryHandler : public ServerHistory::HistoryEntryHandler {
2000
public:
2001
    std::size_t num_changesets = 0;
2002
    std::size_t accum_original_size = 0;
2003
    std::size_t accum_compacted_size = 0;
2004

2005
    DownloadHistoryEntryHandler(ServerProtocol& protocol, OutputBuffer& buffer, util::Logger& logger) noexcept
2006
        : m_protocol{protocol}
18,880✔
2007
        , m_buffer{buffer}
18,880✔
2008
        , m_logger{logger}
18,880✔
2009
    {
42,010✔
2010
    }
42,010✔
2011

2012
    void handle(version_type server_version, const HistoryEntry& entry, size_t original_size) override
2013
    {
42,880✔
2014
        version_type client_version = entry.remote_version;
42,880✔
2015
        ServerProtocol::ChangesetInfo info{server_version, client_version, entry, original_size};
42,880✔
2016
        m_protocol.insert_single_changeset_download_message(m_buffer, info, m_logger); // Throws
42,880✔
2017
        ++num_changesets;
42,880✔
2018
        accum_original_size += original_size;
42,880✔
2019
        accum_compacted_size += entry.changeset.size();
42,880✔
2020
    }
42,880✔
2021

2022
private:
2023
    ServerProtocol& m_protocol;
2024
    OutputBuffer& m_buffer;
2025
    util::Logger& m_logger;
2026
};
2027

2028

2029
// ============================ Session ============================
2030

2031
//                        Need cli-   Send     IDENT     UNBIND              ERROR
2032
//   Protocol             ent file    IDENT    message   message   Error     message
2033
//   state                identifier  message  received  received  occurred  sent
2034
// ---------------------------------------------------------------------------------
2035
//   AllocatingIdent      yes         yes      no        no        no        no
2036
//   SendIdent            no          yes      no        no        no        no
2037
//   WaitForIdent         no          no       no        no        no        no
2038
//   WaitForUnbind        maybe       no       yes       no        no        no
2039
//   SendError            maybe       maybe    maybe     no        yes       no
2040
//   WaitForUnbindErr     maybe       maybe    maybe     no        yes       yes
2041
//   SendUnbound          maybe       maybe    maybe     yes       maybe     no
2042
//
2043
//
2044
//   Condition                      Expression
2045
// ----------------------------------------------------------
2046
//   Need client file identifier    need_client_file_ident()
2047
//   Send IDENT message             must_send_ident_message()
2048
//   IDENT message received         ident_message_received()
2049
//   UNBIND message received        unbind_message_received()
2050
//   Error occurred                 error_occurred()
2051
//   ERROR message sent             m_error_message_sent
2052
//
2053
//
2054
//   Protocol
2055
//   state                Will send              Can receive
2056
// -----------------------------------------------------------------------
2057
//   AllocatingIdent      none                   UNBIND
2058
//   SendIdent            IDENT                  UNBIND
2059
//   WaitForIdent         none                   IDENT, UNBIND
2060
//   WaitForUnbind        DOWNLOAD, TRANSACT,    UPLOAD, TRANSACT, MARK,
2061
//                        MARK, ALLOC            ALLOC, UNBIND
2062
//   SendError            ERROR                  any
2063
//   WaitForUnbindErr     none                   any
2064
//   SendUnbound          UNBOUND                none
2065
//
2066
class Session final : private FileIdentReceiver {
2067
public:
2068
    util::PrefixLogger logger;
2069

2070
    Session(SyncConnection& conn, session_ident_type session_ident)
2071
        : logger{util::LogCategory::server, make_logger_prefix(session_ident), conn.logger_ptr} // Throws
2,358✔
2072
        , m_connection{conn}
2,358✔
2073
        , m_session_ident{session_ident}
2,358✔
2074
    {
5,126✔
2075
    }
5,126✔
2076

2077
    ~Session() noexcept
2078
    {
5,126✔
2079
        REALM_ASSERT(!is_enlisted_to_send());
5,126✔
2080
        detach_from_server_file();
5,126✔
2081
    }
5,126✔
2082

2083
    SyncConnection& get_connection() noexcept
2084
    {
42,028✔
2085
        return m_connection;
42,028✔
2086
    }
42,028✔
2087

2088
    const Optional<std::array<char, 64>>& get_encryption_key()
2089
    {
×
2090
        return m_connection.get_server().get_config().encryption_key;
×
2091
    }
×
2092

2093
    session_ident_type get_session_ident() const noexcept
2094
    {
156✔
2095
        return m_session_ident;
156✔
2096
    }
156✔
2097

2098
    ServerProtocol& get_server_protocol() noexcept
2099
    {
57,592✔
2100
        return m_connection.get_server_protocol();
57,592✔
2101
    }
57,592✔
2102

2103
    bool need_client_file_ident() const noexcept
2104
    {
6,978✔
2105
        return (m_file_ident_request != 0);
6,978✔
2106
    }
6,978✔
2107

2108
    bool must_send_ident_message() const noexcept
2109
    {
4,276✔
2110
        return m_send_ident_message;
4,276✔
2111
    }
4,276✔
2112

2113
    bool ident_message_received() const noexcept
2114
    {
344,530✔
2115
        return m_client_file_ident != 0;
344,530✔
2116
    }
344,530✔
2117

2118
    bool unbind_message_received() const noexcept
2119
    {
346,240✔
2120
        return m_unbind_message_received;
346,240✔
2121
    }
346,240✔
2122

2123
    bool error_occurred() const noexcept
2124
    {
339,714✔
2125
        return int(m_error_code) != 0;
339,714✔
2126
    }
339,714✔
2127

2128
    bool relayed_alloc_request_in_progress() const noexcept
2129
    {
×
2130
        return (need_client_file_ident() || m_allocated_file_ident.ident != 0);
×
2131
    }
×
2132

2133
    // Returns the file identifier (always a nonzero value) of the client side
2134
    // file if ident_message_received() returns true. Otherwise it returns zero.
2135
    file_ident_type get_client_file_ident() const noexcept
2136
    {
×
2137
        return m_client_file_ident;
×
2138
    }
×
2139

2140
    void initiate()
2141
    {
5,126✔
2142
        logger.detail("Session initiated", m_session_ident); // Throws
5,126✔
2143
    }
5,126✔
2144

2145
    void terminate()
2146
    {
4,120✔
2147
        logger.detail("Session terminated", m_session_ident); // Throws
4,120✔
2148
    }
4,120✔
2149

2150
    // Initiate the deactivation process, if it has not been initiated already
2151
    // by the client.
2152
    //
2153
    // IMPORTANT: This function must not be called with protocol versions
2154
    // earlier than 23.
2155
    //
2156
    // The deactivation process will eventually lead to termination of the
2157
    // session.
2158
    //
2159
    // The session will detach itself from the server file when the deactivation
2160
    // process is initiated, regardless of whether it is initiated by the
2161
    // client, or by calling this function.
2162
    void initiate_deactivation(ProtocolError error_code)
2163
    {
78✔
2164
        REALM_ASSERT(is_session_level_error(error_code));
78✔
2165
        REALM_ASSERT(!error_occurred()); // Must only be called once
78✔
2166

2167
        // If the UNBIND message has been received, then the client has
2168
        // initiated the deactivation process already.
2169
        if (REALM_LIKELY(!unbind_message_received())) {
78✔
2170
            detach_from_server_file();
78✔
2171
            m_error_code = error_code;
78✔
2172
            // Protocol state is now SendError
2173
            ensure_enlisted_to_send();
78✔
2174
            return;
78✔
2175
        }
78✔
2176
        // Protocol state was SendUnbound, and remains unchanged
2177
    }
78✔
2178

2179
    bool is_enlisted_to_send() const noexcept
2180
    {
276,318✔
2181
        return m_next != nullptr;
276,318✔
2182
    }
276,318✔
2183

2184
    void ensure_enlisted_to_send() noexcept
2185
    {
53,508✔
2186
        if (!is_enlisted_to_send())
53,508✔
2187
            enlist_to_send();
52,496✔
2188
    }
53,508✔
2189

2190
    void enlist_to_send() noexcept
2191
    {
110,748✔
2192
        m_connection.enlist_to_send(this);
110,748✔
2193
    }
110,748✔
2194

2195
    // Overriding memeber function in FileIdentReceiver
2196
    void receive_file_ident(SaltedFileIdent file_ident) override final
2197
    {
1,352✔
2198
        // Protocol state must be AllocatingIdent or WaitForUnbind
2199
        if (!ident_message_received()) {
1,352✔
2200
            REALM_ASSERT(need_client_file_ident());
1,350✔
2201
            REALM_ASSERT(m_send_ident_message);
1,350✔
2202
        }
1,350✔
2203
        else {
2✔
2204
            REALM_ASSERT(!m_send_ident_message);
2✔
2205
        }
2✔
2206
        REALM_ASSERT(!unbind_message_received());
1,352✔
2207
        REALM_ASSERT(!error_occurred());
1,352✔
2208
        REALM_ASSERT(!m_error_message_sent);
1,352✔
2209

2210
        m_file_ident_request = 0;
1,352✔
2211
        m_allocated_file_ident = file_ident;
1,352✔
2212

2213
        // If the protocol state was AllocatingIdent, it is now SendIdent,
2214
        // otherwise it continues to be WaitForUnbind.
2215

2216
        logger.debug("Acquired outbound salted file identifier (%1, %2)", file_ident.ident,
1,352✔
2217
                     file_ident.salt); // Throws
1,352✔
2218

2219
        ensure_enlisted_to_send();
1,352✔
2220
    }
1,352✔
2221

2222
    // Called by the associated connection object when this session is granted
2223
    // an opportunity to initiate the sending of a message.
2224
    //
2225
    // This function may lead to the destruction of the session object
2226
    // (suicide).
2227
    void send_message()
2228
    {
110,544✔
2229
        if (REALM_LIKELY(!unbind_message_received())) {
110,544✔
2230
            if (REALM_LIKELY(!error_occurred())) {
108,382✔
2231
                if (REALM_LIKELY(ident_message_received())) {
108,306✔
2232
                    // State is WaitForUnbind.
2233
                    bool relayed_alloc = (m_allocated_file_ident.ident != 0);
106,952✔
2234
                    if (REALM_LIKELY(!relayed_alloc)) {
106,952✔
2235
                        // Send DOWNLOAD or MARK.
2236
                        continue_history_scan(); // Throws
106,950✔
2237
                        // Session object may have been
2238
                        // destroyed at this point (suicide)
2239
                        return;
106,950✔
2240
                    }
106,950✔
2241
                    send_alloc_message(); // Throws
2✔
2242
                    return;
2✔
2243
                }
106,952✔
2244
                // State is SendIdent
2245
                send_ident_message(); // Throws
1,354✔
2246
                return;
1,354✔
2247
            }
108,306✔
2248
            // State is SendError
2249
            send_error_message(); // Throws
76✔
2250
            return;
76✔
2251
        }
108,382✔
2252
        // State is SendUnbound
2253
        send_unbound_message(); // Throws
2,162✔
2254
        terminate();            // Throws
2,162✔
2255
        m_connection.discard_session(m_session_ident);
2,162✔
2256
        // This session is now destroyed!
2257
    }
2,162✔
2258

2259
    bool receive_bind_message(std::string path, std::string signed_user_token, bool need_client_file_ident,
2260
                              bool is_subserver, ProtocolError& error)
2261
    {
5,126✔
2262
        if (logger.would_log(util::Logger::Level::info)) {
5,126✔
2263
            logger.detail("Received: BIND(server_path=%1, signed_user_token='%2', "
364✔
2264
                          "need_client_file_ident=%3, is_subserver=%4)",
364✔
2265
                          path, short_token_fmt(signed_user_token), int(need_client_file_ident),
364✔
2266
                          int(is_subserver)); // Throws
364✔
2267
        }
364✔
2268

2269
        ServerImpl& server = m_connection.get_server();
5,126✔
2270
        _impl::VirtualPathComponents virt_path_components =
5,126✔
2271
            _impl::parse_virtual_path(server.get_root_dir(), path); // Throws
5,126✔
2272

2273
        if (!virt_path_components.is_valid) {
5,126✔
2274
            logger.error("Bad virtual path (message_type='bind', path='%1', "
28✔
2275
                         "signed_user_token='%2')",
28✔
2276
                         path,
28✔
2277
                         short_token_fmt(signed_user_token)); // Throws
28✔
2278
            error = ProtocolError::illegal_realm_path;
28✔
2279
            return false;
28✔
2280
        }
28✔
2281

2282
        // The user has proper permissions at this stage.
2283

2284
        m_server_file = server.get_or_create_file(path); // Throws
5,098✔
2285

2286
        m_server_file->add_unidentified_session(this); // Throws
5,098✔
2287

2288
        logger.info("Client info: (path='%1', from=%2, protocol=%3) %4", path, m_connection.get_remote_endpoint(),
5,098✔
2289
                    m_connection.get_client_protocol_version(),
5,098✔
2290
                    m_connection.get_client_user_agent()); // Throws
5,098✔
2291

2292
        m_is_subserver = is_subserver;
5,098✔
2293
        if (REALM_LIKELY(!need_client_file_ident)) {
5,098✔
2294
            // Protocol state is now WaitForUnbind
2295
            return true;
3,694✔
2296
        }
3,694✔
2297

2298
        // FIXME: We must make a choice about client file ident for read only
2299
        // sessions. They should have a special read-only client file ident.
2300
        file_ident_type proxy_file = 0; // No proxy
1,404✔
2301
        ClientType client_type = (is_subserver ? ClientType::subserver : ClientType::regular);
1,404✔
2302
        m_file_ident_request = m_server_file->request_file_ident(*this, proxy_file, client_type); // Throws
1,404✔
2303
        m_send_ident_message = true;
1,404✔
2304
        // Protocol state is now AllocatingIdent
2305

2306
        return true;
1,404✔
2307
    }
5,098✔
2308

2309
    bool receive_ident_message(file_ident_type client_file_ident, salt_type client_file_ident_salt,
2310
                               version_type scan_server_version, version_type scan_client_version,
2311
                               version_type latest_server_version, salt_type latest_server_version_salt,
2312
                               ProtocolError& error)
2313
    {
4,278✔
2314
        // Protocol state must be WaitForIdent
2315
        REALM_ASSERT(!need_client_file_ident());
4,278✔
2316
        REALM_ASSERT(!m_send_ident_message);
4,278✔
2317
        REALM_ASSERT(!ident_message_received());
4,278✔
2318
        REALM_ASSERT(!unbind_message_received());
4,278✔
2319
        REALM_ASSERT(!error_occurred());
4,278✔
2320
        REALM_ASSERT(!m_error_message_sent);
4,278✔
2321

2322
        logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
4,278✔
2323
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
4,278✔
2324
                     "latest_server_version_salt=%6)",
4,278✔
2325
                     client_file_ident, client_file_ident_salt, scan_server_version, scan_client_version,
4,278✔
2326
                     latest_server_version, latest_server_version_salt); // Throws
4,278✔
2327

2328
        SaltedFileIdent client_file_ident_2 = {client_file_ident, client_file_ident_salt};
4,278✔
2329
        DownloadCursor download_progress = {scan_server_version, scan_client_version};
4,278✔
2330
        SaltedVersion server_version_2 = {latest_server_version, latest_server_version_salt};
4,278✔
2331
        ClientType client_type = (m_is_subserver ? ClientType::subserver : ClientType::regular);
4,278✔
2332
        UploadCursor upload_threshold = {0, 0};
4,278✔
2333
        version_type locked_server_version = 0;
4,278✔
2334
        BootstrapError error_2 =
4,278✔
2335
            m_server_file->bootstrap_client_session(client_file_ident_2, download_progress, server_version_2,
4,278✔
2336
                                                    client_type, upload_threshold, locked_server_version,
4,278✔
2337
                                                    logger); // Throws
4,278✔
2338
        switch (error_2) {
4,278✔
2339
            case BootstrapError::no_error:
4,244✔
2340
                break;
4,244✔
2341
            case BootstrapError::client_file_expired:
✔
2342
                logger.warn("Client (%1) expired", client_file_ident); // Throws
×
2343
                error = ProtocolError::client_file_expired;
×
2344
                return false;
×
2345
            case BootstrapError::bad_client_file_ident:
✔
2346
                logger.error("Bad client file ident (%1) in IDENT message",
×
2347
                             client_file_ident); // Throws
×
2348
                error = ProtocolError::bad_client_file_ident;
×
2349
                return false;
×
2350
            case BootstrapError::bad_client_file_ident_salt:
4✔
2351
                logger.error("Bad client file identifier salt (%1) in IDENT message",
4✔
2352
                             client_file_ident_salt); // Throws
4✔
2353
                error = ProtocolError::diverging_histories;
4✔
2354
                return false;
4✔
2355
            case BootstrapError::bad_download_server_version:
✔
2356
                logger.error("Bad download progress server version in IDENT message"); // Throws
×
2357
                error = ProtocolError::bad_server_version;
×
2358
                return false;
×
2359
            case BootstrapError::bad_download_client_version:
4✔
2360
                logger.error("Bad download progress client version in IDENT message"); // Throws
4✔
2361
                error = ProtocolError::bad_client_version;
4✔
2362
                return false;
4✔
2363
            case BootstrapError::bad_server_version:
20✔
2364
                logger.error("Bad server version (message_type='ident')"); // Throws
20✔
2365
                error = ProtocolError::bad_server_version;
20✔
2366
                return false;
20✔
2367
            case BootstrapError::bad_server_version_salt:
4✔
2368
                logger.error("Bad server version salt in IDENT message"); // Throws
4✔
2369
                error = ProtocolError::diverging_histories;
4✔
2370
                return false;
4✔
2371
            case BootstrapError::bad_client_type:
✔
2372
                logger.error("Bad client type (%1) in IDENT message", int(client_type)); // Throws
×
2373
                error = ProtocolError::bad_client_file_ident; // FIXME: Introduce new protocol-level error
×
2374
                                                              // `bad_client_type`.
2375
                return false;
×
2376
        }
4,278✔
2377

2378
        // Make sure there is no other session currently associcated with the
2379
        // same client-side file
2380
        if (Session* other_sess = m_server_file->get_identified_session(client_file_ident)) {
4,244✔
2381
            SyncConnection& other_conn = other_sess->get_connection();
×
2382
            // It is a protocol violation if the other session is associated
2383
            // with the same connection
2384
            if (&other_conn == &m_connection) {
×
2385
                logger.error("Client file already bound in other session associated with "
×
2386
                             "the same connection"); // Throws
×
2387
                error = ProtocolError::bound_in_other_session;
×
2388
                return false;
×
2389
            }
×
2390
            // When the other session is associated with a different connection
2391
            // (`other_conn`), the clash may be due to the server not yet having
2392
            // realized that the other connection has been closed by the
2393
            // client. If so, the other connention is a "zombie". In the
2394
            // interest of getting rid of zombie connections as fast as
2395
            // possible, we shall assume that a clash with a session in another
2396
            // connection is always due to that other connection being a
2397
            // zombie. And when such a situation is detected, we want to close
2398
            // the zombie connection immediately.
2399
            auto log_level = Logger::Level::detail;
×
2400
            other_conn.terminate(log_level,
×
2401
                                 "Sync connection closed (superseded session)"); // Throws
×
2402
        }
×
2403

2404
        logger.info("Bound to client file (client_file_ident=%1)", client_file_ident); // Throws
4,244✔
2405

2406
        send_log_message(util::Logger::Level::debug, util::format("Session %1 bound to client file ident %2",
4,244✔
2407
                                                                  m_session_ident, client_file_ident));
4,244✔
2408

2409
        m_server_file->identify_session(this, client_file_ident); // Throws
4,244✔
2410

2411
        m_client_file_ident = client_file_ident;
4,244✔
2412
        m_download_progress = download_progress;
4,244✔
2413
        m_upload_threshold = upload_threshold;
4,244✔
2414
        m_locked_server_version = locked_server_version;
4,244✔
2415

2416
        ServerImpl& server = m_connection.get_server();
4,244✔
2417
        const Server::Config& config = server.get_config();
4,244✔
2418
        m_disable_download = (config.disable_download_for.count(client_file_ident) != 0);
4,244✔
2419

2420
        if (REALM_UNLIKELY(config.session_bootstrap_callback)) {
4,244✔
2421
            config.session_bootstrap_callback(m_server_file->get_virt_path(),
×
2422
                                              client_file_ident); // Throws
×
2423
        }
×
2424

2425
        // Protocol  state is now WaitForUnbind
2426
        enlist_to_send();
4,244✔
2427
        return true;
4,244✔
2428
    }
4,244✔
2429

2430
    bool receive_upload_message(version_type progress_client_version, version_type progress_server_version,
2431
                                version_type locked_server_version, const UploadChangesets& upload_changesets,
2432
                                ProtocolError& error)
2433
    {
44,464✔
2434
        // Protocol state must be WaitForUnbind
2435
        REALM_ASSERT(!m_send_ident_message);
44,464✔
2436
        REALM_ASSERT(ident_message_received());
44,464✔
2437
        REALM_ASSERT(!unbind_message_received());
44,464✔
2438
        REALM_ASSERT(!error_occurred());
44,464✔
2439
        REALM_ASSERT(!m_error_message_sent);
44,464✔
2440

2441
        logger.detail("Received: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
44,464✔
2442
                      "locked_server_version=%3, num_changesets=%4)",
44,464✔
2443
                      progress_client_version, progress_server_version, locked_server_version,
44,464✔
2444
                      upload_changesets.size()); // Throws
44,464✔
2445

2446
        // We are unable to reproduce the cursor object for the upload progress
2447
        // when the protocol version is less than 29, because the client does
2448
        // not provide the required information. When the protocol version is
2449
        // less than 25, we can always get a consistent cursor by taking it from
2450
        // the changeset that was uploaded last, but in protocol versions 25,
2451
        // 26, 27, and 28, things are more complicated. Here, we receive new
2452
        // values for `last_integrated_server_version` which we cannot afford to
2453
        // ignore, but we do not know what client versions they correspond
2454
        // to. Fortunately, we can produce a cursor that works, and is mutually
2455
        // consistent with previous cursors, by simply bumping
2456
        // `upload_progress.client_version` when
2457
        // `upload_progress.last_intgerated_server_version` grows.
2458
        //
2459
        // To see that this scheme works, consider the last changeset, A, that
2460
        // will have already been uploaded and integrated at the beginning of
2461
        // the next session, and the first changeset, B, that follows A in the
2462
        // client side history, and is not upload skippable (of local origin and
2463
        // nonempty). We then need to show that A will be skipped, if uploaded
2464
        // in the next session, but B will not.
2465
        //
2466
        // Let V be the client version produced by A, and let T be the value of
2467
        // `upload_progress.client_version` as determined in this session, which
2468
        // is used as threshold in the next session. Then we know that A is
2469
        // skipped during the next session if V is less than, or equal to T. If
2470
        // the protocol version is at least 29, the protocol requires that T is
2471
        // greater than, or equal to V. If the protocol version is less than 25,
2472
        // T will be equal to V. Finally, if the protocol version is 25, 26, 27,
2473
        // or 28, we construct T such that it is always greater than, or equal
2474
        // to V, so in all cases, A will be skipped during the next session.
2475
        //
2476
        // Let W be the client version on which B is based. We then know that B
2477
        // will be retained if, and only if W is greater than, or equalto T. If
2478
        // the protocol version is at least 29, we know that T is less than, or
2479
        // equal to W, since B is not integrated until the next session. If the
2480
        // protocol version is less tahn 25, we know that T is V. Since V must
2481
        // be less than, or equal to W, we again know that T is less than, or
2482
        // equal to W. Finally, if the protocol version is 25, 26, 27, or 28, we
2483
        // construct T such that it is equal to V + N, where N is the number of
2484
        // observed increments in `last_integrated_server_version` since the
2485
        // client version prodiced by A. For each of these observed increments,
2486
        // there must have been a distinct new client version, but all these
2487
        // client versions must be less than, or equal to W, since B is not
2488
        // integrated until the next session. Therefore, we know that T = V + N
2489
        // is less than, or qual to W. So, in all cases, B will not skipped
2490
        // during the next session.
2491
        int protocol_version = m_connection.get_client_protocol_version();
44,464✔
2492
        static_cast<void>(protocol_version); // No protocol diversion (yet)
44,464✔
2493

2494
        UploadCursor upload_progress;
44,464✔
2495
        upload_progress = {progress_client_version, progress_server_version};
44,464✔
2496

2497
        // `upload_progress.client_version` must be nondecreasing across the
2498
        // session.
2499
        bool good_1 = (upload_progress.client_version >= m_upload_progress.client_version);
44,464✔
2500
        if (REALM_UNLIKELY(!good_1)) {
44,464✔
2501
            logger.error("Decreasing client version in upload progress (%1 < %2)", upload_progress.client_version,
×
2502
                         m_upload_progress.client_version); // Throws
×
2503
            error = ProtocolError::bad_client_version;
×
2504
            return false;
×
2505
        }
×
2506
        // `upload_progress.last_integrated_server_version` must be a version
2507
        // that the client can have heard about.
2508
        bool good_2 = (upload_progress.last_integrated_server_version <= m_download_progress.server_version);
44,464✔
2509
        if (REALM_UNLIKELY(!good_2)) {
44,464✔
2510
            logger.error("Bad last integrated server version in upload progress (%1 > %2)",
×
2511
                         upload_progress.last_integrated_server_version,
×
2512
                         m_download_progress.server_version); // Throws
×
2513
            error = ProtocolError::bad_server_version;
×
2514
            return false;
×
2515
        }
×
2516

2517
        // `upload_progress` must be consistent.
2518
        if (REALM_UNLIKELY(!is_consistent(upload_progress))) {
44,464✔
2519
            logger.error("Upload progress is inconsistent (%1, %2)", upload_progress.client_version,
×
2520
                         upload_progress.last_integrated_server_version); // Throws
×
2521
            error = ProtocolError::bad_server_version;
×
2522
            return false;
×
2523
        }
×
2524
        // `upload_progress` and `m_upload_threshold` must be mutually
2525
        // consistent.
2526
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_threshold))) {
44,464✔
2527
            logger.error("Upload progress (%1, %2) is mutually inconsistent with "
×
2528
                         "threshold (%3, %4)",
×
2529
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2530
                         m_upload_threshold.client_version,
×
2531
                         m_upload_threshold.last_integrated_server_version); // Throws
×
2532
            error = ProtocolError::bad_server_version;
×
2533
            return false;
×
2534
        }
×
2535
        // `upload_progress` and `m_upload_progress` must be mutually
2536
        // consistent.
2537
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_progress))) {
44,464✔
2538
            logger.error("Upload progress (%1, %2) is mutually inconsistent with previous "
×
2539
                         "upload progress (%3, %4)",
×
2540
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2541
                         m_upload_progress.client_version,
×
2542
                         m_upload_progress.last_integrated_server_version); // Throws
×
2543
            error = ProtocolError::bad_server_version;
×
2544
            return false;
×
2545
        }
×
2546

2547
        version_type locked_server_version_2 = locked_server_version;
44,464✔
2548

2549
        // `locked_server_version_2` must be nondecreasing over the lifetime of
2550
        // the client-side file.
2551
        if (REALM_UNLIKELY(locked_server_version_2 < m_locked_server_version)) {
44,464✔
2552
            logger.error("Decreasing locked server version (%1 < %2)", locked_server_version_2,
×
2553
                         m_locked_server_version); // Throws
×
2554
            error = ProtocolError::bad_server_version;
×
2555
            return false;
×
2556
        }
×
2557
        // `locked_server_version_2` must be a version that the client can have
2558
        // heard about.
2559
        if (REALM_UNLIKELY(locked_server_version_2 > m_download_progress.server_version)) {
44,464✔
2560
            logger.error("Bad locked server version (%1 > %2)", locked_server_version_2,
×
2561
                         m_download_progress.server_version); // Throws
×
2562
            error = ProtocolError::bad_server_version;
×
2563
            return false;
×
2564
        }
×
2565

2566
        std::size_t num_previously_integrated_changesets = 0;
44,464✔
2567
        if (!upload_changesets.empty()) {
44,464✔
2568
            UploadCursor up = m_upload_progress;
24,512✔
2569
            for (const ServerProtocol::UploadChangeset& uc : upload_changesets) {
36,904✔
2570
                // `uc.upload_cursor.client_version` must be increasing across
2571
                // all the changesets in this UPLOAD message, and all must be
2572
                // greater than upload_progress.client_version of previous
2573
                // UPLOAD message.
2574
                if (REALM_UNLIKELY(uc.upload_cursor.client_version <= up.client_version)) {
36,904✔
2575
                    logger.error("Nonincreasing client version in upload cursor of uploaded "
×
2576
                                 "changeset (%1 <= %2)",
×
2577
                                 uc.upload_cursor.client_version,
×
2578
                                 up.client_version); // Throws
×
2579
                    error = ProtocolError::bad_client_version;
×
2580
                    return false;
×
2581
                }
×
2582
                // `uc.upload_progress` must be consistent.
2583
                if (REALM_UNLIKELY(!is_consistent(uc.upload_cursor))) {
36,904✔
2584
                    logger.error("Upload cursor of uploaded changeset is inconsistent (%1, %2)",
×
2585
                                 uc.upload_cursor.client_version,
×
2586
                                 uc.upload_cursor.last_integrated_server_version); // Throws
×
2587
                    error = ProtocolError::bad_server_version;
×
2588
                    return false;
×
2589
                }
×
2590
                // `uc.upload_progress` must be mutually consistent with
2591
                // previous upload cursor.
2592
                if (REALM_UNLIKELY(!are_mutually_consistent(uc.upload_cursor, up))) {
36,904✔
2593
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2594
                                 "inconsistent with previous upload cursor (%3, %4)",
×
2595
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2596
                                 up.client_version, up.last_integrated_server_version); // Throws
×
2597
                    error = ProtocolError::bad_server_version;
×
2598
                    return false;
×
2599
                }
×
2600
                // `uc.upload_progress` must be mutually consistent with
2601
                // threshold, that is, for changesets that have not previously
2602
                // been integrated, it is important that the specified value of
2603
                // `last_integrated_server_version` is greater than, or equal to
2604
                // the reciprocal history base version.
2605
                bool consistent_with_threshold = are_mutually_consistent(uc.upload_cursor, m_upload_threshold);
36,904✔
2606
                if (REALM_UNLIKELY(!consistent_with_threshold)) {
36,904✔
2607
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2608
                                 "inconsistent with threshold (%3, %4)",
×
2609
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2610
                                 m_upload_threshold.client_version,
×
2611
                                 m_upload_threshold.last_integrated_server_version); // Throws
×
2612
                    error = ProtocolError::bad_server_version;
×
2613
                    return false;
×
2614
                }
×
2615
                bool previously_integrated = (uc.upload_cursor.client_version <= m_upload_threshold.client_version);
36,904✔
2616
                if (previously_integrated)
36,904✔
2617
                    ++num_previously_integrated_changesets;
2,448✔
2618
                up = uc.upload_cursor;
36,904✔
2619
            }
36,904✔
2620
            // `upload_progress.client_version` must be greater than, or equal
2621
            // to client versions produced by each of the changesets in this
2622
            // UPLOAD message.
2623
            if (REALM_UNLIKELY(up.client_version > upload_progress.client_version)) {
24,512✔
2624
                logger.error("Upload progress less than client version produced by uploaded "
×
2625
                             "changeset (%1 > %2)",
×
2626
                             up.client_version,
×
2627
                             upload_progress.client_version); // Throws
×
2628
                error = ProtocolError::bad_client_version;
×
2629
                return false;
×
2630
            }
×
2631
            // The upload cursor of last uploaded changeset must be mutually
2632
            // consistent with the reported upload progress.
2633
            if (REALM_UNLIKELY(!are_mutually_consistent(up, upload_progress))) {
24,512✔
2634
                logger.error("Upload cursor (%1, %2) of last uploaded changeset is mutually "
×
2635
                             "inconsistent with upload progress (%3, %4)",
×
2636
                             up.client_version, up.last_integrated_server_version, upload_progress.client_version,
×
2637
                             upload_progress.last_integrated_server_version); // Throws
×
2638
                error = ProtocolError::bad_server_version;
×
2639
                return false;
×
2640
            }
×
2641
        }
24,512✔
2642

2643
        // FIXME: Part of a very poor man's substitute for a proper backpressure
2644
        // scheme.
2645
        if (REALM_UNLIKELY(!m_server_file->can_add_changesets_from_downstream())) {
44,464✔
2646
            logger.debug("Terminating uploading session because buffer is full"); // Throws
×
2647
            // Using this exact error code, because it causes `try_again` flag
2648
            // to be set to true, which causes the client to wait for about 5
2649
            // minuites before trying to connect again.
2650
            error = ProtocolError::connection_closed;
×
2651
            return false;
×
2652
        }
×
2653

2654
        m_upload_progress = upload_progress;
44,464✔
2655

2656
        bool have_real_upload_progress = (upload_progress.client_version > m_upload_threshold.client_version);
44,464✔
2657
        bool bump_locked_server_version = (locked_server_version_2 > m_locked_server_version);
44,464✔
2658

2659
        std::size_t num_changesets_to_integrate = upload_changesets.size() - num_previously_integrated_changesets;
44,464✔
2660
        REALM_ASSERT(have_real_upload_progress || num_changesets_to_integrate == 0);
44,464✔
2661

2662
        bool have_anything_to_do = (have_real_upload_progress || bump_locked_server_version);
44,464✔
2663
        if (!have_anything_to_do)
44,464✔
2664
            return true;
242✔
2665

2666
        if (!have_real_upload_progress)
44,222✔
2667
            upload_progress = m_upload_threshold;
×
2668

2669
        if (num_previously_integrated_changesets > 0) {
44,222✔
2670
            logger.detail("Ignoring %1 previously integrated changesets",
830✔
2671
                          num_previously_integrated_changesets); // Throws
830✔
2672
        }
830✔
2673
        if (num_changesets_to_integrate > 0) {
44,222✔
2674
            logger.detail("Initiate integration of %1 remote changesets",
24,134✔
2675
                          num_changesets_to_integrate); // Throws
24,134✔
2676
        }
24,134✔
2677

2678
        REALM_ASSERT(m_server_file);
44,222✔
2679
        ServerFile& file = *m_server_file;
44,222✔
2680
        std::size_t offset = num_previously_integrated_changesets;
44,222✔
2681
        file.add_changesets_from_downstream(m_client_file_ident, upload_progress, locked_server_version_2,
44,222✔
2682
                                            upload_changesets.data() + offset, num_changesets_to_integrate); // Throws
44,222✔
2683

2684
        m_locked_server_version = locked_server_version_2;
44,222✔
2685
        return true;
44,222✔
2686
    }
44,464✔
2687

2688
    bool receive_mark_message(request_ident_type request_ident, ProtocolError&)
2689
    {
12,016✔
2690
        // Protocol state must be WaitForUnbind
2691
        REALM_ASSERT(!m_send_ident_message);
12,016✔
2692
        REALM_ASSERT(ident_message_received());
12,016✔
2693
        REALM_ASSERT(!unbind_message_received());
12,016✔
2694
        REALM_ASSERT(!error_occurred());
12,016✔
2695
        REALM_ASSERT(!m_error_message_sent);
12,016✔
2696

2697
        logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws
12,016✔
2698

2699
        m_download_completion_request = request_ident;
12,016✔
2700

2701
        ensure_enlisted_to_send();
12,016✔
2702
        return true;
12,016✔
2703
    }
12,016✔
2704

2705
    // Returns true if the deactivation process has been completed, at which
2706
    // point the caller (SyncConnection::receive_unbind_message()) should
2707
    // terminate the session.
2708
    //
2709
    // CAUTION: This function may commit suicide!
2710
    void receive_unbind_message()
2711
    {
2,200✔
2712
        // Protocol state may be anything but SendUnbound
2713
        REALM_ASSERT(!m_unbind_message_received);
2,200✔
2714

2715
        logger.detail("Received: UNBIND"); // Throws
2,200✔
2716

2717
        detach_from_server_file();
2,200✔
2718
        m_unbind_message_received = true;
2,200✔
2719

2720
        // Detect completion of the deactivation process
2721
        if (m_error_message_sent) {
2,200✔
2722
            // Deactivation process completed
2723
            terminate(); // Throws
30✔
2724
            m_connection.discard_session(m_session_ident);
30✔
2725
            // This session is now destroyed!
2726
            return;
30✔
2727
        }
30✔
2728

2729
        // Protocol state is now SendUnbound
2730
        ensure_enlisted_to_send();
2,170✔
2731
    }
2,170✔
2732

2733
    void receive_error_message(session_ident_type, int, std::string_view)
2734
    {
×
2735
        REALM_ASSERT(!m_unbind_message_received);
×
2736

2737
        logger.detail("Received: ERROR"); // Throws
×
2738
    }
×
2739

2740
private:
2741
    SyncConnection& m_connection;
2742

2743
    const session_ident_type m_session_ident;
2744

2745
    // Not null if, and only if this session is in
2746
    // m_connection.m_sessions_enlisted_to_send.
2747
    Session* m_next = nullptr;
2748

2749
    // Becomes nonnull when the BIND message is received, if no error occurs. Is
2750
    // reset to null when the deactivation process is initiated, either when the
2751
    // UNBIND message is recieved, or when initiate_deactivation() is called.
2752
    util::bind_ptr<ServerFile> m_server_file;
2753

2754
    bool m_disable_download = false;
2755
    bool m_is_subserver = false;
2756

2757
    using file_ident_request_type = ServerFile::file_ident_request_type;
2758

2759
    // When nonzero, this session has an outstanding request for a client file
2760
    // identifier.
2761
    file_ident_request_type m_file_ident_request = 0;
2762

2763
    // Payload for next outgoing ALLOC message.
2764
    SaltedFileIdent m_allocated_file_ident = {0, 0};
2765

2766
    // Zero until the session receives an IDENT message from the client.
2767
    file_ident_type m_client_file_ident = 0;
2768

2769
    // Zero until initiate_deactivation() is called.
2770
    ProtocolError m_error_code = {};
2771

2772
    // The current point of progression of the download process. Set to (<server
2773
    // version>, <client version>) of the IDENT message when the IDENT message
2774
    // is received. At the time of return from continue_history_scan(), it
2775
    // points to the latest server version such that all preceding changesets in
2776
    // the server-side history have been downloaded, are currently being
2777
    // downloaded, or are *download excluded*.
2778
    DownloadCursor m_download_progress = {0, 0};
2779

2780
    request_ident_type m_download_completion_request = 0;
2781

2782
    // Records the progress of the upload process. Used to check that the client
2783
    // uploads changesets in order. Also, when m_upload_progress >
2784
    // m_upload_threshold, m_upload_progress works as a cache of the persisted
2785
    // version of the upload progress.
2786
    UploadCursor m_upload_progress = {0, 0};
2787

2788
    // Initialized on reception of the IDENT message. Specifies the actual
2789
    // upload progress (as recorded on the server-side) at the beginning of the
2790
    // session, and it remains fixed throughout the session.
2791
    //
2792
    // m_upload_threshold includes the progress resulting from the received
2793
    // changesets that have not yet been integrated (only relevant for
2794
    // synchronous backup).
2795
    UploadCursor m_upload_threshold = {0, 0};
2796

2797
    // Works partially as a cache of the persisted value, and partially as a way
2798
    // of checking that the client respects that it can never decrease.
2799
    version_type m_locked_server_version = 0;
2800

2801
    bool m_send_ident_message = false;
2802
    bool m_unbind_message_received = false;
2803
    bool m_error_message_sent = false;
2804

2805
    /// m_one_download_message_sent denotes whether at least one DOWNLOAD message
2806
    /// has been sent in the current session. The variable is used to ensure
2807
    /// that a DOWNLOAD message is always sent in a session. The received
2808
    /// DOWNLOAD message is needed by the client to ensure that its current
2809
    /// download progress is up to date.
2810
    bool m_one_download_message_sent = false;
2811

2812
    static std::string make_logger_prefix(session_ident_type session_ident)
2813
    {
5,126✔
2814
        std::ostringstream out;
5,126✔
2815
        out.imbue(std::locale::classic());
5,126✔
2816
        out << "Session[" << session_ident << "]: "; // Throws
5,126✔
2817
        return out.str();                            // Throws
5,126✔
2818
    }
5,126✔
2819

2820
    // Scan the history for changesets to be downloaded.
2821
    // If the history is longer than the end point of the previous scan,
2822
    // a DOWNLOAD message will be sent.
2823
    // A MARK message is sent if no DOWNLOAD message is sent, and the client has
2824
    // requested to be notified about download completion.
2825
    // In case neither a DOWNLOAD nor a MARK is sent, no message is sent.
2826
    //
2827
    // This function may lead to the destruction of the session object
2828
    // (suicide).
2829
    void continue_history_scan()
2830
    {
106,952✔
2831
        // Protocol state must be WaitForUnbind
2832
        REALM_ASSERT(!m_send_ident_message);
106,952✔
2833
        REALM_ASSERT(ident_message_received());
106,952✔
2834
        REALM_ASSERT(!unbind_message_received());
106,952✔
2835
        REALM_ASSERT(!error_occurred());
106,952✔
2836
        REALM_ASSERT(!m_error_message_sent);
106,952✔
2837
        REALM_ASSERT(!is_enlisted_to_send());
106,952✔
2838

2839
        SaltedVersion last_server_version = m_server_file->get_salted_sync_version();
106,952✔
2840
        REALM_ASSERT(last_server_version.version >= m_download_progress.server_version);
106,952✔
2841

2842
        ServerImpl& server = m_connection.get_server();
106,952✔
2843
        const Server::Config& config = server.get_config();
106,952✔
2844
        if (REALM_UNLIKELY(m_disable_download))
106,952✔
2845
            return;
×
2846

2847
        bool have_more_to_scan =
106,952✔
2848
            (last_server_version.version > m_download_progress.server_version || !m_one_download_message_sent);
106,952✔
2849
        if (have_more_to_scan) {
106,952✔
2850
            m_server_file->register_client_access(m_client_file_ident);     // Throws
42,008✔
2851
            const ServerHistory& history = m_server_file->access().history; // Throws
42,008✔
2852
            const char* body;
42,008✔
2853
            std::size_t uncompressed_body_size;
42,008✔
2854
            std::size_t compressed_body_size = 0;
42,008✔
2855
            bool body_is_compressed = false;
42,008✔
2856
            version_type end_version = last_server_version.version;
42,008✔
2857
            DownloadCursor download_progress;
42,008✔
2858
            UploadCursor upload_progress = {0, 0};
42,008✔
2859
            std::uint_fast64_t downloadable_bytes = 0;
42,008✔
2860
            std::size_t num_changesets;
42,008✔
2861
            std::size_t accum_original_size;
42,008✔
2862
            std::size_t accum_compacted_size;
42,008✔
2863
            ServerProtocol& protocol = get_server_protocol();
42,008✔
2864
            bool enable_cache = (config.enable_download_bootstrap_cache && m_download_progress.server_version == 0 &&
42,008!
2865
                                 m_upload_progress.client_version == 0 && m_upload_threshold.client_version == 0);
42,008!
2866
            DownloadCache& cache = m_server_file->get_download_cache();
42,008✔
2867
            bool fetch_from_cache = (enable_cache && cache.body && end_version == cache.end_version);
42,008!
2868
            if (fetch_from_cache) {
42,008✔
2869
                body = cache.body.get();
×
2870
                uncompressed_body_size = cache.uncompressed_body_size;
×
2871
                compressed_body_size = cache.compressed_body_size;
×
2872
                body_is_compressed = cache.body_is_compressed;
×
2873
                download_progress = cache.download_progress;
×
2874
                downloadable_bytes = cache.downloadable_bytes;
×
2875
                num_changesets = cache.num_changesets;
×
2876
                accum_original_size = cache.accum_original_size;
×
2877
                accum_compacted_size = cache.accum_compacted_size;
×
2878
            }
×
2879
            else {
42,008✔
2880
                // Discard the old cached DOWNLOAD body before generating a new
2881
                // one to be cached. This can make a big difference because the
2882
                // size of that body can be very large (10GiB has been seen in a
2883
                // real-world case).
2884
                if (enable_cache)
42,008✔
2885
                    cache.body = {};
×
2886

2887
                OutputBuffer& out = server.get_misc_buffers().download_message;
42,008✔
2888
                out.reset();
42,008✔
2889
                download_progress = m_download_progress;
42,008✔
2890
                auto fetch_and_compress = [&](std::size_t max_download_size) {
42,010✔
2891
                    DownloadHistoryEntryHandler handler{protocol, out, logger};
42,010✔
2892
                    std::uint_fast64_t cumulative_byte_size_current;
42,010✔
2893
                    std::uint_fast64_t cumulative_byte_size_total;
42,010✔
2894
                    bool not_expired = history.fetch_download_info(
42,010✔
2895
                        m_client_file_ident, download_progress, end_version, upload_progress, handler,
42,010✔
2896
                        cumulative_byte_size_current, cumulative_byte_size_total,
42,010✔
2897
                        max_download_size); // Throws
42,010✔
2898
                    REALM_ASSERT(upload_progress.client_version >= download_progress.last_integrated_client_version);
42,010✔
2899
                    SyncConnection& conn = get_connection();
42,010✔
2900
                    if (REALM_UNLIKELY(!not_expired)) {
42,010✔
2901
                        logger.debug("History scanning failed: Client file entry "
×
2902
                                     "expired during session"); // Throws
×
2903
                        conn.protocol_error(ProtocolError::client_file_expired, this);
×
2904
                        // Session object may have been destroyed at this point
2905
                        // (suicide).
2906
                        return false;
×
2907
                    }
×
2908

2909
                    downloadable_bytes = cumulative_byte_size_total - cumulative_byte_size_current;
42,010✔
2910
                    uncompressed_body_size = out.size();
42,010✔
2911
                    BinaryData uncompressed = {out.data(), uncompressed_body_size};
42,010✔
2912
                    body = uncompressed.data();
42,010✔
2913
                    std::size_t max_uncompressed = 1024;
42,010✔
2914
                    if (uncompressed.size() > max_uncompressed) {
42,010✔
2915
                        compression::CompressMemoryArena& arena = server.get_compress_memory_arena();
4,182✔
2916
                        std::vector<char>& buffer = server.get_misc_buffers().compress;
4,182✔
2917
                        compression::allocate_and_compress(arena, uncompressed, buffer); // Throws
4,182✔
2918
                        if (buffer.size() < uncompressed.size()) {
4,182✔
2919
                            body = buffer.data();
4,182✔
2920
                            compressed_body_size = buffer.size();
4,182✔
2921
                            body_is_compressed = true;
4,182✔
2922
                        }
4,182✔
2923
                    }
4,182✔
2924
                    num_changesets = handler.num_changesets;
42,010✔
2925
                    accum_original_size = handler.accum_original_size;
42,010✔
2926
                    accum_compacted_size = handler.accum_compacted_size;
42,010✔
2927
                    return true;
42,010✔
2928
                };
42,010✔
2929
                if (enable_cache) {
42,008✔
2930
                    std::size_t max_download_size = std::numeric_limits<size_t>::max();
×
2931
                    if (!fetch_and_compress(max_download_size)) { // Throws
×
2932
                        // Session object may have been destroyed at this point
2933
                        // (suicide).
2934
                        return;
×
2935
                    }
×
2936
                    REALM_ASSERT(upload_progress.client_version == 0);
×
2937
                    std::size_t body_size = (body_is_compressed ? compressed_body_size : uncompressed_body_size);
×
2938
                    cache.body = std::make_unique<char[]>(body_size); // Throws
×
2939
                    std::copy(body, body + body_size, cache.body.get());
×
2940
                    cache.uncompressed_body_size = uncompressed_body_size;
×
2941
                    cache.compressed_body_size = compressed_body_size;
×
2942
                    cache.body_is_compressed = body_is_compressed;
×
2943
                    cache.end_version = end_version;
×
2944
                    cache.download_progress = download_progress;
×
2945
                    cache.downloadable_bytes = downloadable_bytes;
×
2946
                    cache.num_changesets = num_changesets;
×
2947
                    cache.accum_original_size = accum_original_size;
×
2948
                    cache.accum_compacted_size = accum_compacted_size;
×
2949
                }
×
2950
                else {
42,008✔
2951
                    std::size_t max_download_size = config.max_download_size;
42,008✔
2952
                    if (!fetch_and_compress(max_download_size)) { // Throws
42,008✔
2953
                        // Session object may have been destroyed at this point
2954
                        // (suicide).
2955
                        return;
×
2956
                    }
×
2957
                }
42,008✔
2958
            }
42,008✔
2959

2960
            OutputBuffer& out = m_connection.get_output_buffer();
42,008✔
2961
            protocol.make_download_message(
42,008✔
2962
                m_connection.get_client_protocol_version(), out, m_session_ident, download_progress.server_version,
42,008✔
2963
                download_progress.last_integrated_client_version, last_server_version.version,
42,008✔
2964
                last_server_version.salt, upload_progress.client_version,
42,008✔
2965
                upload_progress.last_integrated_server_version, downloadable_bytes, num_changesets, body,
42,008✔
2966
                uncompressed_body_size, compressed_body_size, body_is_compressed, logger); // Throws
42,008✔
2967

2968
            m_download_progress = download_progress;
42,008✔
2969
            logger.debug("Setting of m_download_progress.server_version = %1",
42,008✔
2970
                         m_download_progress.server_version); // Throws
42,008✔
2971
            send_download_message();
42,008✔
2972
            m_one_download_message_sent = true;
42,008✔
2973

2974
            enlist_to_send();
42,008✔
2975
        }
42,008✔
2976
        else if (m_download_completion_request) {
64,944✔
2977
            // Send a MARK message
2978
            request_ident_type request_ident = m_download_completion_request;
11,994✔
2979
            send_mark_message(request_ident);  // Throws
11,994✔
2980
            m_download_completion_request = 0; // Request handled
11,994✔
2981
            enlist_to_send();
11,994✔
2982
        }
11,994✔
2983
    }
106,952✔
2984

2985
    void send_ident_message()
2986
    {
1,352✔
2987
        // Protocol state must be SendIdent
2988
        REALM_ASSERT(!need_client_file_ident());
1,352✔
2989
        REALM_ASSERT(m_send_ident_message);
1,352✔
2990
        REALM_ASSERT(!ident_message_received());
1,352✔
2991
        REALM_ASSERT(!unbind_message_received());
1,352✔
2992
        REALM_ASSERT(!error_occurred());
1,352✔
2993
        REALM_ASSERT(!m_error_message_sent);
1,352✔
2994

2995
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
1,352✔
2996

2997
        file_ident_type client_file_ident = m_allocated_file_ident.ident;
1,352✔
2998
        salt_type client_file_ident_salt = m_allocated_file_ident.salt;
1,352✔
2999

3000
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident,
1,352✔
3001
                     client_file_ident_salt); // Throws
1,352✔
3002

3003
        ServerProtocol& protocol = get_server_protocol();
1,352✔
3004
        OutputBuffer& out = m_connection.get_output_buffer();
1,352✔
3005
        int protocol_version = m_connection.get_client_protocol_version();
1,352✔
3006
        protocol.make_ident_message(protocol_version, out, m_session_ident, client_file_ident,
1,352✔
3007
                                    client_file_ident_salt); // Throws
1,352✔
3008
        m_connection.initiate_write_output_buffer();         // Throws
1,352✔
3009

3010
        m_allocated_file_ident.ident = 0; // Consumed
1,352✔
3011
        m_send_ident_message = false;
1,352✔
3012
        // Protocol state is now WaitForStateRequest or WaitForIdent
3013
    }
1,352✔
3014

3015
    void send_download_message()
3016
    {
42,010✔
3017
        m_connection.initiate_write_output_buffer(); // Throws
42,010✔
3018
    }
42,010✔
3019

3020
    void send_mark_message(request_ident_type request_ident)
3021
    {
11,992✔
3022
        logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
11,992✔
3023

3024
        ServerProtocol& protocol = get_server_protocol();
11,992✔
3025
        OutputBuffer& out = m_connection.get_output_buffer();
11,992✔
3026
        protocol.make_mark_message(out, m_session_ident, request_ident); // Throws
11,992✔
3027
        m_connection.initiate_write_output_buffer();                     // Throws
11,992✔
3028
    }
11,992✔
3029

3030
    void send_alloc_message()
3031
    {
×
3032
        // Protocol state must be WaitForUnbind
3033
        REALM_ASSERT(!m_send_ident_message);
×
3034
        REALM_ASSERT(ident_message_received());
×
3035
        REALM_ASSERT(!unbind_message_received());
×
3036
        REALM_ASSERT(!error_occurred());
×
3037
        REALM_ASSERT(!m_error_message_sent);
×
3038

3039
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
×
3040

3041
        // Relayed allocations are only allowed from protocol version 23 (old protocol).
3042
        REALM_ASSERT(false);
×
3043

3044
        file_ident_type file_ident = m_allocated_file_ident.ident;
×
3045

3046
        logger.debug("Sending: ALLOC(file_ident=%1)", file_ident); // Throws
×
3047

3048
        ServerProtocol& protocol = get_server_protocol();
×
3049
        OutputBuffer& out = m_connection.get_output_buffer();
×
3050
        protocol.make_alloc_message(out, m_session_ident, file_ident); // Throws
×
3051
        m_connection.initiate_write_output_buffer();                   // Throws
×
3052

3053
        m_allocated_file_ident.ident = 0; // Consumed
×
3054

3055
        // Other messages may be waiting to be sent.
3056
        enlist_to_send();
×
3057
    }
×
3058

3059
    void send_unbound_message()
3060
    {
2,160✔
3061
        // Protocol state must be SendUnbound
3062
        REALM_ASSERT(unbind_message_received());
2,160✔
3063
        REALM_ASSERT(!m_error_message_sent);
2,160✔
3064

3065
        logger.debug("Sending: UNBOUND"); // Throws
2,160✔
3066

3067
        ServerProtocol& protocol = get_server_protocol();
2,160✔
3068
        OutputBuffer& out = m_connection.get_output_buffer();
2,160✔
3069
        protocol.make_unbound_message(out, m_session_ident); // Throws
2,160✔
3070
        m_connection.initiate_write_output_buffer();         // Throws
2,160✔
3071
    }
2,160✔
3072

3073
    void send_error_message()
3074
    {
76✔
3075
        // Protocol state must be SendError
3076
        REALM_ASSERT(!unbind_message_received());
76✔
3077
        REALM_ASSERT(error_occurred());
76✔
3078
        REALM_ASSERT(!m_error_message_sent);
76✔
3079

3080
        REALM_ASSERT(is_session_level_error(m_error_code));
76✔
3081

3082
        ProtocolError error_code = m_error_code;
76✔
3083
        const char* message = get_protocol_error_message(int(error_code));
76✔
3084
        std::size_t message_size = std::strlen(message);
76✔
3085
        bool try_again = determine_try_again(error_code);
76✔
3086

3087
        logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3)", int(error_code), message_size,
76✔
3088
                      try_again); // Throws
76✔
3089

3090
        ServerProtocol& protocol = get_server_protocol();
76✔
3091
        OutputBuffer& out = m_connection.get_output_buffer();
76✔
3092
        int protocol_version = m_connection.get_client_protocol_version();
76✔
3093
        protocol.make_error_message(protocol_version, out, error_code, message, message_size, try_again,
76✔
3094
                                    m_session_ident); // Throws
76✔
3095
        m_connection.initiate_write_output_buffer();  // Throws
76✔
3096

3097
        m_error_message_sent = true;
76✔
3098
        // Protocol state is now WaitForUnbindErr
3099
    }
76✔
3100

3101
    void send_log_message(util::Logger::Level level, const std::string&& message)
3102
    {
4,248✔
3103
        if (m_connection.get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
4,248✔
3104
            return logger.log(level, message.c_str());
×
3105
        }
×
3106

3107
        m_connection.send_log_message(level, std::move(message), m_session_ident);
4,248✔
3108
    }
4,248✔
3109

3110
    // Idempotent
3111
    void detach_from_server_file() noexcept
3112
    {
7,402✔
3113
        if (!m_server_file)
7,402✔
3114
            return;
2,304✔
3115
        ServerFile& file = *m_server_file;
5,098✔
3116
        if (ident_message_received()) {
5,098✔
3117
            file.remove_identified_session(m_client_file_ident);
4,248✔
3118
        }
4,248✔
3119
        else {
850✔
3120
            file.remove_unidentified_session(this);
850✔
3121
        }
850✔
3122
        if (m_file_ident_request != 0)
5,098✔
3123
            file.cancel_file_ident_request(m_file_ident_request);
52✔
3124
        m_server_file.reset();
5,098✔
3125
    }
5,098✔
3126

3127
    friend class SessionQueue;
3128
};
3129

3130

3131
// ============================ SessionQueue implementation ============================
3132

3133
void SessionQueue::push_back(Session* sess) noexcept
3134
{
110,748✔
3135
    REALM_ASSERT(!sess->m_next);
110,748✔
3136
    if (m_back) {
110,748✔
3137
        sess->m_next = m_back->m_next;
39,890✔
3138
        m_back->m_next = sess;
39,890✔
3139
    }
39,890✔
3140
    else {
70,858✔
3141
        sess->m_next = sess;
70,858✔
3142
    }
70,858✔
3143
    m_back = sess;
110,748✔
3144
}
110,748✔
3145

3146

3147
Session* SessionQueue::pop_front() noexcept
3148
{
160,336✔
3149
    Session* sess = nullptr;
160,336✔
3150
    if (m_back) {
160,336✔
3151
        sess = m_back->m_next;
110,542✔
3152
        if (sess != m_back) {
110,542✔
3153
            m_back->m_next = sess->m_next;
39,820✔
3154
        }
39,820✔
3155
        else {
70,722✔
3156
            m_back = nullptr;
70,722✔
3157
        }
70,722✔
3158
        sess->m_next = nullptr;
110,542✔
3159
    }
110,542✔
3160
    return sess;
160,336✔
3161
}
160,336✔
3162

3163

3164
void SessionQueue::clear() noexcept
3165
{
3,304✔
3166
    if (m_back) {
3,304✔
3167
        Session* sess = m_back;
134✔
3168
        for (;;) {
198✔
3169
            Session* next = sess->m_next;
198✔
3170
            sess->m_next = nullptr;
198✔
3171
            if (next == m_back)
198✔
3172
                break;
134✔
3173
            sess = next;
64✔
3174
        }
64✔
3175
        m_back = nullptr;
134✔
3176
    }
134✔
3177
}
3,304✔
3178

3179

3180
// ============================ ServerFile implementation ============================
3181

3182
ServerFile::ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path,
3183
                       std::string real_path, bool disable_sync_to_disk)
3184
    : logger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.logger_ptr}               // Throws
446✔
3185
    , wlogger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.get_worker().logger_ptr} // Throws
446✔
3186
    , m_server{server}
446✔
3187
    , m_file{cache, real_path, virt_path, false, disable_sync_to_disk} // Throws
446✔
3188
    , m_worker_file{server.get_worker().get_file_access_cache(), real_path, virt_path, true, disable_sync_to_disk}
446✔
3189
{
1,138✔
3190
}
1,138✔
3191

3192

3193
ServerFile::~ServerFile() noexcept
3194
{
1,138✔
3195
    REALM_ASSERT(m_unidentified_sessions.empty());
1,138✔
3196
    REALM_ASSERT(m_identified_sessions.empty());
1,138✔
3197
    REALM_ASSERT(m_file_ident_request == 0);
1,138✔
3198
}
1,138✔
3199

3200

3201
void ServerFile::initialize()
3202
{
1,138✔
3203
    const ServerHistory& history = access().history; // Throws
1,138✔
3204
    file_ident_type partial_file_ident = 0;
1,138✔
3205
    version_type partial_progress_reference_version = 0;
1,138✔
3206
    bool has_upstream_sync_status;
1,138✔
3207
    history.get_status(m_version_info, has_upstream_sync_status, partial_file_ident,
1,138✔
3208
                       partial_progress_reference_version); // Throws
1,138✔
3209
    REALM_ASSERT(!has_upstream_sync_status);
1,138✔
3210
    REALM_ASSERT(partial_file_ident == 0);
1,138✔
3211
}
1,138✔
3212

3213

3214
void ServerFile::activate() {}
1,138✔
3215

3216

3217
// This function must be called only after a completed invocation of
3218
// initialize(). Both functinos must only ever be called by the network event
3219
// loop thread.
3220
void ServerFile::register_client_access(file_ident_type) {}
90,476✔
3221

3222

3223
auto ServerFile::request_file_ident(FileIdentReceiver& receiver, file_ident_type proxy_file,
3224
                                    ClientType client_type) -> file_ident_request_type
3225
{
1,404✔
3226
    auto request = ++m_last_file_ident_request;
1,404✔
3227
    m_file_ident_requests[request] = {&receiver, proxy_file, client_type}; // Throws
1,404✔
3228

3229
    on_work_added(); // Throws
1,404✔
3230
    return request;
1,404✔
3231
}
1,404✔
3232

3233

3234
void ServerFile::cancel_file_ident_request(file_ident_request_type request) noexcept
3235
{
52✔
3236
    auto i = m_file_ident_requests.find(request);
52✔
3237
    REALM_ASSERT(i != m_file_ident_requests.end());
52✔
3238
    FileIdentRequestInfo& info = i->second;
52✔
3239
    REALM_ASSERT(info.receiver);
52✔
3240
    info.receiver = nullptr;
52✔
3241
}
52✔
3242

3243

3244
void ServerFile::add_unidentified_session(Session* sess)
3245
{
5,098✔
3246
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 0);
5,098✔
3247
    m_unidentified_sessions.insert(sess); // Throws
5,098✔
3248
}
5,098✔
3249

3250

3251
void ServerFile::identify_session(Session* sess, file_ident_type client_file_ident)
3252
{
4,248✔
3253
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);
4,248✔
3254
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 0);
4,248✔
3255

3256
    m_identified_sessions[client_file_ident] = sess; // Throws
4,248✔
3257
    m_unidentified_sessions.erase(sess);
4,248✔
3258
}
4,248✔
3259

3260

3261
void ServerFile::remove_unidentified_session(Session* sess) noexcept
3262
{
850✔
3263
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);
850✔
3264
    m_unidentified_sessions.erase(sess);
850✔
3265
}
850✔
3266

3267

3268
void ServerFile::remove_identified_session(file_ident_type client_file_ident) noexcept
3269
{
4,248✔
3270
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 1);
4,248✔
3271
    m_identified_sessions.erase(client_file_ident);
4,248✔
3272
}
4,248✔
3273

3274

3275
Session* ServerFile::get_identified_session(file_ident_type client_file_ident) noexcept
3276
{
4,244✔
3277
    auto i = m_identified_sessions.find(client_file_ident);
4,244✔
3278
    if (i == m_identified_sessions.end())
4,244✔
3279
        return nullptr;
4,244✔
UNCOV
3280
    return i->second;
×
3281
}
4,244✔
3282

3283
bool ServerFile::can_add_changesets_from_downstream() const noexcept
3284
{
44,464✔
3285
    return (m_blocked_changesets_from_downstream_byte_size < m_server.get_max_upload_backlog());
44,464✔
3286
}
44,464✔
3287

3288

3289
void ServerFile::add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
3290
                                                version_type locked_server_version, const UploadChangeset* changesets,
3291
                                                std::size_t num_changesets)
3292
{
44,224✔
3293
    register_client_access(client_file_ident); // Throws
44,224✔
3294

3295
    bool dirty = false;
44,224✔
3296

3297
    IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident]; // Throws
44,224✔
3298
    std::size_t num_bytes = 0;
44,224✔
3299
    for (std::size_t i = 0; i < num_changesets; ++i) {
78,678✔
3300
        const UploadChangeset& uc = changesets[i];
34,454✔
3301
        auto& changesets = list.changesets;
34,454✔
3302
        changesets.emplace_back(client_file_ident, uc.origin_timestamp, uc.origin_file_ident, uc.upload_cursor,
34,454✔
3303
                                uc.changeset); // Throws
34,454✔
3304
        num_bytes += uc.changeset.size();
34,454✔
3305
        dirty = true;
34,454✔
3306
    }
34,454✔
3307

3308
    REALM_ASSERT(upload_progress.client_version >= list.upload_progress.client_version);
44,224✔
3309
    REALM_ASSERT(are_mutually_consistent(upload_progress, list.upload_progress));
44,224✔
3310
    if (upload_progress.client_version > list.upload_progress.client_version) {
44,224✔
3311
        list.upload_progress = upload_progress;
44,220✔
3312
        dirty = true;
44,220✔
3313
    }
44,220✔
3314

3315
    REALM_ASSERT(locked_server_version >= list.locked_server_version);
44,224✔
3316
    if (locked_server_version > list.locked_server_version) {
44,224✔
3317
        list.locked_server_version = locked_server_version;
38,920✔
3318
        dirty = true;
38,920✔
3319
    }
38,920✔
3320

3321
    if (REALM_LIKELY(dirty)) {
44,224✔
3322
        if (num_changesets > 0) {
44,222✔
3323
            on_changesets_from_downstream_added(num_changesets, num_bytes); // Throws
24,134✔
3324
        }
24,134✔
3325
        else {
20,088✔
3326
            on_work_added(); // Throws
20,088✔
3327
        }
20,088✔
3328
    }
44,222✔
3329
}
44,224✔
3330

3331

3332
BootstrapError ServerFile::bootstrap_client_session(SaltedFileIdent client_file_ident,
3333
                                                    DownloadCursor download_progress, SaltedVersion server_version,
3334
                                                    ClientType client_type, UploadCursor& upload_progress,
3335
                                                    version_type& locked_server_version, Logger& logger)
3336
{
4,278✔
3337
    // The Realm file may contain a later snapshot than the one reflected by
3338
    // `m_sync_version`, but if so, the client cannot "legally" know about it.
3339
    if (server_version.version > m_version_info.sync_version.version)
4,278✔
3340
        return BootstrapError::bad_server_version;
20✔
3341

3342
    const ServerHistory& hist = access().history; // Throws
4,258✔
3343
    BootstrapError error = hist.bootstrap_client_session(client_file_ident, download_progress, server_version,
4,258✔
3344
                                                         client_type, upload_progress, locked_server_version,
4,258✔
3345
                                                         logger); // Throws
4,258✔
3346

3347
    // FIXME: Rather than taking previously buffered changesets from the same
3348
    // client file into account when determining the upload progress, and then
3349
    // allowing for an error during the integration of those changesets to be
3350
    // reported to, and terminate the new session, consider to instead postpone
3351
    // the bootstrapping of the new session until all previously buffered
3352
    // changesets from same client file have been fully processed.
3353

3354
    if (error == BootstrapError::no_error) {
4,258✔
3355
        register_client_access(client_file_ident.ident); // Throws
4,248✔
3356

3357
        // If upload, or releaseing of server versions progressed further during
3358
        // previous sessions than the persisted points, take that into account
3359
        auto i = m_work.changesets_from_downstream.find(client_file_ident.ident);
4,248✔
3360
        if (i != m_work.changesets_from_downstream.end()) {
4,248✔
3361
            const IntegratableChangesetList& list = i->second;
1,286✔
3362
            REALM_ASSERT(list.upload_progress.client_version >= upload_progress.client_version);
1,286✔
3363
            upload_progress = list.upload_progress;
1,286✔
3364
            REALM_ASSERT(list.locked_server_version >= locked_server_version);
1,286✔
3365
            locked_server_version = list.locked_server_version;
1,286✔
3366
        }
1,286✔
3367
        auto j = m_changesets_from_downstream.find(client_file_ident.ident);
4,248✔
3368
        if (j != m_changesets_from_downstream.end()) {
4,248✔
3369
            const IntegratableChangesetList& list = j->second;
76✔
3370
            REALM_ASSERT(list.upload_progress.client_version >= upload_progress.client_version);
76✔
3371
            upload_progress = list.upload_progress;
76✔
3372
            REALM_ASSERT(list.locked_server_version >= locked_server_version);
76✔
3373
            locked_server_version = list.locked_server_version;
76✔
3374
        }
76✔
3375
    }
4,248✔
3376

3377
    return error;
4,258✔
3378
}
4,278✔
3379

3380
// NOTE: This function is executed by the worker thread
3381
void ServerFile::worker_process_work_unit(WorkerState& state)
3382
{
37,154✔
3383
    SteadyTimePoint start_time = steady_clock_now();
37,154✔
3384
    milliseconds_type parallel_time = 0;
37,154✔
3385

3386
    Work& work = m_work;
37,154✔
3387
    wlogger.debug("Work unit execution started"); // Throws
37,154✔
3388

3389
    if (work.has_primary_work) {
37,154✔
3390
        if (REALM_UNLIKELY(!m_work.file_ident_alloc_slots.empty()))
37,154✔
3391
            worker_allocate_file_identifiers(); // Throws
1,348✔
3392

3393
        if (!m_work.changesets_from_downstream.empty())
37,154✔
3394
            worker_integrate_changes_from_downstream(state); // Throws
35,810✔
3395
    }
37,154✔
3396

3397
    wlogger.debug("Work unit execution completed"); // Throws
37,154✔
3398

3399
    milliseconds_type time = steady_duration(start_time);
37,154✔
3400
    milliseconds_type seq_time = time - parallel_time;
37,154✔
3401
    m_server.m_seq_time.fetch_add(seq_time, std::memory_order_relaxed);
37,154✔
3402
    m_server.m_par_time.fetch_add(parallel_time, std::memory_order_relaxed);
37,154✔
3403

3404
    // Pass control back to the network event loop thread
3405
    network::Service& service = m_server.get_service();
37,154✔
3406
    service.post([this](Status) {
37,154✔
3407
        // FIXME: The safety of capturing `this` here, relies on the fact
3408
        // that ServerFile objects currently are not destroyed until the
3409
        // server object is destroyed.
3410
        group_postprocess_stage_1(); // Throws
36,860✔
3411
        // Suicide may have happened at this point
3412
    }); // Throws
36,860✔
3413
}
37,154✔
3414

3415

3416
void ServerFile::on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes)
3417
{
24,134✔
3418
    m_num_changesets_from_downstream += num_changesets;
24,134✔
3419

3420
    if (num_bytes > 0) {
24,134✔
3421
        m_blocked_changesets_from_downstream_byte_size += num_bytes;
24,134✔
3422
        get_server().inc_byte_size_for_pending_downstream_changesets(num_bytes); // Throws
24,134✔
3423
    }
24,134✔
3424

3425
    on_work_added(); // Throws
24,134✔
3426
}
24,134✔
3427

3428

3429
void ServerFile::on_work_added()
3430
{
45,628✔
3431
    if (m_has_blocked_work)
45,628✔
3432
        return;
8,408✔
3433
    m_has_blocked_work = true;
37,220✔
3434
    // Reference file
3435
    if (m_has_work_in_progress)
37,220✔
3436
        return;
12,650✔
3437
    group_unblock_work(); // Throws
24,570✔
3438
}
24,570✔
3439

3440

3441
void ServerFile::group_unblock_work()
3442
{
37,180✔
3443
    REALM_ASSERT(!m_has_work_in_progress);
37,180✔
3444
    if (REALM_LIKELY(!m_server.is_sync_stopped())) {
37,180✔
3445
        unblock_work(); // Throws
37,180✔
3446
        const Work& work = m_work;
37,180✔
3447
        if (REALM_LIKELY(work.has_primary_work)) {
37,180✔
3448
            logger.trace("Work unit unblocked"); // Throws
37,176✔
3449
            m_has_work_in_progress = true;
37,176✔
3450
            Worker& worker = m_server.get_worker();
37,176✔
3451
            worker.enqueue(this); // Throws
37,176✔
3452
        }
37,176✔
3453
    }
37,180✔
3454
}
37,180✔
3455

3456

3457
void ServerFile::unblock_work()
3458
{
37,180✔
3459
    REALM_ASSERT(m_has_blocked_work);
37,180✔
3460

3461
    m_work.reset();
37,180✔
3462

3463
    // Discard requests for file identifiers whose receiver is no longer
3464
    // waiting.
3465
    {
37,180✔
3466
        auto i = m_file_ident_requests.begin();
37,180✔
3467
        auto end = m_file_ident_requests.end();
37,180✔
3468
        while (i != end) {
38,584✔
3469
            auto j = i++;
1,404✔
3470
            const FileIdentRequestInfo& info = j->second;
1,404✔
3471
            if (!info.receiver)
1,404✔
3472
                m_file_ident_requests.erase(j);
6✔
3473
        }
1,404✔
3474
    }
37,180✔
3475
    std::size_t n = m_file_ident_requests.size();
37,180✔
3476
    if (n > 0) {
37,180✔
3477
        m_work.file_ident_alloc_slots.resize(n); // Throws
1,360✔
3478
        std::size_t i = 0;
1,360✔
3479
        for (const auto& pair : m_file_ident_requests) {
1,398✔
3480
            const FileIdentRequestInfo& info = pair.second;
1,398✔
3481
            FileIdentAllocSlot& slot = m_work.file_ident_alloc_slots[i];
1,398✔
3482
            slot.proxy_file = info.proxy_file;
1,398✔
3483
            slot.client_type = info.client_type;
1,398✔
3484
            ++i;
1,398✔
3485
        }
1,398✔
3486
        m_work.has_primary_work = true;
1,360✔
3487
    }
1,360✔
3488

3489
    // FIXME: `ServerFile::m_changesets_from_downstream` and
3490
    // `Work::changesets_from_downstream` should be renamed to something else,
3491
    // as it may contain kinds of data other than changesets.
3492

3493
    using std::swap;
37,180✔
3494
    swap(m_changesets_from_downstream, m_work.changesets_from_downstream);
37,180✔
3495
    m_work.have_changesets_from_downstream = (m_num_changesets_from_downstream > 0);
37,180✔
3496
    bool has_changesets = !m_work.changesets_from_downstream.empty();
37,180✔
3497
    if (has_changesets) {
37,180✔
3498
        m_work.has_primary_work = true;
35,818✔
3499
    }
35,818✔
3500

3501
    // Keep track of the size of pending changesets
3502
    REALM_ASSERT(m_unblocked_changesets_from_downstream_byte_size == 0);
37,180✔
3503
    m_unblocked_changesets_from_downstream_byte_size = m_blocked_changesets_from_downstream_byte_size;
37,180✔
3504
    m_blocked_changesets_from_downstream_byte_size = 0;
37,180✔
3505

3506
    m_num_changesets_from_downstream = 0;
37,180✔
3507
    m_has_blocked_work = false;
37,180✔
3508
}
37,180✔
3509

3510

3511
void ServerFile::resume_download() noexcept
3512
{
23,064✔
3513
    for (const auto& entry : m_identified_sessions) {
37,892✔
3514
        Session& sess = *entry.second;
37,892✔
3515
        sess.ensure_enlisted_to_send();
37,892✔
3516
    }
37,892✔
3517
}
23,064✔
3518

3519

3520
void ServerFile::recognize_external_change()
3521
{
4,800✔
3522
    VersionInfo prev_version_info = m_version_info;
4,800✔
3523
    const ServerHistory& history = access().history;       // Throws
4,800✔
3524
    bool has_upstream_status;                              // Dummy
4,800✔
3525
    sync::file_ident_type partial_file_ident;              // Dummy
4,800✔
3526
    sync::version_type partial_progress_reference_version; // Dummy
4,800✔
3527
    history.get_status(m_version_info, has_upstream_status, partial_file_ident,
4,800✔
3528
                       partial_progress_reference_version); // Throws
4,800✔
3529

3530
    REALM_ASSERT(m_version_info.realm_version >= prev_version_info.realm_version);
4,800✔
3531
    REALM_ASSERT(m_version_info.sync_version.version >= prev_version_info.sync_version.version);
4,800✔
3532
    if (m_version_info.sync_version.version > prev_version_info.sync_version.version) {
4,800✔
3533
        REALM_ASSERT(m_version_info.realm_version > prev_version_info.realm_version);
4,800✔
3534
        resume_download();
4,800✔
3535
    }
4,800✔
3536
}
4,800✔
3537

3538

3539
// NOTE: This function is executed by the worker thread
3540
void ServerFile::worker_allocate_file_identifiers()
3541
{
1,348✔
3542
    Work& work = m_work;
1,348✔
3543
    REALM_ASSERT(!work.file_ident_alloc_slots.empty());
1,348✔
3544
    ServerHistory& hist = worker_access().history;                                      // Throws
1,348✔
3545
    hist.allocate_file_identifiers(m_work.file_ident_alloc_slots, m_work.version_info); // Throws
1,348✔
3546
    m_work.produced_new_realm_version = true;
1,348✔
3547
}
1,348✔
3548

3549

3550
// Returns true when, and only when this function produces a new sync version
3551
// (adds a new entry to the sync history).
3552
//
3553
// NOTE: This function is executed by the worker thread
3554
bool ServerFile::worker_integrate_changes_from_downstream(WorkerState& state)
3555
{
35,810✔
3556
    REALM_ASSERT(!m_work.changesets_from_downstream.empty());
35,810✔
3557

3558
    std::unique_ptr<ServerHistory> hist_ptr;
35,810✔
3559
    DBRef sg_ptr;
35,810✔
3560
    ServerHistory& hist = get_client_file_history(state, hist_ptr, sg_ptr);
35,810✔
3561
    bool backup_whole_realm = false;
35,810✔
3562
    bool produced_new_realm_version = hist.integrate_client_changesets(
35,810✔
3563
        m_work.changesets_from_downstream, m_work.version_info, backup_whole_realm, m_work.integration_result,
35,810✔
3564
        wlogger); // Throws
35,810✔
3565
    bool produced_new_sync_version = !m_work.integration_result.integrated_changesets.empty();
35,810✔
3566
    REALM_ASSERT(!produced_new_sync_version || produced_new_realm_version);
35,810✔
3567
    if (produced_new_realm_version) {
35,810✔
3568
        m_work.produced_new_realm_version = true;
35,786✔
3569
        if (produced_new_sync_version) {
35,786✔
3570
            m_work.produced_new_sync_version = true;
18,282✔
3571
        }
18,282✔
3572
    }
35,786✔
3573
    return produced_new_sync_version;
35,810✔
3574
}
35,810✔
3575

3576
ServerHistory& ServerFile::get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
3577
                                                   DBRef& sg_ptr)
3578
{
35,810✔
3579
    if (state.use_file_cache)
35,810✔
3580
        return worker_access().history; // Throws
35,810✔
UNCOV
3581
    const std::string& path = m_worker_file.realm_path;
×
UNCOV
3582
    hist_ptr = m_server.make_history_for_path();                   // Throws
×
UNCOV
3583
    DBOptions options = m_worker_file.make_shared_group_options(); // Throws
×
UNCOV
3584
    sg_ptr = DB::create(*hist_ptr, path, options);                 // Throws
×
UNCOV
3585
    sg_ptr->claim_sync_agent();                                    // Throws
×
UNCOV
3586
    return *hist_ptr;                                              // Throws
×
3587
}
35,810✔
3588

3589

3590
// When worker thread finishes work unit.
3591
void ServerFile::group_postprocess_stage_1()
3592
{
36,860✔
3593
    REALM_ASSERT(m_has_work_in_progress);
36,860✔
3594

3595
    group_finalize_work_stage_1(); // Throws
36,860✔
3596
    group_finalize_work_stage_2(); // Throws
36,860✔
3597
    group_postprocess_stage_2();   // Throws
36,860✔
3598
}
36,860✔
3599

3600

3601
void ServerFile::group_postprocess_stage_2()
3602
{
36,860✔
3603
    REALM_ASSERT(m_has_work_in_progress);
36,860✔
3604
    group_postprocess_stage_3(); // Throws
36,860✔
3605
    // Suicide may have happened at this point
3606
}
36,860✔
3607

3608

3609
// When all files, including the reference file, have been backed up.
3610
void ServerFile::group_postprocess_stage_3()
3611
{
36,860✔
3612
    REALM_ASSERT(m_has_work_in_progress);
36,860✔
3613
    m_has_work_in_progress = false;
36,860✔
3614

3615
    logger.trace("Work unit postprocessing complete"); // Throws
36,860✔
3616
    if (m_has_blocked_work)
36,860✔
3617
        group_unblock_work(); // Throws
12,610✔
3618
}
36,860✔
3619

3620

3621
void ServerFile::finalize_work_stage_1()
3622
{
36,860✔
3623
    if (m_unblocked_changesets_from_downstream_byte_size > 0) {
36,860✔
3624
        // Report the byte size of completed downstream changesets.
3625
        std::size_t byte_size = m_unblocked_changesets_from_downstream_byte_size;
18,284✔
3626
        get_server().dec_byte_size_for_pending_downstream_changesets(byte_size); // Throws
18,284✔
3627
        m_unblocked_changesets_from_downstream_byte_size = 0;
18,284✔
3628
    }
18,284✔
3629

3630
    // Deal with errors (bad changesets) pertaining to downstream clients
3631
    std::size_t num_changesets_removed = 0;
36,860✔
3632
    std::size_t num_bytes_removed = 0;
36,860✔
3633
    for (const auto& entry : m_work.integration_result.excluded_client_files) {
36,860✔
3634
        file_ident_type client_file_ident = entry.first;
18✔
3635
        ExtendedIntegrationError error = entry.second;
18✔
3636
        ProtocolError error_2 = ProtocolError::other_session_error;
18✔
3637
        switch (error) {
18✔
3638
            case ExtendedIntegrationError::client_file_expired:
✔
3639
                logger.debug("Changeset integration failed: Client file entry "
×
3640
                             "expired during session"); // Throws
×
3641
                error_2 = ProtocolError::client_file_expired;
×
3642
                break;
×
3643
            case ExtendedIntegrationError::bad_origin_file_ident:
✔
3644
                error_2 = ProtocolError::bad_origin_file_ident;
×
3645
                break;
×
3646
            case ExtendedIntegrationError::bad_changeset:
18✔
3647
                error_2 = ProtocolError::bad_changeset;
18✔
3648
                break;
18✔
3649
        }
18✔
3650
        auto i = m_identified_sessions.find(client_file_ident);
18✔
3651
        if (i != m_identified_sessions.end()) {
18✔
3652
            Session& sess = *i->second;
18✔
3653
            SyncConnection& conn = sess.get_connection();
18✔
3654
            conn.protocol_error(error_2, &sess); // Throws
18✔
3655
        }
18✔
3656
        const IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident];
18✔
3657
        std::size_t num_changesets = list.changesets.size();
18✔
3658
        std::size_t num_bytes = 0;
18✔
3659
        for (const IntegratableChangeset& ic : list.changesets)
18✔
3660
            num_bytes += ic.changeset.size();
×
3661
        logger.info("Excluded %1 changesets of combined byte size %2 for client file %3", num_changesets, num_bytes,
18✔
3662
                    client_file_ident); // Throws
18✔
3663
        num_changesets_removed += num_changesets;
18✔
3664
        num_bytes_removed += num_bytes;
18✔
3665
        m_changesets_from_downstream.erase(client_file_ident);
18✔
3666
    }
18✔
3667

3668
    REALM_ASSERT(num_changesets_removed <= m_num_changesets_from_downstream);
36,860✔
3669
    REALM_ASSERT(num_bytes_removed <= m_blocked_changesets_from_downstream_byte_size);
36,860✔
3670

3671
    if (num_changesets_removed == 0)
36,860✔
3672
        return;
36,860✔
3673

UNCOV
3674
    m_num_changesets_from_downstream -= num_changesets_removed;
×
3675

3676
    // The byte size of the blocked changesets must be decremented.
UNCOV
3677
    if (num_bytes_removed > 0) {
×
3678
        m_blocked_changesets_from_downstream_byte_size -= num_bytes_removed;
×
3679
        get_server().dec_byte_size_for_pending_downstream_changesets(num_bytes_removed); // Throws
×
3680
    }
×
UNCOV
3681
}
×
3682

3683

3684
void ServerFile::finalize_work_stage_2()
3685
{
36,858✔
3686
    // Expose new snapshot to remote peers
3687
    REALM_ASSERT(m_work.produced_new_realm_version || m_work.version_info.realm_version == 0);
36,858✔
3688
    if (m_work.version_info.realm_version > m_version_info.realm_version) {
36,858✔
3689
        REALM_ASSERT(m_work.version_info.sync_version.version >= m_version_info.sync_version.version);
36,840✔
3690
        m_version_info = m_work.version_info;
36,840✔
3691
    }
36,840✔
3692

3693
    bool resume_download_and_upload = m_work.produced_new_sync_version;
36,858✔
3694

3695
    // Deliver allocated file identifiers to requesters
3696
    REALM_ASSERT(m_file_ident_requests.size() >= m_work.file_ident_alloc_slots.size());
36,858✔
3697
    auto begin = m_file_ident_requests.begin();
36,858✔
3698
    auto i = begin;
36,858✔
3699
    for (const FileIdentAllocSlot& slot : m_work.file_ident_alloc_slots) {
36,858✔
3700
        FileIdentRequestInfo& info = i->second;
1,374✔
3701
        REALM_ASSERT(info.proxy_file == slot.proxy_file);
1,374✔
3702
        REALM_ASSERT(info.client_type == slot.client_type);
1,374✔
3703
        if (FileIdentReceiver* receiver = info.receiver) {
1,374✔
3704
            info.receiver = nullptr;
1,350✔
3705
            receiver->receive_file_ident(slot.file_ident); // Throws
1,350✔
3706
        }
1,350✔
3707
        ++i;
1,374✔
3708
    }
1,374✔
3709
    m_file_ident_requests.erase(begin, i);
36,858✔
3710

3711
    // Resume download to downstream clients
3712
    if (resume_download_and_upload) {
36,858✔
3713
        resume_download();
18,264✔
3714
    }
18,264✔
3715
}
36,858✔
3716

3717
// ============================ Worker implementation ============================
3718

3719
Worker::Worker(ServerImpl& server)
3720
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, "Worker: ", server.logger_ptr)}
548✔
3721
    // Throws
3722
    , logger(*logger_ptr)
548✔
3723
    , m_server{server}
548✔
3724
    , m_file_access_cache{server.get_config().max_open_files, logger, *this, server.get_config().encryption_key}
548✔
3725
{
1,200✔
3726
    util::seed_prng_nondeterministically(m_random); // Throws
1,200✔
3727
}
1,200✔
3728

3729

3730
void Worker::enqueue(ServerFile* file)
3731
{
37,180✔
3732
    util::LockGuard lock{m_mutex};
37,180✔
3733
    m_queue.push_back(file); // Throws
37,180✔
3734
    m_cond.notify_all();
37,180✔
3735
}
37,180✔
3736

3737

3738
std::mt19937_64& Worker::server_history_get_random() noexcept
3739
{
2,418✔
3740
    return m_random;
2,418✔
3741
}
2,418✔
3742

3743

3744
void Worker::run()
3745
{
1,144✔
3746
    for (;;) {
38,296✔
3747
        ServerFile* file = nullptr;
38,296✔
3748
        {
38,296✔
3749
            util::LockGuard lock{m_mutex};
38,296✔
3750
            for (;;) {
76,060✔
3751
                if (REALM_UNLIKELY(m_stop))
76,060✔
3752
                    return;
1,144✔
3753
                if (!m_queue.empty()) {
74,916✔
3754
                    file = m_queue.front();
37,154✔
3755
                    m_queue.pop_front();
37,154✔
3756
                    break;
37,154✔
3757
                }
37,154✔
3758
                m_cond.wait(lock);
37,762✔
3759
            }
37,762✔
3760
        }
38,296✔
3761
        file->worker_process_work_unit(m_state); // Throws
37,152✔
3762
    }
37,152✔
3763
}
1,144✔
3764

3765

3766
void Worker::stop() noexcept
3767
{
1,144✔
3768
    util::LockGuard lock{m_mutex};
1,144✔
3769
    m_stop = true;
1,144✔
3770
    m_cond.notify_all();
1,144✔
3771
}
1,144✔
3772

3773

3774
// ============================ ServerImpl implementation ============================
3775

3776
ServerImpl::ServerImpl(const std::string& root_dir, util::Optional<sync::PKey> pkey, Server::Config config)
3777
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::server, std::move(config.logger))}
548✔
3778
    , logger{*logger_ptr}
548✔
3779
    , m_config{std::move(config)}
548✔
3780
    , m_max_upload_backlog{determine_max_upload_backlog(config)}
548✔
3781
    , m_root_dir{root_dir} // Throws
548✔
3782
    , m_access_control{std::move(pkey)}
548✔
3783
    , m_protocol_version_range{determine_protocol_version_range(config)}                 // Throws
548✔
3784
    , m_file_access_cache{m_config.max_open_files, logger, *this, config.encryption_key} // Throws
548✔
3785
    , m_worker{*this}                                                                    // Throws
548✔
3786
    , m_acceptor{get_service()}
548✔
3787
    , m_server_protocol{}       // Throws
548✔
3788
    , m_compress_memory_arena{} // Throws
548✔
3789
{
1,200✔
3790
    if (m_config.ssl) {
1,200✔
3791
        m_ssl_context = std::make_unique<network::ssl::Context>();                // Throws
24✔
3792
        m_ssl_context->use_certificate_chain_file(m_config.ssl_certificate_path); // Throws
24✔
3793
        m_ssl_context->use_private_key_file(m_config.ssl_certificate_key_path);   // Throws
24✔
3794
    }
24✔
3795
}
1,200✔
3796

3797

3798
ServerImpl::~ServerImpl() noexcept
3799
{
1,200✔
3800
    bool server_destroyed_while_still_running = m_running;
1,200✔
3801
    REALM_ASSERT_RELEASE(!server_destroyed_while_still_running);
1,200✔
3802
}
1,200✔
3803

3804

3805
void ServerImpl::start()
3806
{
1,200✔
3807
    logger.info("Realm sync server started (%1)", REALM_VER_CHUNK); // Throws
1,200✔
3808
    logger.info("Supported protocol versions: %1-%2 (%3-%4 configured)",
1,200✔
3809
                ServerImplBase::get_oldest_supported_protocol_version(), get_current_protocol_version(),
1,200✔
3810
                m_protocol_version_range.first,
1,200✔
3811
                m_protocol_version_range.second); // Throws
1,200✔
3812
    logger.info("Platform: %1", util::get_platform_info());
1,200✔
3813
    bool is_debug_build = false;
1,200✔
3814
#if REALM_DEBUG
1,200✔
3815
    is_debug_build = true;
1,200✔
3816
#endif
1,200✔
3817
    {
1,200✔
3818
        const char* lead_text = "Build mode";
1,200✔
3819
        if (is_debug_build) {
1,200✔
3820
            logger.info("%1: Debug", lead_text); // Throws
1,200✔
3821
        }
1,200✔
3822
        else {
×
3823
            logger.info("%1: Release", lead_text); // Throws
×
3824
        }
×
3825
    }
1,200✔
3826
    if (is_debug_build) {
1,200✔
3827
        logger.warn("Build mode is Debug! CAN SEVERELY IMPACT PERFORMANCE - "
1,200✔
3828
                    "NOT RECOMMENDED FOR PRODUCTION"); // Throws
1,200✔
3829
    }
1,200✔
3830
    logger.info("Directory holding persistent state: %1", m_root_dir);        // Throws
1,200✔
3831
    logger.info("Maximum number of open files: %1", m_config.max_open_files); // Throws
1,200✔
3832
    {
1,200✔
3833
        const char* lead_text = "Encryption";
1,200✔
3834
        if (m_config.encryption_key) {
1,200✔
3835
            logger.info("%1: Yes", lead_text); // Throws
4✔
3836
        }
4✔
3837
        else {
1,196✔
3838
            logger.info("%1: No", lead_text); // Throws
1,196✔
3839
        }
1,196✔
3840
    }
1,200✔
3841
    logger.info("Log level: %1", logger.get_level_threshold()); // Throws
1,200✔
3842
    {
1,200✔
3843
        const char* lead_text = "Disable sync to disk";
1,200✔
3844
        if (m_config.disable_sync_to_disk) {
1,200✔
3845
            logger.info("%1: All files", lead_text); // Throws
496✔
3846
        }
496✔
3847
        else {
704✔
3848
            logger.info("%1: No", lead_text); // Throws
704✔
3849
        }
704✔
3850
    }
1,200✔
3851
    if (m_config.disable_sync_to_disk) {
1,200✔
3852
        logger.warn("Testing/debugging feature 'disable sync to disk' enabled - "
496✔
3853
                    "never do this in production!"); // Throws
496✔
3854
    }
496✔
3855
    logger.info("Download bootstrap caching: %1",
1,200✔
3856
                (m_config.enable_download_bootstrap_cache ? "Yes" : "No"));                // Throws
1,200✔
3857
    logger.info("Max download size: %1 bytes", m_config.max_download_size);                // Throws
1,200✔
3858
    logger.info("Max upload backlog: %1 bytes", m_max_upload_backlog);                     // Throws
1,200✔
3859
    logger.info("HTTP request timeout: %1 ms", m_config.http_request_timeout);             // Throws
1,200✔
3860
    logger.info("HTTP response timeout: %1 ms", m_config.http_response_timeout);           // Throws
1,200✔
3861
    logger.info("Connection reaper timeout: %1 ms", m_config.connection_reaper_timeout);   // Throws
1,200✔
3862
    logger.info("Connection reaper interval: %1 ms", m_config.connection_reaper_interval); // Throws
1,200✔
3863
    logger.info("Connection soft close timeout: %1 ms", m_config.soft_close_timeout);      // Throws
1,200✔
3864
    logger.debug("Authorization header name: %1", m_config.authorization_header_name);     // Throws
1,200✔
3865

3866
    m_realm_names = _impl::find_realm_files(m_root_dir); // Throws
1,200✔
3867

3868
    initiate_connection_reaper_timer(m_config.connection_reaper_interval); // Throws
1,200✔
3869

3870
    listen(); // Throws
1,200✔
3871
}
1,200✔
3872

3873

3874
void ServerImpl::run()
3875
{
1,144✔
3876
    auto ta = util::make_temp_assign(m_running, true);
1,144✔
3877

3878
    {
1,144✔
3879
        auto worker_thread = util::make_thread_exec_guard(m_worker, *this); // Throws
1,144✔
3880
        std::string name;
1,144✔
3881
        if (util::Thread::get_name(name)) {
1,144✔
3882
            name += "-worker";
622✔
3883
            worker_thread.start_with_signals_blocked(name); // Throws
622✔
3884
        }
622✔
3885
        else {
522✔
3886
            worker_thread.start_with_signals_blocked(); // Throws
522✔
3887
        }
522✔
3888

3889
        m_service.run(); // Throws
1,144✔
3890

3891
        worker_thread.stop_and_rethrow(); // Throws
1,144✔
3892
    }
1,144✔
3893

3894
    logger.info("Realm sync server stopped");
1,144✔
3895
}
1,144✔
3896

3897

3898
void ServerImpl::stop() noexcept
3899
{
2,054✔
3900
    util::LockGuard lock{m_mutex};
2,054✔
3901
    if (m_stopped)
2,054✔
3902
        return;
854✔
3903
    m_stopped = true;
1,200✔
3904
    m_wait_or_service_stopped_cond.notify_all();
1,200✔
3905
    m_service.stop();
1,200✔
3906
}
1,200✔
3907

3908

3909
void ServerImpl::inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3910
{
24,134✔
3911
    m_pending_changesets_from_downstream_byte_size += byte_size;
24,134✔
3912
    logger.debug("Byte size for pending downstream changesets incremented by "
24,134✔
3913
                 "%1 to reach a total of %2",
24,134✔
3914
                 byte_size,
24,134✔
3915
                 m_pending_changesets_from_downstream_byte_size); // Throws
24,134✔
3916
}
24,134✔
3917

3918

3919
void ServerImpl::dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3920
{
18,282✔
3921
    REALM_ASSERT(byte_size <= m_pending_changesets_from_downstream_byte_size);
18,282✔
3922
    m_pending_changesets_from_downstream_byte_size -= byte_size;
18,282✔
3923
    logger.debug("Byte size for pending downstream changesets decremented by "
18,282✔
3924
                 "%1 to reach a total of %2",
18,282✔
3925
                 byte_size,
18,282✔
3926
                 m_pending_changesets_from_downstream_byte_size); // Throws
18,282✔
3927
}
18,282✔
3928

3929

3930
std::mt19937_64& ServerImpl::server_history_get_random() noexcept
3931
{
1,138✔
3932
    return get_random();
1,138✔
3933
}
1,138✔
3934

3935

3936
void ServerImpl::listen()
3937
{
1,200✔
3938
    network::Resolver resolver{get_service()};
1,200✔
3939
    network::Resolver::Query query(m_config.listen_address, m_config.listen_port,
1,200✔
3940
                                   network::Resolver::Query::passive | network::Resolver::Query::address_configured);
1,200✔
3941
    network::Endpoint::List endpoints = resolver.resolve(query); // Throws
1,200✔
3942

3943
    auto i = endpoints.begin();
1,200✔
3944
    auto end = endpoints.end();
1,200✔
3945
    for (;;) {
1,200✔
3946
        std::error_code ec;
1,200✔
3947
        m_acceptor.open(i->protocol(), ec);
1,200✔
3948
        if (!ec) {
1,200✔
3949
            using SocketBase = network::SocketBase;
1,200✔
3950
            m_acceptor.set_option(SocketBase::reuse_address(m_config.reuse_address), ec);
1,200✔
3951
            if (!ec) {
1,200✔
3952
                m_acceptor.bind(*i, ec);
1,200✔
3953
                if (!ec)
1,200✔
3954
                    break;
1,200✔
3955
            }
1,200✔
3956
            m_acceptor.close();
×
3957
        }
×
3958
        if (i + 1 == end) {
×
3959
            for (auto i2 = endpoints.begin(); i2 != i; ++i2) {
×
3960
                // FIXME: We don't have the error code for previous attempts, so
3961
                // can't print a nice message.
3962
                logger.error("Failed to bind to %1:%2", i2->address(),
×
3963
                             i2->port()); // Throws
×
3964
            }
×
3965
            logger.error("Failed to bind to %1:%2: %3", i->address(), i->port(),
×
3966
                         ec.message()); // Throws
×
3967
            throw std::runtime_error("Could not create a listening socket: All endpoints failed");
×
3968
        }
×
3969
    }
×
3970

3971
    m_acceptor.listen(m_config.listen_backlog);
1,200✔
3972

3973
    network::Endpoint local_endpoint = m_acceptor.local_endpoint();
1,200✔
3974
    const char* ssl_mode = (m_ssl_context ? "TLS" : "non-TLS");
1,200✔
3975
    logger.info("Listening on %1:%2 (max backlog is %3, %4)", local_endpoint.address(), local_endpoint.port(),
1,200✔
3976
                m_config.listen_backlog, ssl_mode); // Throws
1,200✔
3977

3978
    initiate_accept();
1,200✔
3979
}
1,200✔
3980

3981

3982
void ServerImpl::initiate_accept()
3983
{
3,302✔
3984
    auto handler = [this](std::error_code ec) {
3,302✔
3985
        if (ec != util::error::operation_aborted)
2,102✔
3986
            handle_accept(ec);
2,102✔
3987
    };
2,102✔
3988
    bool is_ssl = bool(m_ssl_context);
3,302✔
3989
    m_next_http_conn.reset(new HTTPConnection(*this, ++m_next_conn_id, is_ssl));                            // Throws
3,302✔
3990
    m_acceptor.async_accept(m_next_http_conn->get_socket(), m_next_http_conn_endpoint, std::move(handler)); // Throws
3,302✔
3991
}
3,302✔
3992

3993

3994
void ServerImpl::handle_accept(std::error_code ec)
3995
{
2,102✔
3996
    if (ec) {
2,102✔
3997
        if (ec != util::error::connection_aborted) {
×
3998
            REALM_ASSERT(ec != util::error::operation_aborted);
×
3999

4000
            // We close the reserved files to get a few extra file descriptors.
4001
            for (size_t i = 0; i < sizeof(m_reserved_files) / sizeof(m_reserved_files[0]); ++i) {
×
4002
                m_reserved_files[i].reset();
×
4003
            }
×
4004

4005
            // FIXME: There are probably errors that need to be treated
4006
            // specially, and not cause the server to "crash".
4007

4008
            if (ec == make_basic_system_error_code(EMFILE)) {
×
4009
                logger.error("Failed to accept a connection due to the file descriptor limit, "
×
4010
                             "consider increasing the limit in your system config"); // Throws
×
4011
                throw OutOfFilesError(ec);
×
4012
            }
×
4013
            else {
×
4014
                throw std::system_error(ec);
×
4015
            }
×
4016
        }
×
4017
        logger.debug("Skipping aborted connection"); // Throws
×
4018
    }
×
4019
    else {
2,102✔
4020
        HTTPConnection& conn = *m_next_http_conn;
2,102✔
4021
        if (m_config.tcp_no_delay)
2,102✔
4022
            conn.get_socket().set_option(network::SocketBase::no_delay(true));  // Throws
1,754✔
4023
        m_http_connections.emplace(conn.get_id(), std::move(m_next_http_conn)); // Throws
2,102✔
4024
        Formatter& formatter = m_misc_buffers.formatter;
2,102✔
4025
        formatter.reset();
2,102✔
4026
        formatter << "[" << m_next_http_conn_endpoint.address() << "]:" << m_next_http_conn_endpoint.port(); // Throws
2,102✔
4027
        std::string remote_endpoint = {formatter.data(), formatter.size()};                                  // Throws
2,102✔
4028
        conn.initiate(std::move(remote_endpoint));                                                           // Throws
2,102✔
4029
    }
2,102✔
4030
    initiate_accept(); // Throws
2,102✔
4031
}
2,102✔
4032

4033

4034
void ServerImpl::remove_http_connection(std::int_fast64_t conn_id) noexcept
4035
{
2,102✔
4036
    m_http_connections.erase(conn_id);
2,102✔
4037
}
2,102✔
4038

4039

4040
void ServerImpl::add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn)
4041
{
2,060✔
4042
    m_sync_connections.emplace(connection_id, std::move(sync_conn));
2,060✔
4043
}
2,060✔
4044

4045

4046
void ServerImpl::remove_sync_connection(int_fast64_t connection_id)
4047
{
1,212✔
4048
    m_sync_connections.erase(connection_id);
1,212✔
4049
}
1,212✔
4050

4051

4052
void ServerImpl::set_connection_reaper_timeout(milliseconds_type timeout)
4053
{
4✔
4054
    get_service().post([this, timeout](Status) {
4✔
4055
        m_config.connection_reaper_timeout = timeout;
4✔
4056
    });
4✔
4057
}
4✔
4058

4059

4060
void ServerImpl::close_connections()
4061
{
16✔
4062
    get_service().post([this](Status) {
16✔
4063
        do_close_connections(); // Throws
16✔
4064
    });
16✔
4065
}
16✔
4066

4067

4068
bool ServerImpl::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4069
{
72✔
4070
    return _impl::map_virt_to_real_realm_path(m_root_dir, virt_path, real_path); // Throws
72✔
4071
}
72✔
4072

4073

4074
void ServerImpl::recognize_external_change(const std::string& virt_path)
4075
{
4,800✔
4076
    std::string virt_path_2 = virt_path; // Throws (copy)
4,800✔
4077
    get_service().post([this, virt_path = std::move(virt_path_2)](Status) {
4,800✔
4078
        do_recognize_external_change(virt_path); // Throws
4,800✔
4079
    });                                          // Throws
4,800✔
4080
}
4,800✔
4081

4082

4083
void ServerImpl::stop_sync_and_wait_for_backup_completion(
4084
    util::UniqueFunction<void(bool did_backup)> completion_handler, milliseconds_type timeout)
4085
{
×
4086
    logger.info("stop_sync_and_wait_for_backup_completion() called with "
×
4087
                "timeout = %1",
×
4088
                timeout); // Throws
×
4089

4090
    get_service().post([this, completion_handler = std::move(completion_handler), timeout](Status) mutable {
×
4091
        do_stop_sync_and_wait_for_backup_completion(std::move(completion_handler),
×
4092
                                                    timeout); // Throws
×
4093
    });
×
4094
}
×
4095

4096

4097
void ServerImpl::initiate_connection_reaper_timer(milliseconds_type timeout)
4098
{
1,334✔
4099
    m_connection_reaper_timer.emplace(get_service());
1,334✔
4100
    m_connection_reaper_timer->async_wait(std::chrono::milliseconds(timeout), [this, timeout](Status status) {
1,334✔
4101
        if (status != ErrorCodes::OperationAborted) {
134✔
4102
            reap_connections();                        // Throws
134✔
4103
            initiate_connection_reaper_timer(timeout); // Throws
134✔
4104
        }
134✔
4105
    }); // Throws
134✔
4106
}
1,334✔
4107

4108

4109
void ServerImpl::reap_connections()
4110
{
134✔
4111
    logger.debug("Discarding dead connections"); // Throws
134✔
4112
    SteadyTimePoint now = steady_clock_now();
134✔
4113
    {
134✔
4114
        auto end = m_http_connections.end();
134✔
4115
        auto i = m_http_connections.begin();
134✔
4116
        while (i != end) {
136✔
4117
            HTTPConnection& conn = *i->second;
2✔
4118
            ++i;
2✔
4119
            // Suicide
4120
            conn.terminate_if_dead(now); // Throws
2✔
4121
        }
2✔
4122
    }
134✔
4123
    {
134✔
4124
        auto end = m_sync_connections.end();
134✔
4125
        auto i = m_sync_connections.begin();
134✔
4126
        while (i != end) {
262✔
4127
            SyncConnection& conn = *i->second;
128✔
4128
            ++i;
128✔
4129
            // Suicide
4130
            conn.terminate_if_dead(now); // Throws
128✔
4131
        }
128✔
4132
    }
134✔
4133
}
134✔
4134

4135

4136
void ServerImpl::do_close_connections()
4137
{
16✔
4138
    for (auto& entry : m_sync_connections) {
16✔
4139
        SyncConnection& conn = *entry.second;
16✔
4140
        conn.initiate_soft_close(); // Throws
16✔
4141
    }
16✔
4142
}
16✔
4143

4144

4145
void ServerImpl::do_recognize_external_change(const std::string& virt_path)
4146
{
4,800✔
4147
    auto i = m_files.find(virt_path);
4,800✔
4148
    if (i == m_files.end())
4,800✔
4149
        return;
×
4150
    ServerFile& file = *i->second;
4,800✔
4151
    file.recognize_external_change();
4,800✔
4152
}
4,800✔
4153

4154

4155
void ServerImpl::do_stop_sync_and_wait_for_backup_completion(
4156
    util::UniqueFunction<void(bool did_complete)> completion_handler, milliseconds_type timeout)
4157
{
×
4158
    static_cast<void>(timeout);
×
4159
    if (m_sync_stopped)
×
4160
        return;
×
4161
    do_close_connections(); // Throws
×
4162
    m_sync_stopped = true;
×
4163
    bool completion_reached = false;
×
4164
    completion_handler(completion_reached); // Throws
×
4165
}
×
4166

4167

4168
// ============================ SyncConnection implementation ============================
4169

4170
SyncConnection::~SyncConnection() noexcept
4171
{
2,060✔
4172
    m_sessions_enlisted_to_send.clear();
2,060✔
4173
    m_sessions.clear();
2,060✔
4174
}
2,060✔
4175

4176

4177
void SyncConnection::initiate()
4178
{
2,060✔
4179
    m_last_activity_at = steady_clock_now();
2,060✔
4180
    logger.debug("Sync Connection initiated");
2,060✔
4181
    m_websocket.initiate_server_websocket_after_handshake();
2,060✔
4182
    send_log_message(util::Logger::Level::info, "Client connection established with server", 0,
2,060✔
4183
                     m_appservices_request_id);
2,060✔
4184
}
2,060✔
4185

4186

4187
template <class... Params>
4188
void SyncConnection::terminate(Logger::Level log_level, const char* log_message, Params... log_params)
4189
{
1,212✔
4190
    terminate_sessions();                              // Throws
1,212✔
4191
    logger.log(log_level, log_message, log_params...); // Throws
1,212✔
4192
    m_websocket.stop();
1,212✔
4193
    m_ssl_stream.reset();
1,212✔
4194
    m_socket.reset();
1,212✔
4195
    // Suicide
4196
    m_server.remove_sync_connection(m_id);
1,212✔
4197
}
1,212✔
4198

4199

4200
void SyncConnection::terminate_if_dead(SteadyTimePoint now)
4201
{
128✔
4202
    milliseconds_type time = steady_duration(m_last_activity_at, now);
128✔
4203
    const Server::Config& config = m_server.get_config();
128✔
4204
    if (m_is_closing) {
128✔
4205
        if (time >= config.soft_close_timeout) {
×
4206
            // Suicide
4207
            terminate(Logger::Level::detail,
×
4208
                      "Sync connection closed (timeout during soft close)"); // Throws
×
4209
        }
×
4210
    }
×
4211
    else {
128✔
4212
        if (time >= config.connection_reaper_timeout) {
128✔
4213
            // Suicide
4214
            terminate(Logger::Level::detail,
4✔
4215
                      "Sync connection closed (no heartbeat)"); // Throws
4✔
4216
        }
4✔
4217
    }
128✔
4218
}
128✔
4219

4220

4221
void SyncConnection::enlist_to_send(Session* sess) noexcept
4222
{
110,748✔
4223
    REALM_ASSERT(!m_is_closing);
110,748✔
4224
    REALM_ASSERT(!sess->is_enlisted_to_send());
110,748✔
4225
    m_sessions_enlisted_to_send.push_back(sess);
110,748✔
4226
    m_send_trigger.trigger();
110,748✔
4227
}
110,748✔
4228

4229

4230
void SyncConnection::handle_protocol_error(Status status)
4231
{
×
4232
    logger.error("%1", status);
×
4233
    switch (status.code()) {
×
4234
        case ErrorCodes::SyncProtocolInvariantFailed:
×
4235
            protocol_error(ProtocolError::bad_syntax); // Throws
×
4236
            break;
×
4237
        case ErrorCodes::LimitExceeded:
×
4238
            protocol_error(ProtocolError::limits_exceeded); // Throws
×
4239
            break;
×
4240
        default:
×
4241
            protocol_error(ProtocolError::other_error);
×
4242
            break;
×
4243
    }
×
4244
}
×
4245

4246
void SyncConnection::receive_bind_message(session_ident_type session_ident, std::string path,
4247
                                          std::string signed_user_token, bool need_client_file_ident,
4248
                                          bool is_subserver)
4249
{
5,126✔
4250
    auto p = m_sessions.emplace(session_ident, nullptr); // Throws
5,126✔
4251
    bool was_inserted = p.second;
5,126✔
4252
    if (REALM_UNLIKELY(!was_inserted)) {
5,126✔
4253
        logger.error("Overlapping reuse of session identifier %1 in BIND message",
×
4254
                     session_ident);                           // Throws
×
4255
        protocol_error(ProtocolError::reuse_of_session_ident); // Throws
×
4256
        return;
×
4257
    }
×
4258
    try {
5,126✔
4259
        p.first->second.reset(new Session(*this, session_ident)); // Throws
5,126✔
4260
    }
5,126✔
4261
    catch (...) {
5,126✔
4262
        m_sessions.erase(p.first);
×
4263
        throw;
×
4264
    }
×
4265

4266
    Session& sess = *p.first->second;
5,124✔
4267
    sess.initiate(); // Throws
5,124✔
4268
    ProtocolError error;
5,124✔
4269
    bool success =
5,124✔
4270
        sess.receive_bind_message(std::move(path), std::move(signed_user_token), need_client_file_ident, is_subserver,
5,124✔
4271
                                  error); // Throws
5,124✔
4272
    if (REALM_UNLIKELY(!success))         // Throws
5,124✔
4273
        protocol_error(error, &sess);     // Throws
28✔
4274
}
5,124✔
4275

4276

4277
void SyncConnection::receive_ident_message(session_ident_type session_ident, file_ident_type client_file_ident,
4278
                                           salt_type client_file_ident_salt, version_type scan_server_version,
4279
                                           version_type scan_client_version, version_type latest_server_version,
4280
                                           salt_type latest_server_version_salt)
4281
{
4,294✔
4282
    auto i = m_sessions.find(session_ident);
4,294✔
4283
    if (REALM_UNLIKELY(i == m_sessions.end())) {
4,294✔
4284
        bad_session_ident("IDENT", session_ident); // Throws
×
4285
        return;
×
4286
    }
×
4287
    Session& sess = *i->second;
4,294✔
4288
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
4,294✔
4289
        message_after_unbind("IDENT", session_ident); // Throws
×
4290
        return;
×
4291
    }
×
4292
    if (REALM_UNLIKELY(sess.error_occurred())) {
4,294✔
4293
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4294
        // messages, other than UNBIND, must be ignored.
4295
        return;
16✔
4296
    }
16✔
4297
    if (REALM_UNLIKELY(sess.must_send_ident_message())) {
4,278✔
4298
        logger.error("Received IDENT message before IDENT message was sent"); // Throws
×
4299
        protocol_error(ProtocolError::bad_message_order);                     // Throws
×
4300
        return;
×
4301
    }
×
4302
    if (REALM_UNLIKELY(sess.ident_message_received())) {
4,278✔
4303
        logger.error("Received second IDENT message for session"); // Throws
×
4304
        protocol_error(ProtocolError::bad_message_order);          // Throws
×
4305
        return;
×
4306
    }
×
4307

4308
    ProtocolError error = {};
4,278✔
4309
    bool success = sess.receive_ident_message(client_file_ident, client_file_ident_salt, scan_server_version,
4,278✔
4310
                                              scan_client_version, latest_server_version, latest_server_version_salt,
4,278✔
4311
                                              error); // Throws
4,278✔
4312
    if (REALM_UNLIKELY(!success))                     // Throws
4,278✔
4313
        protocol_error(error, &sess);                 // Throws
32✔
4314
}
4,278✔
4315

4316
void SyncConnection::receive_upload_message(session_ident_type session_ident, version_type progress_client_version,
4317
                                            version_type progress_server_version, version_type locked_server_version,
4318
                                            const UploadChangesets& upload_changesets)
4319
{
44,466✔
4320
    auto i = m_sessions.find(session_ident);
44,466✔
4321
    if (REALM_UNLIKELY(i == m_sessions.end())) {
44,466✔
4322
        bad_session_ident("UPLOAD", session_ident); // Throws
×
4323
        return;
×
4324
    }
×
4325
    Session& sess = *i->second;
44,466✔
4326
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
44,466✔
4327
        message_after_unbind("UPLOAD", session_ident); // Throws
×
4328
        return;
×
4329
    }
×
4330
    if (REALM_UNLIKELY(sess.error_occurred())) {
44,466✔
4331
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4332
        // messages, other than UNBIND, must be ignored.
4333
        return;
×
4334
    }
×
4335
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
44,466✔
4336
        message_before_ident("UPLOAD", session_ident); // Throws
×
4337
        return;
×
4338
    }
×
4339

4340
    ProtocolError error = {};
44,466✔
4341
    bool success = sess.receive_upload_message(progress_client_version, progress_server_version,
44,466✔
4342
                                               locked_server_version, upload_changesets, error); // Throws
44,466✔
4343
    if (REALM_UNLIKELY(!success))                                                                // Throws
44,466✔
4344
        protocol_error(error, &sess);                                                            // Throws
×
4345
}
44,466✔
4346

4347

4348
void SyncConnection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
4349
{
12,060✔
4350
    auto i = m_sessions.find(session_ident);
12,060✔
4351
    if (REALM_UNLIKELY(i == m_sessions.end())) {
12,060✔
4352
        bad_session_ident("MARK", session_ident);
×
4353
        return;
×
4354
    }
×
4355
    Session& sess = *i->second;
12,060✔
4356
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
12,060✔
4357
        message_after_unbind("MARK", session_ident); // Throws
×
4358
        return;
×
4359
    }
×
4360
    if (REALM_UNLIKELY(sess.error_occurred())) {
12,060✔
4361
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4362
        // messages, other than UNBIND, must be ignored.
4363
        return;
46✔
4364
    }
46✔
4365
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
12,014✔
4366
        message_before_ident("MARK", session_ident); // Throws
×
4367
        return;
×
4368
    }
×
4369

4370
    ProtocolError error;
12,014✔
4371
    bool success = sess.receive_mark_message(request_ident, error); // Throws
12,014✔
4372
    if (REALM_UNLIKELY(!success))                                   // Throws
12,014✔
4373
        protocol_error(error, &sess);                               // Throws
×
4374
}
12,014✔
4375

4376

4377
void SyncConnection::receive_unbind_message(session_ident_type session_ident)
4378
{
2,198✔
4379
    auto i = m_sessions.find(session_ident); // Throws
2,198✔
4380
    if (REALM_UNLIKELY(i == m_sessions.end())) {
2,198✔
4381
        bad_session_ident("UNBIND", session_ident); // Throws
×
4382
        return;
×
4383
    }
×
4384
    Session& sess = *i->second;
2,198✔
4385
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
2,198✔
4386
        message_after_unbind("UNBIND", session_ident); // Throws
×
4387
        return;
×
4388
    }
×
4389

4390
    sess.receive_unbind_message(); // Throws
2,198✔
4391
    // NOTE: The session might have gotten destroyed at this time!
4392
}
2,198✔
4393

4394

4395
void SyncConnection::receive_ping(milliseconds_type timestamp, milliseconds_type rtt)
4396
{
200✔
4397
    logger.debug("Received: PING(timestamp=%1, rtt=%2)", timestamp, rtt); // Throws
200✔
4398
    m_send_pong = true;
200✔
4399
    m_last_ping_timestamp = timestamp;
200✔
4400
    if (!m_is_sending)
200✔
4401
        send_next_message();
198✔
4402
}
200✔
4403

4404

4405
void SyncConnection::receive_error_message(session_ident_type session_ident, int error_code,
4406
                                           std::string_view error_body)
4407
{
×
4408
    logger.debug("Received: ERROR(error_code=%1, message_size=%2, session_ident=%3)", error_code, error_body.size(),
×
4409
                 session_ident); // Throws
×
4410
    auto i = m_sessions.find(session_ident);
×
4411
    if (REALM_UNLIKELY(i == m_sessions.end())) {
×
4412
        bad_session_ident("ERROR", session_ident);
×
4413
        return;
×
4414
    }
×
4415
    Session& sess = *i->second;
×
4416
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
×
4417
        message_after_unbind("ERROR", session_ident); // Throws
×
4418
        return;
×
4419
    }
×
4420

4421
    sess.receive_error_message(session_ident, error_code, error_body); // Throws
×
4422
}
×
4423

4424
void SyncConnection::send_log_message(util::Logger::Level level, const std::string&& message,
4425
                                      session_ident_type sess_ident, std::optional<std::string> co_id)
4426
{
6,308✔
4427
    if (get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
6,308✔
4428
        return logger.log(level, message.c_str());
×
4429
    }
×
4430

4431
    LogMessage log_msg{sess_ident, level, std::move(message), std::move(co_id)};
6,308✔
4432
    {
6,308✔
4433
        std::lock_guard lock(m_log_mutex);
6,308✔
4434
        m_log_messages.push(std::move(log_msg));
6,308✔
4435
    }
6,308✔
4436
    m_send_trigger.trigger();
6,308✔
4437
}
6,308✔
4438

4439

4440
void SyncConnection::bad_session_ident(const char* message_type, session_ident_type session_ident)
4441
{
×
4442
    logger.error("Bad session identifier in %1 message, session_ident = %2", message_type,
×
4443
                 session_ident);                      // Throws
×
4444
    protocol_error(ProtocolError::bad_session_ident); // Throws
×
4445
}
×
4446

4447

4448
void SyncConnection::message_after_unbind(const char* message_type, session_ident_type session_ident)
4449
{
×
4450
    logger.error("Received %1 message after UNBIND message, session_ident = %2", message_type,
×
4451
                 session_ident);                      // Throws
×
4452
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4453
}
×
4454

4455

4456
void SyncConnection::message_before_ident(const char* message_type, session_ident_type session_ident)
4457
{
×
4458
    logger.error("Received %1 message before IDENT message, session_ident = %2", message_type,
×
4459
                 session_ident);                      // Throws
×
4460
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4461
}
×
4462

4463

4464
void SyncConnection::handle_message_received(const char* data, size_t size)
4465
{
68,340✔
4466
    // parse_message_received() parses the message and calls the
4467
    // proper handler on the SyncConnection object (this).
4468
    get_server_protocol().parse_message_received<SyncConnection>(*this, std::string_view(data, size));
68,340✔
4469
    return;
68,340✔
4470
}
68,340✔
4471

4472

4473
void SyncConnection::handle_ping_received(const char* data, size_t size)
4474
{
×
4475
    // parse_message_received() parses the message and calls the
4476
    // proper handler on the SyncConnection object (this).
4477
    get_server_protocol().parse_ping_received<SyncConnection>(*this, std::string_view(data, size));
×
4478
    return;
×
4479
}
×
4480

4481

4482
void SyncConnection::send_next_message()
4483
{
162,640✔
4484
    if (m_is_sending)
162,640✔
4485
        return;
55,054✔
4486
    REALM_ASSERT(!m_sending_pong);
107,586✔
4487
    if (m_send_pong) {
107,586✔
4488
        send_pong(m_last_ping_timestamp);
200✔
4489
        if (m_sending_pong)
200✔
4490
            return;
200✔
4491
    }
200✔
4492
    for (;;) {
160,334✔
4493
        Session* sess = m_sessions_enlisted_to_send.pop_front();
160,334✔
4494
        if (!sess) {
160,334✔
4495
            // No sessions were enlisted to send
4496
            if (REALM_LIKELY(!m_is_closing))
49,800✔
4497
                break; // Check to see if there are any log messages to go out
49,784✔
4498
            // Send a connection level ERROR
4499
            REALM_ASSERT(!is_session_level_error(m_error_code));
16✔
4500
            initiate_write_error(m_error_code, m_error_session_ident); // Throws
16✔
4501
            return;
16✔
4502
        }
49,800✔
4503
        sess->send_message(); // Throws
110,534✔
4504
        // NOTE: The session might have gotten destroyed at this time!
4505

4506
        // At this point, `m_is_sending` is true if, and only if the session
4507
        // chose to send a message. If it chose to not send a message, we must
4508
        // loop back and give the next session in `m_sessions_enlisted_to_send`
4509
        // a chance.
4510
        if (m_is_sending)
110,534✔
4511
            return;
57,594✔
4512
    }
110,534✔
4513
    {
49,776✔
4514
        std::lock_guard lock(m_log_mutex);
49,776✔
4515
        if (!m_log_messages.empty()) {
49,776✔
4516
            send_log_message(m_log_messages.front());
6,136✔
4517
            m_log_messages.pop();
6,136✔
4518
        }
6,136✔
4519
    }
49,776✔
4520
    // Otherwise, nothing to do
4521
}
49,776✔
4522

4523

4524
void SyncConnection::initiate_write_output_buffer()
4525
{
63,732✔
4526
    auto handler = [this](std::error_code ec, size_t) {
63,732✔
4527
        if (!ec) {
63,718✔
4528
            handle_write_output_buffer();
63,594✔
4529
        }
63,594✔
4530
    };
63,718✔
4531

4532
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
63,732✔
4533
                                   std::move(handler)); // Throws
63,732✔
4534
    m_is_sending = true;
63,732✔
4535
}
63,732✔
4536

4537

4538
void SyncConnection::initiate_pong_output_buffer()
4539
{
200✔
4540
    auto handler = [this](std::error_code ec, size_t) {
200✔
4541
        if (!ec) {
198✔
4542
            handle_pong_output_buffer();
198✔
4543
        }
198✔
4544
    };
198✔
4545

4546
    REALM_ASSERT(!m_is_sending);
200✔
4547
    REALM_ASSERT(!m_sending_pong);
200✔
4548
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
200✔
4549
                                   std::move(handler)); // Throws
200✔
4550

4551
    m_is_sending = true;
200✔
4552
    m_sending_pong = true;
200✔
4553
}
200✔
4554

4555

4556
void SyncConnection::send_pong(milliseconds_type timestamp)
4557
{
200✔
4558
    REALM_ASSERT(m_send_pong);
200✔
4559
    REALM_ASSERT(!m_sending_pong);
200✔
4560
    m_send_pong = false;
200✔
4561
    logger.debug("Sending: PONG(timestamp=%1)", timestamp); // Throws
200✔
4562

4563
    OutputBuffer& out = get_output_buffer();
200✔
4564
    get_server_protocol().make_pong(out, timestamp); // Throws
200✔
4565

4566
    initiate_pong_output_buffer(); // Throws
200✔
4567
}
200✔
4568

4569
void SyncConnection::send_log_message(const LogMessage& log_msg)
4570
{
6,134✔
4571
    OutputBuffer& out = get_output_buffer();
6,134✔
4572
    get_server_protocol().make_log_message(out, log_msg.level, log_msg.message, log_msg.sess_ident,
6,134✔
4573
                                           log_msg.co_id); // Throws
6,134✔
4574

4575
    initiate_write_output_buffer(); // Throws
6,134✔
4576
}
6,134✔
4577

4578

4579
void SyncConnection::handle_write_output_buffer()
4580
{
63,594✔
4581
    release_output_buffer();
63,594✔
4582
    m_is_sending = false;
63,594✔
4583
    send_next_message(); // Throws
63,594✔
4584
}
63,594✔
4585

4586

4587
void SyncConnection::handle_pong_output_buffer()
4588
{
198✔
4589
    release_output_buffer();
198✔
4590
    REALM_ASSERT(m_is_sending);
198✔
4591
    REALM_ASSERT(m_sending_pong);
198✔
4592
    m_is_sending = false;
198✔
4593
    m_sending_pong = false;
198✔
4594
    send_next_message(); // Throws
198✔
4595
}
198✔
4596

4597

4598
void SyncConnection::initiate_write_error(ProtocolError error_code, session_ident_type session_ident)
4599
{
16✔
4600
    const char* message = get_protocol_error_message(int(error_code));
16✔
4601
    std::size_t message_size = std::strlen(message);
16✔
4602
    bool try_again = determine_try_again(error_code);
16✔
4603

4604
    logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3, session_ident=%4)", int(error_code),
16✔
4605
                  message_size, try_again, session_ident); // Throws
16✔
4606

4607
    OutputBuffer& out = get_output_buffer();
16✔
4608
    int protocol_version = get_client_protocol_version();
16✔
4609
    get_server_protocol().make_error_message(protocol_version, out, error_code, message, message_size, try_again,
16✔
4610
                                             session_ident); // Throws
16✔
4611

4612
    auto handler = [this](std::error_code ec, size_t) {
16✔
4613
        handle_write_error(ec); // Throws
16✔
4614
    };
16✔
4615
    m_websocket.async_write_binary(out.data(), out.size(), std::move(handler));
16✔
4616
    m_is_sending = true;
16✔
4617
}
16✔
4618

4619

4620
void SyncConnection::handle_write_error(std::error_code ec)
4621
{
16✔
4622
    m_is_sending = false;
16✔
4623
    REALM_ASSERT(m_is_closing);
16✔
4624
    if (!m_ssl_stream) {
16✔
4625
        m_socket->shutdown(network::Socket::shutdown_send, ec);
16✔
4626
        if (ec && ec != make_basic_system_error_code(ENOTCONN))
16!
4627
            throw std::system_error(ec);
×
4628
    }
16✔
4629
}
16✔
4630

4631

4632
// For connection level errors, `sess` is ignored. For session level errors, a
4633
// session must be specified.
4634
//
4635
// If a session is specified, that session object will have been detached from
4636
// the ServerFile object (and possibly destroyed) upon return from
4637
// protocol_error().
4638
//
4639
// If a session is specified for a protocol level error, that session object
4640
// will have been destroyed upon return from protocol_error(). For session level
4641
// errors, the specified session will have been destroyed upon return from
4642
// protocol_error() if, and only if the negotiated protocol version is less than
4643
// 23.
4644
void SyncConnection::protocol_error(ProtocolError error_code, Session* sess)
4645
{
78✔
4646
    REALM_ASSERT(!m_is_closing);
78✔
4647
    bool session_level = is_session_level_error(error_code);
78✔
4648
    REALM_ASSERT(!session_level || sess);
78✔
4649
    REALM_ASSERT(!sess || m_sessions.count(sess->get_session_ident()) == 1);
78✔
4650
    if (logger.would_log(util::Logger::Level::debug)) {
78✔
4651
        const char* message = get_protocol_error_message(int(error_code));
×
4652
        Logger& logger_2 = (session_level ? sess->logger : logger);
×
4653
        logger_2.debug("Protocol error: %1 (error_code=%2)", message, int(error_code)); // Throws
×
4654
    }
×
4655
    session_ident_type session_ident = (session_level ? sess->get_session_ident() : 0);
78✔
4656
    if (session_level) {
78✔
4657
        sess->initiate_deactivation(error_code); // Throws
78✔
4658
        return;
78✔
4659
    }
78✔
4660
    do_initiate_soft_close(error_code, session_ident); // Throws
×
4661
}
×
4662

4663

4664
void SyncConnection::do_initiate_soft_close(ProtocolError error_code, session_ident_type session_ident)
4665
{
16✔
4666
    REALM_ASSERT(get_protocol_error_message(int(error_code)));
16✔
4667

4668
    // With recent versions of the protocol (when the version is greater than,
4669
    // or equal to 23), this function will only be called for connection level
4670
    // errors, never for session specific errors. However, for the purpose of
4671
    // emulating earlier protocol versions, this function might be called for
4672
    // session specific errors too.
4673
    REALM_ASSERT(is_session_level_error(error_code) == (session_ident != 0));
16✔
4674
    REALM_ASSERT(!is_session_level_error(error_code));
16✔
4675

4676
    REALM_ASSERT(!m_is_closing);
16✔
4677
    m_is_closing = true;
16✔
4678

4679
    m_error_code = error_code;
16✔
4680
    m_error_session_ident = session_ident;
16✔
4681

4682
    // Don't waste time and effort sending any other messages
4683
    m_send_pong = false;
16✔
4684
    m_sessions_enlisted_to_send.clear();
16✔
4685

4686
    m_receiving_session = nullptr;
16✔
4687

4688
    terminate_sessions(); // Throws
16✔
4689

4690
    m_send_trigger.trigger();
16✔
4691
}
16✔
4692

4693

4694
void SyncConnection::close_due_to_close_by_client(std::error_code ec)
4695
{
672✔
4696
    auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
672✔
4697
    // Suicide
4698
    terminate(log_level, "Sync connection closed by client: %1", ec.message()); // Throws
672✔
4699
}
672✔
4700

4701

4702
void SyncConnection::close_due_to_error(std::error_code ec)
4703
{
536✔
4704
    // Suicide
4705
    terminate(Logger::Level::error, "Sync connection closed due to error: %1",
536✔
4706
              ec.message()); // Throws
536✔
4707
}
536✔
4708

4709

4710
void SyncConnection::terminate_sessions()
4711
{
1,228✔
4712
    for (auto& entry : m_sessions) {
1,930✔
4713
        Session& sess = *entry.second;
1,930✔
4714
        sess.terminate(); // Throws
1,930✔
4715
    }
1,930✔
4716
    m_sessions_enlisted_to_send.clear();
1,228✔
4717
    m_sessions.clear();
1,228✔
4718
}
1,228✔
4719

4720

4721
void SyncConnection::initiate_soft_close()
4722
{
16✔
4723
    if (!m_is_closing) {
16✔
4724
        session_ident_type session_ident = 0;                                    // Not session specific
16✔
4725
        do_initiate_soft_close(ProtocolError::connection_closed, session_ident); // Throws
16✔
4726
    }
16✔
4727
}
16✔
4728

4729

4730
void SyncConnection::discard_session(session_ident_type session_ident) noexcept
4731
{
2,192✔
4732
    m_sessions.erase(session_ident);
2,192✔
4733
}
2,192✔
4734

4735
} // anonymous namespace
4736

4737

4738
// ============================ sync::Server implementation ============================
4739

4740
class Server::Implementation : public ServerImpl {
4741
public:
4742
    Implementation(const std::string& root_dir, util::Optional<PKey> pkey, Server::Config config)
4743
        : ServerImpl{root_dir, std::move(pkey), std::move(config)} // Throws
548✔
4744
    {
1,200✔
4745
    }
1,200✔
4746
    virtual ~Implementation() {}
1,200✔
4747
};
4748

4749

4750
Server::Server(const std::string& root_dir, util::Optional<sync::PKey> pkey, Config config)
4751
    : m_impl{new Implementation{root_dir, std::move(pkey), std::move(config)}} // Throws
542✔
4752
{
1,194✔
4753
}
1,194✔
4754

4755

4756
Server::Server(Server&& serv) noexcept
4757
    : m_impl{std::move(serv.m_impl)}
4758
{
×
4759
}
×
4760

4761

4762
Server::~Server() noexcept {}
1,200✔
4763

4764

4765
void Server::start()
4766
{
516✔
4767
    m_impl->start(); // Throws
516✔
4768
}
516✔
4769

4770

4771
void Server::start(const std::string& listen_address, const std::string& listen_port, bool reuse_address)
4772
{
684✔
4773
    m_impl->start(listen_address, listen_port, reuse_address); // Throws
684✔
4774
}
684✔
4775

4776

4777
network::Endpoint Server::listen_endpoint() const
4778
{
1,254✔
4779
    return m_impl->listen_endpoint(); // Throws
1,254✔
4780
}
1,254✔
4781

4782

4783
void Server::run()
4784
{
1,144✔
4785
    m_impl->run(); // Throws
1,144✔
4786
}
1,144✔
4787

4788

4789
void Server::stop() noexcept
4790
{
2,054✔
4791
    m_impl->stop();
2,054✔
4792
}
2,054✔
4793

4794

4795
uint_fast64_t Server::errors_seen() const noexcept
4796
{
684✔
4797
    return m_impl->errors_seen;
684✔
4798
}
684✔
4799

4800

4801
void Server::stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
4802
                                                      milliseconds_type timeout)
4803
{
×
4804
    m_impl->stop_sync_and_wait_for_backup_completion(std::move(completion_handler), timeout); // Throws
×
4805
}
×
4806

4807

4808
void Server::set_connection_reaper_timeout(milliseconds_type timeout)
4809
{
4✔
4810
    m_impl->set_connection_reaper_timeout(timeout);
4✔
4811
}
4✔
4812

4813

4814
void Server::close_connections()
4815
{
16✔
4816
    m_impl->close_connections();
16✔
4817
}
16✔
4818

4819

4820
bool Server::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4821
{
72✔
4822
    return m_impl->map_virtual_to_real_path(virt_path, real_path); // Throws
72✔
4823
}
72✔
4824

4825

4826
void Server::recognize_external_change(const std::string& virt_path)
4827
{
4,800✔
4828
    m_impl->recognize_external_change(virt_path); // Throws
4,800✔
4829
}
4,800✔
4830

4831

4832
void Server::get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
4833
{
×
4834
    m_impl->get_workunit_timers(parallel_section, sequential_section);
×
4835
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc