• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2293

02 May 2024 08:09PM UTC coverage: 90.759% (+0.01%) from 90.747%
2293

push

Evergreen

web-flow
Fix a deadlock when accessing current user from inside an App listener (#7671)

App::switch_user() emitted changes without first releasing the lock on
m_user_mutex, leading to a deadlock if anyone inside the listener tried to
acquire the mutex. The rest of the places where we emitted changes were
correct.

The newly added wrapper catches this error when building with clang.

101946 of 180246 branches covered (56.56%)

14 of 17 new or added lines in 2 files covered. (82.35%)

67 existing lines in 15 files now uncovered.

212564 of 234207 relevant lines covered (90.76%)

5790527.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.11
/src/realm/sync/noinst/server/server.cpp
1
#include <realm/sync/noinst/server/server.hpp>
2

3
#include <realm/binary_data.hpp>
4
#include <realm/impl/simulated_failure.hpp>
5
#include <realm/object_id.hpp>
6
#include <realm/string_data.hpp>
7
#include <realm/sync/changeset.hpp>
8
#include <realm/sync/trigger.hpp>
9
#include <realm/sync/impl/clamped_hex_dump.hpp>
10
#include <realm/sync/impl/clock.hpp>
11
#include <realm/sync/network/http.hpp>
12
#include <realm/sync/network/network_ssl.hpp>
13
#include <realm/sync/network/websocket.hpp>
14
#include <realm/sync/noinst/client_history_impl.hpp>
15
#include <realm/sync/noinst/protocol_codec.hpp>
16
#include <realm/sync/noinst/server/access_control.hpp>
17
#include <realm/sync/noinst/server/server_dir.hpp>
18
#include <realm/sync/noinst/server/server_file_access_cache.hpp>
19
#include <realm/sync/noinst/server/server_impl_base.hpp>
20
#include <realm/sync/transform.hpp>
21
#include <realm/util/base64.hpp>
22
#include <realm/util/bind_ptr.hpp>
23
#include <realm/util/buffer_stream.hpp>
24
#include <realm/util/circular_buffer.hpp>
25
#include <realm/util/compression.hpp>
26
#include <realm/util/file.hpp>
27
#include <realm/util/json_parser.hpp>
28
#include <realm/util/load_file.hpp>
29
#include <realm/util/memory_stream.hpp>
30
#include <realm/util/optional.hpp>
31
#include <realm/util/platform_info.hpp>
32
#include <realm/util/random.hpp>
33
#include <realm/util/safe_int_ops.hpp>
34
#include <realm/util/scope_exit.hpp>
35
#include <realm/util/scratch_allocator.hpp>
36
#include <realm/util/thread.hpp>
37
#include <realm/util/thread_exec_guard.hpp>
38
#include <realm/util/value_reset_guard.hpp>
39
#include <realm/version.hpp>
40

41
#include <algorithm>
42
#include <atomic>
43
#include <cctype>
44
#include <chrono>
45
#include <cmath>
46
#include <condition_variable>
47
#include <cstdint>
48
#include <cstdio>
49
#include <cstring>
50
#include <functional>
51
#include <locale>
52
#include <map>
53
#include <memory>
54
#include <queue>
55
#include <sstream>
56
#include <stdexcept>
57
#include <thread>
58
#include <vector>
59

60
// NOTE: The protocol specification is in `/doc/protocol.md`
61

62

63
// FIXME: Verify that session identifier spoofing cannot be used to get access
64
// to sessions belonging to other network connections in any way.
65
// FIXME: Seems that server must close connection with zero sessions after a
66
// certain timeout.
67

68

69
using namespace realm;
70
using namespace realm::sync;
71
using namespace realm::util;
72

73
// clang-format off
74
using ServerHistory         = _impl::ServerHistory;
75
using ServerProtocol        = _impl::ServerProtocol;
76
using ServerFileAccessCache = _impl::ServerFileAccessCache;
77
using ServerImplBase = _impl::ServerImplBase;
78

79
using IntegratableChangeset = ServerHistory::IntegratableChangeset;
80
using IntegratableChangesetList = ServerHistory::IntegratableChangesetList;
81
using IntegratableChangesets = ServerHistory::IntegratableChangesets;
82
using IntegrationResult = ServerHistory::IntegrationResult;
83
using BootstrapError = ServerHistory::BootstrapError;
84
using ExtendedIntegrationError = ServerHistory::ExtendedIntegrationError;
85
using ClientType = ServerHistory::ClientType;
86
using FileIdentAllocSlot = ServerHistory::FileIdentAllocSlot;
87
using FileIdentAllocSlots = ServerHistory::FileIdentAllocSlots;
88

89
using UploadChangeset = ServerProtocol::UploadChangeset;
90
// clang-format on
91

92

93
using UploadChangesets = std::vector<UploadChangeset>;
94

95
using EventLoopMetricsHandler = network::Service::EventLoopMetricsHandler;
96

97

98
static_assert(std::numeric_limits<session_ident_type>::digits >= 63, "Bad session identifier type");
99
static_assert(std::numeric_limits<file_ident_type>::digits >= 63, "Bad file identifier type");
100
static_assert(std::numeric_limits<version_type>::digits >= 63, "Bad version type");
101
static_assert(std::numeric_limits<timestamp_type>::digits >= 63, "Bad timestamp type");
102

103

104
namespace {
105

106
enum class SchedStatus { done = 0, pending, in_progress };
107

108
// WebSocket protocol prefix of older PBS sync clients (prior to protocol v8).
// Only the Sync Server needs it, in order to keep supporting those clients.
constexpr std::string_view get_old_pbs_websocket_protocol_prefix() noexcept
{
    constexpr std::string_view old_pbs_prefix{"com.mongodb.realm-sync/"};
    return old_pbs_prefix;
}
×
113

114
// Abbreviate `str` to at most `cutoff` of its trailing characters.
//
// A string whose size does not exceed `cutoff` is returned unchanged. A longer
// string is reduced to "..." followed by its last `cutoff` characters.
std::string short_token_fmt(const std::string& str, size_t cutoff = 30)
{
    if (str.size() <= cutoff)
        return str;

    const size_t tail_start = str.size() - cutoff;
    return "..." + str.substr(tail_start);
}
374✔
123

124

125
// Steps through the elements of a comma-separated list (going by the class
// name, the value of an HTTP list header). White space surrounding an element
// is stripped, and elements that turn out to be empty are skipped.
//
// Only a view of the text is stored, so the underlying characters must stay
// alive for as long as the parser, or any element produced by it, is in use.
class HttpListHeaderValueParser {
public:
    HttpListHeaderValueParser(std::string_view string) noexcept
        : m_string{string}
    {
    }

    // Advance to the next nonempty element.
    //
    // Returns true after assigning a view of that element to `elem`. Returns
    // false, leaving `elem` untouched, once the input is exhausted.
    bool next(std::string_view& elem) noexcept
    {
        while (m_pos < m_string.size()) {
            // Carve out the raw segment that reaches up to the next comma, or
            // to the end of the input when no comma remains.
            const size_type comma = m_string.find(',', m_pos);
            const bool is_last = (comma == std::string_view::npos);
            const size_type segment_end = is_last ? m_string.size() : comma;
            std::string_view segment = m_string.substr(m_pos, segment_end - m_pos);
            m_pos = is_last ? segment_end : comma + 1;

            // Strip leading and trailing white space
            while (!segment.empty() && is_http_lws(segment.front()))
                segment.remove_prefix(1);
            while (!segment.empty() && is_http_lws(segment.back()))
                segment.remove_suffix(1);

            if (!segment.empty()) {
                elem = segment;
                return true;
            }
        }
        return false;
    }

private:
    using size_type = std::string_view::size_type;

    // The text being parsed (not owned)
    const std::string_view m_string;

    // Offset of the first character that has not yet been consumed
    size_type m_pos = 0;

    // True for the characters treated as white space: TAB, LF, CR, and space.
    static bool is_http_lws(char ch) noexcept
    {
        switch (ch) {
            case '\t':
            case '\n':
            case '\r':
            case ' ':
                return true;
            default:
                return false;
        }
    }
};
167

168

169
// Clock used for measuring elapsed time: the high resolution clock when that
// clock is steady, otherwise std::chrono::steady_clock.
using SteadyClock = std::conditional_t<std::chrono::high_resolution_clock::is_steady,
                                       std::chrono::high_resolution_clock, std::chrono::steady_clock>;
using SteadyTimePoint = SteadyClock::time_point;
172

173
// The current point in time according to SteadyClock.
SteadyTimePoint steady_clock_now() noexcept
{
    const SteadyTimePoint now = SteadyClock::now();
    return now;
}
154,628✔
177

178
// The time elapsed from `start_time` to `end_time`, expressed as a count of
// milliseconds (converted via std::chrono::duration_cast). When `end_time` is
// omitted, the time of the call is used.
milliseconds_type steady_duration(SteadyTimePoint start_time, SteadyTimePoint end_time = steady_clock_now()) noexcept
{
    using std::chrono::duration_cast;
    using std::chrono::milliseconds;

    const auto elapsed_millis = duration_cast<milliseconds>(end_time - start_time);
    return milliseconds_type(elapsed_millis.count());
}
38,874✔
184

185

186
// Reports true for ProtocolError::connection_closed, and false for every other
// error code.
bool determine_try_again(ProtocolError error_code) noexcept
{
    const bool is_connection_closed = (error_code == ProtocolError::connection_closed);
    return is_connection_closed;
}
100✔
190

191

192
class ServerFile;
193
class ServerImpl;
194
class HTTPConnection;
195
class SyncConnection;
196
class Session;
197

198

199
using Formatter = util::ResettableExpandableBufferOutputStream;
200
using OutputBuffer = util::ResettableExpandableBufferOutputStream;
201

202
using ProtocolVersionRange = std::pair<int, int>;
203

204
class MiscBuffers {
205
public:
206
    Formatter formatter;
207
    OutputBuffer download_message;
208

209
    using ProtocolVersionRanges = std::vector<ProtocolVersionRange>;
210
    ProtocolVersionRanges protocol_version_ranges;
211

212
    std::vector<char> compress;
213

214
    MiscBuffers()
215
    {
1,192✔
216
        formatter.imbue(std::locale::classic());
1,192✔
217
        download_message.imbue(std::locale::classic());
1,192✔
218
    }
1,192✔
219
};
220

221

222
struct DownloadCache {
223
    std::unique_ptr<char[]> body;
224
    std::size_t uncompressed_body_size;
225
    std::size_t compressed_body_size;
226
    bool body_is_compressed;
227
    version_type end_version;
228
    DownloadCursor download_progress;
229
    std::uint_fast64_t downloadable_bytes;
230
    std::size_t num_changesets;
231
    std::size_t accum_original_size;
232
    std::size_t accum_compacted_size;
233
};
234

235

236
// An unblocked work unit is comprised of one Work object for each of the files
237
// that contribute work to the work unit, generally one reference file and a
238
// number of partial files.
239
class Work {
240
public:
241
    // In general, primary work is all forms of modifying work, including file
242
    // deletion.
243
    bool has_primary_work = false;
244

245
    // Only for reference files
246
    bool might_produce_new_sync_version = false;
247

248
    bool produced_new_realm_version = false;
249
    bool produced_new_sync_version = false;
250
    bool expired_reference_version = false;
251

252
    // True if, and only if changesets_from_downstream contains at least one
253
    // changeset.
254
    bool have_changesets_from_downstream = false;
255

256
    FileIdentAllocSlots file_ident_alloc_slots;
257
    std::vector<std::unique_ptr<char[]>> changeset_buffers;
258
    IntegratableChangesets changesets_from_downstream;
259

260
    VersionInfo version_info;
261

262
    // Result of integration of changesets from downstream clients
263
    IntegrationResult integration_result;
264

265
    void reset() noexcept
266
    {
38,794✔
267
        has_primary_work = false;
38,794✔
268

269
        might_produce_new_sync_version = false;
38,794✔
270

271
        produced_new_realm_version = false;
38,794✔
272
        produced_new_sync_version = false;
38,794✔
273
        expired_reference_version = false;
38,794✔
274
        have_changesets_from_downstream = false;
38,794✔
275

276
        file_ident_alloc_slots.clear();
38,794✔
277
        changeset_buffers.clear();
38,794✔
278
        changesets_from_downstream.clear();
38,794✔
279

280
        version_info = {};
38,794✔
281
        integration_result = {};
38,794✔
282
    }
38,794✔
283
};
284

285

286
class WorkerState {
287
public:
288
    FileIdentAllocSlots file_ident_alloc_slots;
289
    util::ScratchMemory scratch_memory;
290
    bool use_file_cache = true;
291
    std::unique_ptr<ServerHistory> reference_hist;
292
    DBRef reference_sg;
293
};
294

295

296
// ============================ SessionQueue ============================
297

298
class SessionQueue {
299
public:
300
    void push_back(Session*) noexcept;
301
    Session* pop_front() noexcept;
302
    void clear() noexcept;
303

304
private:
305
    Session* m_back = nullptr;
306
};
307

308

309
// ============================ FileIdentReceiver ============================
310

311
class FileIdentReceiver {
312
public:
313
    virtual void receive_file_ident(SaltedFileIdent) = 0;
314

315
protected:
316
    ~FileIdentReceiver() {}
5,640✔
317
};
318

319

320
// ============================ WorkerBox =============================
321

322
class WorkerBox {
323
public:
324
    using JobType = util::UniqueFunction<void(WorkerState&)>;
325
    void add_work(WorkerState& state, JobType job)
326
    {
×
327
        std::unique_lock<std::mutex> lock(m_mutex);
×
328
        if (m_jobs.size() >= m_queue_limit) {
×
329
            // Once we have many queued jobs, it is better to use this thread to run a new job
×
330
            // than to queue it.
×
331
            run_a_job(lock, state, job);
×
332
        }
×
333
        else {
×
334
            // Create worker threads on demand (if all existing threads are active):
×
335
            if (m_threads.size() < m_max_num_threads && m_active >= m_threads.size()) {
×
336
                m_threads.emplace_back([this]() {
×
337
                    WorkerState state;
×
338
                    state.use_file_cache = false;
×
339
                    JobType the_job;
×
340
                    std::unique_lock<std::mutex> lock(m_mutex);
×
341
                    for (;;) {
×
342
                        while (m_jobs.empty() && !m_finish_up)
×
343
                            m_changes.wait(lock);
×
344
                        if (m_finish_up)
×
345
                            break; // terminate thread
×
346
                        the_job = std::move(m_jobs.back());
×
347
                        m_jobs.pop_back();
×
348
                        run_a_job(lock, state, the_job);
×
349
                        m_changes.notify_all();
×
350
                    }
×
351
                });
×
352
            }
×
353

×
354
            // Submit the job for execution:
×
355
            m_jobs.emplace_back(std::move(job));
×
356
            m_changes.notify_all();
×
357
        }
×
358
    }
×
359

360
    // You should call wait_completion() before trying to destroy a WorkerBox to get proper
361
    // propagation of exceptions.
362
    void wait_completion(WorkerState& state)
363
    {
×
364
        std::unique_lock<std::mutex> lock(m_mutex);
×
365
        while (!m_jobs.empty() || m_active > 0) {
×
366
            if (!m_jobs.empty()) { // if possible, make this thread participate in running m_jobs
×
367
                JobType the_job = std::move(m_jobs.back());
×
368
                m_jobs.pop_back();
×
369
                run_a_job(lock, state, the_job);
×
370
            }
×
371
            else {
×
372
                m_changes.wait(lock);
×
373
            }
×
374
        }
×
375
        if (m_epr) {
×
376
            std::rethrow_exception(m_epr);
×
377
        }
×
378
    }
×
379

380
    WorkerBox(unsigned int num_threads)
381
    {
×
382
        m_queue_limit = num_threads * 10; // fudge factor for job size variation
×
383
        m_max_num_threads = num_threads;
×
384
    }
×
385

386
    ~WorkerBox()
387
    {
×
388
        {
×
389
            std::unique_lock<std::mutex> lock(m_mutex);
×
390
            m_finish_up = true;
×
391
            m_changes.notify_all();
×
392
        }
×
393
        for (auto& e : m_threads)
×
394
            e.join();
×
395
    }
×
396

397
private:
398
    std::mutex m_mutex;
399
    std::condition_variable m_changes;
400
    std::vector<std::thread> m_threads;
401
    std::vector<JobType> m_jobs;
402
    unsigned int m_active = 0;
403
    bool m_finish_up = false;
404
    unsigned int m_queue_limit = 0;
405
    unsigned int m_max_num_threads = 0;
406
    std::exception_ptr m_epr;
407

408
    void run_a_job(std::unique_lock<std::mutex>& lock, WorkerState& state, JobType& job)
409
    {
×
410
        ++m_active;
×
411
        lock.unlock();
×
412
        try {
×
413
            job(state);
×
414
            lock.lock();
×
415
        }
×
416
        catch (...) {
×
417
            lock.lock();
×
418
            if (!m_epr)
×
419
                m_epr = std::current_exception();
×
420
        }
×
421
        --m_active;
×
422
    }
×
423
};
424

425

426
// ============================ ServerFile ============================
427

428
class ServerFile : public util::RefCountBase {
429
public:
430
    util::PrefixLogger logger;
431

432
    // Logger to be used by the worker thread
433
    util::PrefixLogger wlogger;
434

435
    ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path, std::string real_path,
436
               bool disable_sync_to_disk);
437
    ~ServerFile() noexcept;
438

439
    void initialize();
440
    void activate();
441

442
    ServerImpl& get_server() noexcept
443
    {
42,600✔
444
        return m_server;
42,600✔
445
    }
42,600✔
446

447
    const std::string& get_real_path() const noexcept
448
    {
×
449
        return m_file.realm_path;
×
450
    }
×
451

452
    const std::string& get_virt_path() const noexcept
453
    {
×
454
        return m_file.virt_path;
×
455
    }
×
456

457
    ServerFileAccessCache::File& access()
458
    {
52,724✔
459
        return m_file.access(); // Throws
52,724✔
460
    }
52,724✔
461

462
    ServerFileAccessCache::File& worker_access()
463
    {
38,732✔
464
        return m_worker_file.access(); // Throws
38,732✔
465
    }
38,732✔
466

467
    version_type get_realm_version() const noexcept
468
    {
×
469
        return m_version_info.realm_version;
×
470
    }
×
471

472
    version_type get_sync_version() const noexcept
473
    {
×
474
        return m_version_info.sync_version.version;
×
475
    }
×
476

477
    SaltedVersion get_salted_sync_version() const noexcept
478
    {
108,034✔
479
        return m_version_info.sync_version;
108,034✔
480
    }
108,034✔
481

482
    DownloadCache& get_download_cache() noexcept;
483

484
    void register_client_access(file_ident_type client_file_ident);
485

486
    using file_ident_request_type = std::int_fast64_t;
487

488
    // Initiate a request for a new client file identifier.
489
    //
490
    // Unless the request is cancelled, the identifier will be delivered to the
491
    // receiver by way of an invocation of
492
    // FileIdentReceiver::receive_file_ident().
493
    //
494
    // FileIdentReceiver::receive_file_ident() is guaranteed to not be called
495
    // until after request_file_ident() has returned (no callback reentrance).
496
    //
497
    // New client file identifiers will be delivered to receivers in the order
498
    // that they were requested.
499
    //
500
    // The returned value is a nonzero integer that can be used to cancel the
501
    // request before the file identifier is delivered using
502
    // cancel_file_ident_request().
503
    auto request_file_ident(FileIdentReceiver&, file_ident_type proxy_file, ClientType) -> file_ident_request_type;
504

505
    // Cancel the specified file identifier request.
506
    //
507
    // It is an error to call this function after the identifier has been
508
    // delivered.
509
    void cancel_file_ident_request(file_ident_request_type) noexcept;
510

511
    void add_unidentified_session(Session*);
512
    void identify_session(Session*, file_ident_type client_file_ident);
513

514
    void remove_unidentified_session(Session*) noexcept;
515
    void remove_identified_session(file_ident_type client_file_ident) noexcept;
516

517
    Session* get_identified_session(file_ident_type client_file_ident) noexcept;
518

519
    bool can_add_changesets_from_downstream() const noexcept;
520
    void add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
521
                                        version_type locked_server_version, const UploadChangeset*,
522
                                        std::size_t num_changesets);
523

524
    // bootstrap_client_session calls the function of same name in server_history
525
    // but corrects the upload_progress with information from pending
526
    // integratable changesets. A situation can occur where a client terminates
527
    // a session and starts a new session and re-uploads changesets that are known
528
    // by the ServerFile object but not by the ServerHistory.
529
    BootstrapError bootstrap_client_session(SaltedFileIdent client_file_ident, DownloadCursor download_progress,
530
                                            SaltedVersion server_version, ClientType client_type,
531
                                            UploadCursor& upload_progress, version_type& locked_server_version,
532
                                            Logger&);
533

534
    // NOTE: This function is executed by the worker thread
535
    void worker_process_work_unit(WorkerState&);
536

537
    void recognize_external_change();
538

539
private:
540
    ServerImpl& m_server;
541
    ServerFileAccessCache::Slot m_file;
542

543
    // In general, `m_version_info` refers to the last snapshot of the Realm
544
    // file that is supposed to be visible to remote peers engaging in regular
545
    // Realm file synchronization.
546
    VersionInfo m_version_info;
547

548
    file_ident_request_type m_last_file_ident_request = 0;
549

550
    // The set of sessions whose client file identifier is not yet known, i.e.,
551
    // those for which an IDENT message has not yet been received,
552
    std::set<Session*> m_unidentified_sessions;
553

554
    // A map of the sessions whose client file identifier is known, i.e, those
555
    // for which an IDENT message has been received.
556
    std::map<file_ident_type, Session*> m_identified_sessions;
557

558
    // Used when a file used as partial view wants to allocate a client file
559
    // identifier from the reference Realm.
560
    file_ident_request_type m_file_ident_request = 0;
561

562
    struct FileIdentRequestInfo {
563
        FileIdentReceiver* receiver;
564
        file_ident_type proxy_file;
565
        ClientType client_type;
566
    };
567

568
    // When nonempty, it counts towards outstanding blocked work (see
569
    // `m_has_blocked_work`).
570
    std::map<file_ident_request_type, FileIdentRequestInfo> m_file_ident_requests;
571

572
    // Changesets received from the downstream clients, and waiting to be
573
    // integrated, as well as information about the clients progress in terms of
574
    // integrating changesets received from the server. When nonempty, it counts
575
    // towards outstanding blocked work (see `m_has_blocked_work`).
576
    //
577
    // At any given time, the set of changesets from a particular client-side
578
    // file may be comprised of changesets received via distinct sessions.
579
    //
580
    // See also `m_num_changesets_from_downstream`.
581
    IntegratableChangesets m_changesets_from_downstream;
582

583
    // Keeps track of the number of changesets in `m_changesets_from_downstream`.
584
    //
585
    // Its purpose is also to initialize
586
    // `Work::have_changesets_from_downstream`.
587
    std::size_t m_num_changesets_from_downstream = 0;
588

589
    // The total size, in bytes, of the changesets that were received from
590
    // clients, are targeting this file, and are currently part of the blocked
591
    // work unit.
592
    //
593
    // Together with `m_unblocked_changesets_from_downstream_byte_size`, its
594
    // purpose is to allow the server to keep track of the accumulated size of
595
    // changesets being processed, or waiting to be processed (metric
596
    // `upload.pending.bytes`) (see
597
    // ServerImpl::inc_byte_size_for_pending_downstream_changesets()).
598
    //
599
    // Its purpose is also to enable the "very poor man's" backpressure solution
600
    // (see can_add_changesets_from_downstream()).
601
    std::size_t m_blocked_changesets_from_downstream_byte_size = 0;
602

603
    // Same as `m_blocked_changesets_from_downstream_byte_size` but for the
604
    // currently unblocked work unit.
605
    std::size_t m_unblocked_changesets_from_downstream_byte_size = 0;
606

607
    // When nonempty, it counts towards outstanding blocked work (see
608
    // `m_has_blocked_work`).
609
    std::vector<std::string> m_permission_changes;
610

611
    // True iff this file, or any of its associated partial files (when
612
    // applicable), has a nonzero amount of outstanding work that is currently
613
    // held back from being passed to the worker thread because a previously
614
    // accumulated chunk of work related to this file is currently in progress.
615
    bool m_has_blocked_work = false;
616

617
    // A file, that is not a partial file, is considered *exposed to the worker
618
    // thread* from the point in time where it is submitted to the worker
619
    // (Worker::enqueue()) and up until the point in time where
620
    // group_postprocess_stage_1() starts to execute. A partial file is
621
    // considered *exposed to the worker thread* precisely when the associated
622
    // reference file is exposed to the worker thread, but only if it was in
623
    // `m_reference_file->m_work.partial_files` at the point in time where the
624
    // reference file was passed to the worker.
625
    //
626
    // While this file is exposed to the worker thread, all members of `m_work`
627
    // other than `changesets_from_downstream` may be accessed and modified by
628
    // the worker thread only.
629
    //
630
    // While this file is exposed to the worker thread,
631
    // `m_work.changesets_from_downstream` may be accessed by all threads, but
632
    // must not be modified by any thread. This special status of
633
    // `m_work.changesets_from_downstream` is required to allow
634
    // ServerFile::bootstrap_client_session() to read from it at any time.
635
    Work m_work;
636

637
    // For reference files, set to true when work is unblocked, and reset back
638
    // to false when the work finalization process completes
639
    // (group_postprocess_stage_3()). Always zero for partial files.
640
    bool m_has_work_in_progress = 0;
641

642
    // This one must only be accessed by the worker thread.
643
    //
644
    // More specifically, `m_worker_file.access()` must only be called by the
645
    // worker thread, and if it was ever called, it must be closed by the worker
646
    // thread before the ServerFile object is destroyed, if destruction happens
647
    // before the destruction of the server object itself.
648
    ServerFileAccessCache::Slot m_worker_file;
649

650
    std::vector<std::int_fast64_t> m_deleting_connections;
651

652
    DownloadCache m_download_cache;
653

654
    void on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes);
655
    void on_work_added();
656
    void group_unblock_work();
657
    void unblock_work();
658

659
    /// Resume history scanning in all sessions bound to this file. To be called
660
    /// after a successfull integration of a changeset.
661
    void resume_download() noexcept;
662

663
    // NOTE: These functions are executed by the worker thread
664
    void worker_allocate_file_identifiers();
665
    bool worker_integrate_changes_from_downstream(WorkerState&);
666
    ServerHistory& get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
667
                                           DBRef& sg_ptr);
668
    ServerHistory& get_reference_file_history(WorkerState& state);
669
    void group_postprocess_stage_1();
670
    void group_postprocess_stage_2();
671
    void group_postprocess_stage_3();
672
    void group_finalize_work_stage_1();
673
    void group_finalize_work_stage_2();
674
    void finalize_work_stage_1();
675
    void finalize_work_stage_2();
676
};
677

678

679
inline DownloadCache& ServerFile::get_download_cache() noexcept
680
{
42,486✔
681
    return m_download_cache;
42,486✔
682
}
42,486✔
683

684
inline void ServerFile::group_finalize_work_stage_1()
685
{
38,336✔
686
    finalize_work_stage_1(); // Throws
38,336✔
687
}
38,336✔
688

689
inline void ServerFile::group_finalize_work_stage_2()
690
{
38,336✔
691
    finalize_work_stage_2(); // Throws
38,336✔
692
}
38,336✔
693

694

695
// ============================ Worker ============================
696

697
// All write transaction on server-side Realm files performed on behalf of the
698
// server, must be performed by the worker thread, not the network event loop
699
// thread. This is to ensure that the network event loop thread never gets
700
// blocked waiting for the worker thread to end a long running write
701
// transaction.
702
//
703
// FIXME: Currently, the event loop thread does perform a number of write
704
// transactions, but only on subtier nodes of a star topology server cluster.
705
class Worker : public ServerHistory::Context {
706
public:
707
    std::shared_ptr<util::Logger> logger_ptr;
708
    util::Logger& logger;
709

710
    explicit Worker(ServerImpl&);
711

712
    ServerFileAccessCache& get_file_access_cache() noexcept;
713

714
    void enqueue(ServerFile*);
715

716
    // Overriding members of ServerHistory::Context
717
    std::mt19937_64& server_history_get_random() noexcept override final;
718

719
private:
720
    ServerImpl& m_server;
721
    std::mt19937_64 m_random;
722
    ServerFileAccessCache m_file_access_cache;
723

724
    util::Mutex m_mutex;
725
    util::CondVar m_cond; // Protected by `m_mutex`
726

727
    bool m_stop = false; // Protected by `m_mutex`
728

729
    util::CircularBuffer<ServerFile*> m_queue; // Protected by `m_mutex`
730

731
    WorkerState m_state;
732

733
    void run();
734
    void stop() noexcept;
735

736
    friend class util::ThreadExecGuardWithParent<Worker, ServerImpl>;
737
};
738

739

740
inline ServerFileAccessCache& Worker::get_file_access_cache() noexcept
741
{
1,120✔
742
    return m_file_access_cache;
1,120✔
743
}
1,120✔
744

745

746
// ============================ ServerImpl ============================
747

748
class ServerImpl : public ServerImplBase, public ServerHistory::Context {
749
public:
750
    std::uint_fast64_t errors_seen = 0;
751

752
    std::atomic<milliseconds_type> m_par_time;
753
    std::atomic<milliseconds_type> m_seq_time;
754

755
    util::Mutex last_client_accesses_mutex;
756

757
    const std::shared_ptr<util::Logger> logger_ptr;
758
    util::Logger& logger;
759

760
    network::Service& get_service() noexcept
761
    {
52,560✔
762
        return m_service;
52,560✔
763
    }
52,560✔
764

765
    const network::Service& get_service() const noexcept
766
    {
×
767
        return m_service;
×
768
    }
×
769

770
    std::mt19937_64& get_random() noexcept
771
    {
66,230✔
772
        return m_random;
66,230✔
773
    }
66,230✔
774

775
    const Server::Config& get_config() const noexcept
776
    {
114,872✔
777
        return m_config;
114,872✔
778
    }
114,872✔
779

780
    std::size_t get_max_upload_backlog() const noexcept
781
    {
45,808✔
782
        return m_max_upload_backlog;
45,808✔
783
    }
45,808✔
784

785
    const std::string& get_root_dir() const noexcept
786
    {
5,640✔
787
        return m_root_dir;
5,640✔
788
    }
5,640✔
789

790
    network::ssl::Context& get_ssl_context() noexcept
791
    {
48✔
792
        return *m_ssl_context;
48✔
793
    }
48✔
794

795
    const AccessControl& get_access_control() const noexcept
796
    {
×
797
        return m_access_control;
×
798
    }
×
799

800
    ProtocolVersionRange get_protocol_version_range() const noexcept
801
    {
2,024✔
802
        return m_protocol_version_range;
2,024✔
803
    }
2,024✔
804

805
    ServerProtocol& get_server_protocol() noexcept
806
    {
136,008✔
807
        return m_server_protocol;
136,008✔
808
    }
136,008✔
809

810
    compression::CompressMemoryArena& get_compress_memory_arena() noexcept
811
    {
4,194✔
812
        return m_compress_memory_arena;
4,194✔
813
    }
4,194✔
814

815
    MiscBuffers& get_misc_buffers() noexcept
816
    {
48,704✔
817
        return m_misc_buffers;
48,704✔
818
    }
48,704✔
819

820
    int_fast64_t get_current_server_session_ident() const noexcept
821
    {
×
822
        return m_current_server_session_ident;
×
823
    }
×
824

825
    util::ScratchMemory& get_scratch_memory() noexcept
826
    {
×
827
        return m_scratch_memory;
×
828
    }
×
829

830
    Worker& get_worker() noexcept
831
    {
40,992✔
832
        return m_worker;
40,992✔
833
    }
40,992✔
834

835
    void get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
836
    {
×
837
        parallel_section = m_par_time;
×
838
        sequential_section = m_seq_time;
×
839
    }
×
840

841
    ServerImpl(const std::string& root_dir, util::Optional<sync::PKey>, Server::Config);
842
    ~ServerImpl() noexcept;
843

844
    void start();
845

846
    void start(std::string listen_address, std::string listen_port, bool reuse_address)
847
    {
684✔
848
        m_config.listen_address = listen_address;
684✔
849
        m_config.listen_port = listen_port;
684✔
850
        m_config.reuse_address = reuse_address;
684✔
851

852
        start(); // Throws
684✔
853
    }
684✔
854

855
    network::Endpoint listen_endpoint() const
856
    {
1,226✔
857
        return m_acceptor.local_endpoint();
1,226✔
858
    }
1,226✔
859

860
    void run();
861
    void stop() noexcept;
862

863
    void remove_http_connection(std::int_fast64_t conn_id) noexcept;
864

865
    void add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn);
866
    void remove_sync_connection(int_fast64_t connection_id);
867

868
    size_t get_number_of_http_connections()
869
    {
×
870
        return m_http_connections.size();
×
871
    }
×
872

873
    size_t get_number_of_sync_connections()
874
    {
×
875
        return m_sync_connections.size();
×
876
    }
×
877

878
    bool is_sync_stopped()
879
    {
40,818✔
880
        return m_sync_stopped;
40,818✔
881
    }
40,818✔
882

883
    const std::set<std::string>& get_realm_names() const noexcept
884
    {
×
885
        return m_realm_names;
×
886
    }
×
887

888
    // virt_path must be valid when get_or_create_file() is called.
889
    util::bind_ptr<ServerFile> get_or_create_file(const std::string& virt_path)
890
    {
5,612✔
891
        util::bind_ptr<ServerFile> file = get_file(virt_path);
5,612✔
892
        if (REALM_LIKELY(file))
5,612✔
893
            return file;
4,488✔
894

895
        _impl::VirtualPathComponents virt_path_components =
1,124✔
896
            _impl::parse_virtual_path(m_root_dir, virt_path); // Throws
1,124✔
897
        REALM_ASSERT(virt_path_components.is_valid);
1,124✔
898

899
        _impl::make_dirs(m_root_dir, virt_path); // Throws
1,124✔
900
        m_realm_names.insert(virt_path);         // Throws
1,124✔
901
        {
1,124✔
902
            bool disable_sync_to_disk = m_config.disable_sync_to_disk;
1,124✔
903
            file.reset(new ServerFile(*this, m_file_access_cache, virt_path, virt_path_components.real_realm_path,
1,124✔
904
                                      disable_sync_to_disk)); // Throws
1,124✔
905
        }
1,124✔
906

907
        file->initialize();
1,124✔
908
        m_files[virt_path] = file; // Throws
1,124✔
909
        file->activate();          // Throws
1,124✔
910
        return file;
1,124✔
911
    }
5,612✔
912

913
    std::unique_ptr<ServerHistory> make_history_for_path()
914
    {
×
915
        return std::make_unique<ServerHistory>(*this);
×
916
    }
×
917

918
    util::bind_ptr<ServerFile> get_file(const std::string& virt_path) noexcept
919
    {
5,612✔
920
        auto i = m_files.find(virt_path);
5,612✔
921
        if (REALM_LIKELY(i != m_files.end()))
5,612✔
922
            return i->second;
4,490✔
923
        return {};
1,122✔
924
    }
5,612✔
925

926
    // Returns the number of seconds since the Epoch of
927
    // std::chrono::system_clock.
928
    std::chrono::system_clock::time_point token_expiration_clock_now() const noexcept
929
    {
×
930
        if (REALM_UNLIKELY(m_config.token_expiration_clock))
×
931
            return m_config.token_expiration_clock->now();
×
932
        return std::chrono::system_clock::now();
×
933
    }
×
934

935
    void set_connection_reaper_timeout(milliseconds_type);
936

937
    void close_connections();
938
    bool map_virtual_to_real_path(const std::string& virt_path, std::string& real_path);
939

940
    void recognize_external_change(const std::string& virt_path);
941

942
    void stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
943
                                                  milliseconds_type timeout);
944

945
    // Server global outputbuffers that can be reused.
946
    // The server is single threaded, so there are no
947
    // synchronization issues.
948
    // output_buffers_count is equal to the
949
    // maximum number of buffers needed at any point.
950
    static constexpr int output_buffers_count = 1;
951
    OutputBuffer output_buffers[output_buffers_count];
952

953
    bool is_load_balancing_allowed() const
954
    {
×
955
        return m_allow_load_balancing;
×
956
    }
×
957

958
    // inc_byte_size_for_pending_downstream_changesets() must be called by
959
    // ServerFile objects when changesets from downstream clients have been
960
    // received.
961
    //
962
    // dec_byte_size_for_pending_downstream_changesets() must be called by
963
    // ServerFile objects when changesets from downstream clients have been
964
    // processed or discarded.
965
    //
966
    // ServerImpl uses this information to keep a running tally (metric
967
    // `upload.pending.bytes`) of the total byte size of pending changesets from
968
    // downstream clients.
969
    //
970
    // These functions must be called on the network thread.
971
    void inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
972
    void dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
973

974
    // Overriding member functions in _impl::ServerHistory::Context
975
    std::mt19937_64& server_history_get_random() noexcept override final;
976

977
private:
978
    Server::Config m_config;
979
    network::Service m_service;
980
    std::mt19937_64 m_random;
981
    const std::size_t m_max_upload_backlog;
982
    const std::string m_root_dir;
983
    const AccessControl m_access_control;
984
    const ProtocolVersionRange m_protocol_version_range;
985

986
    // The reserved files will be closed in situations where the server
987
    // runs out of file descriptors.
988
    std::unique_ptr<File> m_reserved_files[5];
989

990
    // The set of all Realm files known to this server, represented by their
991
    // virtual path.
992
    //
993
    // INVARIANT: If a Realm file is in the servers directory (i.e., it would be
994
    // reported by an invocation of _impl::get_realm_names()), then the
995
    // corresponding virtual path is in `m_realm_names`, assuming no external
996
    // file-system level intervention.
997
    std::set<std::string> m_realm_names;
998

999
    std::unique_ptr<network::ssl::Context> m_ssl_context;
1000
    ServerFileAccessCache m_file_access_cache;
1001
    Worker m_worker;
1002
    std::map<std::string, util::bind_ptr<ServerFile>> m_files; // Key is virtual path
1003
    network::Acceptor m_acceptor;
1004
    std::int_fast64_t m_next_conn_id = 0;
1005
    std::unique_ptr<HTTPConnection> m_next_http_conn;
1006
    network::Endpoint m_next_http_conn_endpoint;
1007
    std::map<std::int_fast64_t, std::unique_ptr<HTTPConnection>> m_http_connections;
1008
    std::map<std::int_fast64_t, std::unique_ptr<SyncConnection>> m_sync_connections;
1009
    ServerProtocol m_server_protocol;
1010
    compression::CompressMemoryArena m_compress_memory_arena;
1011
    MiscBuffers m_misc_buffers;
1012
    int_fast64_t m_current_server_session_ident;
1013
    Optional<network::DeadlineTimer> m_connection_reaper_timer;
1014
    bool m_allow_load_balancing = false;
1015

1016
    util::Mutex m_mutex;
1017

1018
    bool m_stopped = false; // Protected by `m_mutex`
1019

1020
    // m_sync_stopped is used by stop_sync_and_wait_for_backup_completion().
1021
    // When m_sync_stopped is true, the server does not perform any sync.
1022
    bool m_sync_stopped = false;
1023

1024
    std::atomic<bool> m_running{false}; // Debugging facility
1025

1026
    std::size_t m_pending_changesets_from_downstream_byte_size = 0;
1027

1028
    util::CondVar m_wait_or_service_stopped_cond; // Protected by `m_mutex`
1029

1030
    util::ScratchMemory m_scratch_memory;
1031

1032
    void listen();
1033
    void initiate_accept();
1034
    void handle_accept(std::error_code);
1035

1036
    void reap_connections();
1037
    void initiate_connection_reaper_timer(milliseconds_type timeout);
1038
    void do_close_connections();
1039

1040
    static std::size_t determine_max_upload_backlog(Server::Config& config) noexcept
1041
    {
1,192✔
1042
        if (config.max_upload_backlog == 0)
1,192✔
1043
            return 4294967295; // 4GiB - 1 (largest allowable number on a 32-bit platform)
1,192✔
1044
        return config.max_upload_backlog;
×
1045
    }
1,192✔
1046

1047
    static ProtocolVersionRange determine_protocol_version_range(Server::Config& config)
1048
    {
1,192✔
1049
        const int actual_min = ServerImplBase::get_oldest_supported_protocol_version();
1,192✔
1050
        const int actual_max = get_current_protocol_version();
1,192✔
1051
        static_assert(actual_min <= actual_max, "");
1,192✔
1052
        int min = actual_min;
1,192✔
1053
        int max = actual_max;
1,192✔
1054
        if (config.max_protocol_version != 0 && config.max_protocol_version < max) {
1,192!
1055
            if (config.max_protocol_version < min)
×
1056
                throw Server::NoSupportedProtocolVersions();
×
1057
            max = config.max_protocol_version;
×
1058
        }
×
1059
        return {min, max};
1,192✔
1060
    }
1,192✔
1061

1062
    void do_recognize_external_change(const std::string& virt_path);
1063

1064
    void do_stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_complete)> completion_handler,
1065
                                                     milliseconds_type timeout);
1066
};
1067

1068
// ============================ SyncConnection ============================
1069

1070
class SyncConnection : public websocket::Config {
1071
public:
1072
    const std::shared_ptr<util::Logger> logger_ptr;
1073
    util::Logger& logger;
1074

1075
    // Clients with sync protocol version 8 or greater support pbs->flx migration
1076
    static constexpr int PBS_FLX_MIGRATION_PROTOCOL_VERSION = 8;
1077
    // Clients with sync protocol version less than 10 do not support log messages
1078
    static constexpr int SERVER_LOG_PROTOCOL_VERSION = 10;
1079

1080
    SyncConnection(ServerImpl& serv, std::int_fast64_t id, std::unique_ptr<network::Socket>&& socket,
1081
                   std::unique_ptr<network::ssl::Stream>&& ssl_stream,
1082
                   std::unique_ptr<network::ReadAheadBuffer>&& read_ahead_buffer, int client_protocol_version,
1083
                   std::string client_user_agent, std::string remote_endpoint, std::string appservices_request_id)
1084
        : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
916✔
1085
                                                          serv.logger_ptr)} // Throws
916✔
1086
        , logger{*logger_ptr}
916✔
1087
        , m_server{serv}
916✔
1088
        , m_id{id}
916✔
1089
        , m_socket{std::move(socket)}
916✔
1090
        , m_ssl_stream{std::move(ssl_stream)}
916✔
1091
        , m_read_ahead_buffer{std::move(read_ahead_buffer)}
916✔
1092
        , m_websocket{*this}
916✔
1093
        , m_client_protocol_version{client_protocol_version}
916✔
1094
        , m_client_user_agent{std::move(client_user_agent)}
916✔
1095
        , m_remote_endpoint{std::move(remote_endpoint)}
916✔
1096
        , m_appservices_request_id{std::move(appservices_request_id)}
916✔
1097
    {
2,024✔
1098
        // Make the output buffer stream throw std::bad_alloc if it fails to
1099
        // expand the buffer
1100
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
2,024✔
1101

1102
        network::Service& service = m_server.get_service();
2,024✔
1103
        auto handler = [this](Status status) {
99,512✔
1104
            if (!status.is_ok())
99,512✔
1105
                return;
×
1106
            if (!m_is_sending)
99,512✔
1107
                send_next_message(); // Throws
43,912✔
1108
        };
99,512✔
1109
        m_send_trigger = std::make_unique<Trigger<network::Service>>(&service, std::move(handler)); // Throws
2,024✔
1110
    }
2,024✔
1111

1112
    ~SyncConnection() noexcept;
1113

1114
    ServerImpl& get_server() noexcept
1115
    {
117,976✔
1116
        return m_server;
117,976✔
1117
    }
117,976✔
1118

1119
    ServerProtocol& get_server_protocol() noexcept
1120
    {
136,010✔
1121
        return m_server.get_server_protocol();
136,010✔
1122
    }
136,010✔
1123

1124
    int get_client_protocol_version()
1125
    {
105,996✔
1126
        return m_client_protocol_version;
105,996✔
1127
    }
105,996✔
1128

1129
    const std::string& get_client_user_agent() const noexcept
1130
    {
5,610✔
1131
        return m_client_user_agent;
5,610✔
1132
    }
5,610✔
1133

1134
    const std::string& get_remote_endpoint() const noexcept
1135
    {
5,610✔
1136
        return m_remote_endpoint;
5,610✔
1137
    }
5,610✔
1138

1139
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept final
1140
    {
2,024✔
1141
        return logger_ptr;
2,024✔
1142
    }
2,024✔
1143

1144
    std::mt19937_64& websocket_get_random() noexcept final override
1145
    {
65,108✔
1146
        return m_server.get_random();
65,108✔
1147
    }
65,108✔
1148

1149
    bool websocket_binary_message_received(const char* data, size_t size) final override
1150
    {
71,416✔
1151
        using sf = _impl::SimulatedFailure;
71,416✔
1152
        if (sf::check_trigger(sf::sync_server__read_head)) {
71,416✔
1153
            // Suicide
1154
            read_error(sf::sync_server__read_head);
506✔
1155
            return false;
506✔
1156
        }
506✔
1157
        // After a connection level error has occurred, all incoming messages
1158
        // will be ignored. By continuing to read until end of input, the server
1159
        // is able to know when the client closes the connection, which in
1160
        // general means that is has received the ERROR message.
1161
        if (REALM_LIKELY(!m_is_closing)) {
70,910✔
1162
            m_last_activity_at = steady_clock_now();
70,900✔
1163
            handle_message_received(data, size);
70,900✔
1164
        }
70,900✔
1165
        return true;
70,910✔
1166
    }
71,416✔
1167

1168
    bool websocket_ping_message_received(const char* data, size_t size) final override
1169
    {
×
1170
        if (REALM_LIKELY(!m_is_closing)) {
×
1171
            m_last_activity_at = steady_clock_now();
×
1172
            handle_ping_received(data, size);
×
1173
        }
×
1174
        return true;
×
1175
    }
×
1176

1177
    void async_write(const char* data, size_t size, websocket::WriteCompletionHandler handler) final override
1178
    {
65,114✔
1179
        if (m_ssl_stream) {
65,114✔
1180
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
70✔
1181
        }
70✔
1182
        else {
65,044✔
1183
            m_socket->async_write(data, size, std::move(handler)); // Throws
65,044✔
1184
        }
65,044✔
1185
    }
65,114✔
1186

1187
    void async_read(char* buffer, size_t size, websocket::ReadCompletionHandler handler) final override
1188
    {
215,770✔
1189
        if (m_ssl_stream) {
215,770✔
1190
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
170✔
1191
        }
170✔
1192
        else {
215,600✔
1193
            m_socket->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
215,600✔
1194
        }
215,600✔
1195
    }
215,770✔
1196

1197
    void async_read_until(char* buffer, size_t size, char delim,
1198
                          websocket::ReadCompletionHandler handler) final override
1199
    {
×
1200
        if (m_ssl_stream) {
×
1201
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1202
                                           std::move(handler)); // Throws
×
1203
        }
×
1204
        else {
×
1205
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1206
                                       std::move(handler)); // Throws
×
1207
        }
×
1208
    }
×
1209

1210
    void websocket_read_error_handler(std::error_code ec) final override
1211
    {
666✔
1212
        read_error(ec);
666✔
1213
    }
666✔
1214

1215
    void websocket_write_error_handler(std::error_code ec) final override
1216
    {
×
1217
        write_error(ec);
×
1218
    }
×
1219

1220
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*, std::string_view) final override
1221
    {
×
1222
        // WebSocket class has already logged a message for this error
1223
        close_due_to_error(ec); // Throws
×
1224
    }
×
1225

1226
    void websocket_protocol_error_handler(std::error_code ec) final override
1227
    {
×
1228
        logger.error("WebSocket protocol error (%1): %2", ec, ec.message()); // Throws
×
1229
        close_due_to_error(ec);                                              // Throws
×
1230
    }
×
1231

1232
    void websocket_handshake_completion_handler(const HTTPHeaders&) final override
1233
    {
×
1234
        // This is not called since we handle HTTP request in handle_request_for_sync()
1235
        REALM_TERMINATE("websocket_handshake_completion_handler should not have been called");
1236
    }
×
1237

1238
    int_fast64_t get_id() const noexcept
1239
    {
×
1240
        return m_id;
×
1241
    }
×
1242

1243
    network::Socket& get_socket() noexcept
1244
    {
×
1245
        return *m_socket;
×
1246
    }
×
1247

1248
    void initiate();
1249

1250
    // Commits suicide
1251
    template <class... Params>
1252
    void terminate(Logger::Level, const char* log_message, Params... log_params);
1253

1254
    // Commits suicide
1255
    void terminate_if_dead(SteadyTimePoint now);
1256

1257
    void enlist_to_send(Session*) noexcept;
1258

1259
    // Sessions should get the output_buffer and insert a message, after which
1260
    // they call initiate_write_output_buffer().
1261
    OutputBuffer& get_output_buffer()
1262
    {
65,108✔
1263
        m_output_buffer.reset();
65,108✔
1264
        return m_output_buffer;
65,108✔
1265
    }
65,108✔
1266

1267
    // More advanced memory strategies can be implemented if needed.
1268
    void release_output_buffer() {}
64,930✔
1269

1270
    // When this function is called, the connection will initiate a write with
1271
    // its output_buffer. Sessions use this method.
1272
    void initiate_write_output_buffer();
1273

1274
    void initiate_pong_output_buffer();
1275

1276
    void handle_protocol_error(Status status);
1277

1278
    void receive_bind_message(session_ident_type, std::string path, std::string signed_user_token,
1279
                              bool need_client_file_ident, bool is_subserver);
1280

1281
    void receive_ident_message(session_ident_type, file_ident_type client_file_ident,
1282
                               salt_type client_file_ident_salt, version_type scan_server_version,
1283
                               version_type scan_client_version, version_type latest_server_version,
1284
                               salt_type latest_server_version_salt);
1285

1286
    void receive_upload_message(session_ident_type, version_type progress_client_version,
1287
                                version_type progress_server_version, version_type locked_server_version,
1288
                                const UploadChangesets&);
1289

1290
    void receive_mark_message(session_ident_type, request_ident_type);
1291

1292
    void receive_unbind_message(session_ident_type);
1293

1294
    void receive_ping(milliseconds_type timestamp, milliseconds_type rtt);
1295

1296
    void receive_error_message(session_ident_type, int error_code, std::string_view error_body);
1297

1298
    void protocol_error(ProtocolError, Session* = nullptr);
1299

1300
    void initiate_soft_close();
1301

1302
    void discard_session(session_ident_type) noexcept;
1303

1304
    void send_log_message(util::Logger::Level level, const std::string&& message, session_ident_type sess_ident = 0,
1305
                          std::optional<std::string> co_id = std::nullopt);
1306

1307
private:
1308
    ServerImpl& m_server;
1309
    const int_fast64_t m_id;
1310
    std::unique_ptr<network::Socket> m_socket;
1311
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1312
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1313

1314
    websocket::Socket m_websocket;
1315
    std::unique_ptr<char[]> m_input_body_buffer;
1316
    OutputBuffer m_output_buffer;
1317
    std::map<session_ident_type, std::unique_ptr<Session>> m_sessions;
1318

1319
    // The protocol version in use by the connected client.
1320
    const int m_client_protocol_version;
1321

1322
    // The user agent description passed by the client.
1323
    const std::string m_client_user_agent;
1324

1325
    const std::string m_remote_endpoint;
1326

1327
    const std::string m_appservices_request_id;
1328

1329
    // A queue of sessions that have enlisted for an opportunity to send a
1330
    // message. Sessions will be served in the order that they enlist. A session
1331
    // can only occur once in this queue (linked list). If the queue is not
1332
    // empty, and no message is currently being written to the socket, the first
1333
    // session is taken out of the queue, and then granted an opportunity to
1334
    // send a message.
1335
    //
1336
    // Sessions will never be destroyed while in this queue. This is ensured
1337
    // because the connection owns the sessions that are associated with it, and
1338
    // the connection only removes a session from m_sessions at points in time
1339
    // where that session is guaranteed to not be in m_sessions_enlisted_to_send
1340
    // (Connection::send_next_message() and Connection::~Connection()).
1341
    SessionQueue m_sessions_enlisted_to_send;
1342

1343
    Session* m_receiving_session = nullptr;
1344

1345
    bool m_is_sending = false;
1346
    bool m_is_closing = false;
1347

1348
    bool m_send_pong = false;
1349
    bool m_sending_pong = false;
1350

1351
    std::unique_ptr<Trigger<network::Service>> m_send_trigger;
1352

1353
    milliseconds_type m_last_ping_timestamp = 0;
1354

1355
    // If `m_is_closing` is true, this is the time at which `m_is_closing` was
1356
    // set to true (initiation of soft close). Otherwise, if no messages have
1357
    // been received from the client, this is the time at which the connection
1358
    // object was initiated (completion of WebSocket handshake). Otherwise this
1359
    // is the time at which the last message was received from the client.
1360
    SteadyTimePoint m_last_activity_at;
1361

1362
    // These are initialized by do_initiate_soft_close().
1363
    //
1364
    // With recent versions of the protocol (when the version is greater than,
1365
    // or equal to 23), `m_error_session_ident` is always zero.
1366
    ProtocolError m_error_code = {};
1367
    session_ident_type m_error_session_ident = 0;
1368

1369
    struct LogMessage {
1370
        session_ident_type sess_ident;
1371
        util::Logger::Level level;
1372
        std::string message;
1373
        std::optional<std::string> co_id;
1374
    };
1375

1376
    std::mutex m_log_mutex;
1377
    std::queue<LogMessage> m_log_messages;
1378

1379
    static std::string make_logger_prefix(int_fast64_t id)
1380
    {
2,024✔
1381
        std::ostringstream out;
2,024✔
1382
        out.imbue(std::locale::classic());
2,024✔
1383
        out << "Sync Connection[" << id << "]: "; // Throws
2,024✔
1384
        return out.str();                         // Throws
2,024✔
1385
    }
2,024✔
1386

1387
    // The return value of handle_message_received() designates whether
1388
    // message processing should continue. If the connection object is
1389
    // destroyed during execution of handle_message_received(), the return
1390
    // value must be false.
1391
    void handle_message_received(const char* data, size_t size);
1392

1393
    void handle_ping_received(const char* data, size_t size);
1394

1395
    void send_next_message();
1396
    void send_pong(milliseconds_type timestamp);
1397
    void send_log_message(const LogMessage& log_msg);
1398

1399
    void handle_write_output_buffer();
1400
    void handle_pong_output_buffer();
1401

1402
    void initiate_write_error(ProtocolError, session_ident_type);
1403
    void handle_write_error(std::error_code ec);
1404

1405
    void do_initiate_soft_close(ProtocolError, session_ident_type);
1406
    void read_error(std::error_code);
1407
    void write_error(std::error_code);
1408

1409
    void close_due_to_close_by_client(std::error_code);
1410
    void close_due_to_error(std::error_code);
1411

1412
    void terminate_sessions();
1413

1414
    void bad_session_ident(const char* message_type, session_ident_type);
1415
    void message_after_unbind(const char* message_type, session_ident_type);
1416
    void message_before_ident(const char* message_type, session_ident_type);
1417
};
1418

1419

1420
inline void SyncConnection::read_error(std::error_code ec)
1421
{
1,172✔
1422
    REALM_ASSERT(ec != util::error::operation_aborted);
1,172✔
1423
    if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
1,172✔
1424
        // Suicide
1425
        close_due_to_close_by_client(ec); // Throws
666✔
1426
        return;
666✔
1427
    }
666✔
1428
    if (ec == util::MiscExtErrors::delim_not_found) {
506✔
1429
        logger.error("Input message head delimited not found"); // Throws
×
1430
        protocol_error(ProtocolError::limits_exceeded);         // Throws
×
1431
        return;
×
1432
    }
×
1433

1434
    logger.error("Reading failed: %1", ec.message()); // Throws
506✔
1435

1436
    // Suicide
1437
    close_due_to_error(ec); // Throws
506✔
1438
}
506✔
1439

1440
inline void SyncConnection::write_error(std::error_code ec)
1441
{
×
1442
    REALM_ASSERT(ec != util::error::operation_aborted);
×
1443
    if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1444
        // Suicide
1445
        close_due_to_close_by_client(ec); // Throws
×
1446
        return;
×
1447
    }
×
1448
    logger.error("Writing failed: %1", ec.message()); // Throws
×
1449

1450
    // Suicide
1451
    close_due_to_error(ec); // Throws
×
1452
}
×
1453

1454

1455
// ============================ HTTPConnection ============================
1456

1457
// Name of the HTTP "User-Agent" header field.
// NOTE(review): this is a mutable, non-const global; presumably it is only
// read (for header lookups) -- confirm, and consider making it const.
std::string g_user_agent = "User-Agent";
1458

1459
class HTTPConnection {
1460
public:
1461
    const std::shared_ptr<Logger> logger_ptr;
1462
    util::Logger& logger;
1463

1464
    HTTPConnection(ServerImpl& serv, int_fast64_t id, bool is_ssl)
1465
        : logger_ptr{std::make_shared<PrefixLogger>(util::LogCategory::server, make_logger_prefix(id),
1,486✔
1466
                                                    serv.logger_ptr)} // Throws
1,486✔
1467
        , logger{*logger_ptr}
1,486✔
1468
        , m_server{serv}
1,486✔
1469
        , m_id{id}
1,486✔
1470
        , m_socket{new network::Socket{serv.get_service()}} // Throws
1,486✔
1471
        , m_read_ahead_buffer{new network::ReadAheadBuffer} // Throws
1,486✔
1472
        , m_http_server{*this, logger_ptr}
1,486✔
1473
    {
3,262✔
1474
        // Make the output buffer stream throw std::bad_alloc if it fails to
1475
        // expand the buffer
1476
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
3,262✔
1477

1478
        if (is_ssl) {
3,262✔
1479
            using namespace network::ssl;
48✔
1480
            Context& ssl_context = serv.get_ssl_context();
48✔
1481
            m_ssl_stream = std::make_unique<Stream>(*m_socket, ssl_context,
48✔
1482
                                                    Stream::server); // Throws
48✔
1483
        }
48✔
1484
    }
3,262✔
1485

1486
    // Returns the server this connection was constructed with.
    ServerImpl& get_server() noexcept
    {
        return this->m_server;
    }
×
1490

1491
    int_fast64_t get_id() const noexcept
1492
    {
2,070✔
1493
        return m_id;
2,070✔
1494
    }
2,070✔
1495

1496
    // Returns the underlying socket. `m_socket` must still be set: it is reset
    // by terminate() and moved from when the connection is handed over to a
    // SyncConnection.
    network::Socket& get_socket() noexcept
    {
        network::Socket& socket = *m_socket;
        return socket;
    }
4,998✔
1500

1501
    template <class H>
1502
    void async_write(const char* data, size_t size, H handler)
1503
    {
2,044✔
1504
        if (m_ssl_stream) {
2,044✔
1505
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
14✔
1506
        }
14✔
1507
        else {
2,030✔
1508
            m_socket->async_write(data, size, std::move(handler)); // Throws
2,030✔
1509
        }
2,030✔
1510
    }
2,044✔
1511

1512
    template <class H>
1513
    void async_read(char* buffer, size_t size, H handler)
1514
    {
8✔
1515
        if (m_ssl_stream) {
8✔
1516
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer,
×
1517
                                     std::move(handler)); // Throws
×
1518
        }
×
1519
        else {
8✔
1520
            m_socket->async_read(buffer, size, *m_read_ahead_buffer,
8✔
1521
                                 std::move(handler)); // Throws
8✔
1522
        }
8✔
1523
    }
8✔
1524

1525
    template <class H>
1526
    void async_read_until(char* buffer, size_t size, char delim, H handler)
1527
    {
18,292✔
1528
        if (m_ssl_stream) {
18,292✔
1529
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
126✔
1530
                                           std::move(handler)); // Throws
126✔
1531
        }
126✔
1532
        else {
18,166✔
1533
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
18,166✔
1534
                                       std::move(handler)); // Throws
18,166✔
1535
        }
18,166✔
1536
    }
18,292✔
1537

1538
    void initiate(std::string remote_endpoint)
1539
    {
2,070✔
1540
        m_last_activity_at = steady_clock_now();
2,070✔
1541
        m_remote_endpoint = std::move(remote_endpoint);
2,070✔
1542

1543
        logger.detail("Connection from %1", m_remote_endpoint); // Throws
2,070✔
1544

1545
        if (m_ssl_stream) {
2,070✔
1546
            initiate_ssl_handshake(); // Throws
24✔
1547
        }
24✔
1548
        else {
2,046✔
1549
            initiate_http(); // Throws
2,046✔
1550
        }
2,046✔
1551
    }
2,070✔
1552

1553
    void respond_200_ok()
1554
    {
×
1555
        handle_text_response(HTTPStatus::Ok, "OK"); // Throws
×
1556
    }
×
1557

1558
    void respond_404_not_found()
1559
    {
×
1560
        handle_text_response(HTTPStatus::NotFound, "Not found"); // Throws
×
1561
    }
×
1562

1563
    void respond_503_service_unavailable()
1564
    {
×
1565
        handle_text_response(HTTPStatus::ServiceUnavailable, "Service unavailable"); // Throws
×
1566
    }
×
1567

1568
    // Commits suicide
1569
    template <class... Params>
1570
    void terminate(Logger::Level log_level, const char* log_message, Params... log_params)
1571
    {
46✔
1572
        logger.log(log_level, log_message, log_params...); // Throws
46✔
1573
        m_ssl_stream.reset();
46✔
1574
        m_socket.reset();
46✔
1575
        m_server.remove_http_connection(m_id); // Suicide
46✔
1576
    }
46✔
1577

1578
    // Commits suicide
1579
    void terminate_if_dead(SteadyTimePoint now)
1580
    {
2✔
1581
        milliseconds_type time = steady_duration(m_last_activity_at, now);
2✔
1582
        const Server::Config& config = m_server.get_config();
2✔
1583
        if (m_is_sending) {
2✔
1584
            if (time >= config.http_response_timeout) {
×
1585
                // Suicide
1586
                terminate(Logger::Level::detail,
×
1587
                          "HTTP connection closed (request timeout)"); // Throws
×
1588
            }
×
1589
        }
×
1590
        else {
2✔
1591
            if (time >= config.http_request_timeout) {
2✔
1592
                // Suicide
1593
                terminate(Logger::Level::detail,
×
1594
                          "HTTP connection closed (response timeout)"); // Throws
×
1595
            }
×
1596
        }
2✔
1597
    }
2✔
1598

1599
    std::string get_appservices_request_id() const
1600
    {
2,044✔
1601
        return m_appservices_request_id.to_string();
2,044✔
1602
    }
2,044✔
1603

1604
private:
1605
    ServerImpl& m_server;
1606
    const int_fast64_t m_id;
1607
    const ObjectId m_appservices_request_id = ObjectId::gen();
1608
    std::unique_ptr<network::Socket> m_socket;
1609
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1610
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1611
    HTTPServer<HTTPConnection> m_http_server;
1612
    OutputBuffer m_output_buffer;
1613
    bool m_is_sending = false;
1614
    SteadyTimePoint m_last_activity_at;
1615
    std::string m_remote_endpoint;
1616
    int m_negotiated_protocol_version = 0;
1617

1618
    void initiate_ssl_handshake()
1619
    {
24✔
1620
        auto handler = [this](std::error_code ec) {
24✔
1621
            if (ec != util::error::operation_aborted)
24✔
1622
                handle_ssl_handshake(ec); // Throws
24✔
1623
        };
24✔
1624
        m_ssl_stream->async_handshake(std::move(handler)); // Throws
24✔
1625
    }
24✔
1626

1627
    void handle_ssl_handshake(std::error_code ec)
1628
    {
24✔
1629
        if (ec) {
24✔
1630
            logger.error("SSL handshake error (%1): %2", ec, ec.message()); // Throws
10✔
1631
            close_due_to_error(ec);                                         // Throws
10✔
1632
            return;
10✔
1633
        }
10✔
1634
        initiate_http(); // Throws
14✔
1635
    }
14✔
1636

1637
    void initiate_http()
1638
    {
2,060✔
1639
        logger.debug("Connection initiates HTTP receipt");
2,060✔
1640

1641
        auto handler = [this](HTTPRequest request, std::error_code ec) {
2,060✔
1642
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
2,060✔
1643
                return;
×
1644
            if (REALM_UNLIKELY(ec == HTTPParserError::MalformedRequest)) {
2,060✔
1645
                logger.error("Malformed HTTP request");
×
1646
                close_due_to_error(ec); // Throws
×
1647
                return;
×
1648
            }
×
1649
            if (REALM_UNLIKELY(ec == HTTPParserError::BadRequest)) {
2,060✔
1650
                logger.error("Bad HTTP request");
8✔
1651
                const char* body = "The HTTP request was corrupted";
8✔
1652
                handle_400_bad_request(body); // Throws
8✔
1653
                return;
8✔
1654
            }
8✔
1655
            if (REALM_UNLIKELY(ec)) {
2,052✔
1656
                read_error(ec); // Throws
16✔
1657
                return;
16✔
1658
            }
16✔
1659
            handle_http_request(std::move(request)); // Throws
2,036✔
1660
        };
2,036✔
1661
        m_http_server.async_receive_request(std::move(handler)); // Throws
2,060✔
1662
    }
2,060✔
1663

1664
    void handle_http_request(const HTTPRequest& request)
1665
    {
2,036✔
1666
        StringData path = request.path;
2,036✔
1667

1668
        logger.debug("HTTP request received, request = %1", request);
2,036✔
1669

1670
        m_is_sending = true;
2,036✔
1671
        m_last_activity_at = steady_clock_now();
2,036✔
1672

1673
        // FIXME: When thinking of this function as a switching device, it seem
1674
        // wrong that it requires a `%2F` after `/realm-sync/`. If `%2F` is
1675
        // supposed to be mandatory, then that check ought to be delegated to
1676
        // handle_request_for_sync(), as that will yield a sharper separation of
1677
        // concerns.
1678
        if (path == "/realm-sync" || path.begins_with("/realm-sync?") || path.begins_with("/realm-sync/%2F")) {
2,036✔
1679
            handle_request_for_sync(request); // Throws
2,024✔
1680
        }
2,024✔
1681
        else {
12✔
1682
            handle_404_not_found(request); // Throws
12✔
1683
        }
12✔
1684
    }
2,036✔
1685

1686
    void handle_request_for_sync(const HTTPRequest& request)
1687
    {
2,024✔
1688
        if (m_server.is_sync_stopped()) {
2,024✔
1689
            logger.debug("Attempt to create a sync connection to a server that has been "
×
1690
                         "stopped"); // Throws
×
1691
            handle_503_service_unavailable(request, "The server does not accept sync "
×
1692
                                                    "connections"); // Throws
×
1693
            return;
×
1694
        }
×
1695

1696
        util::Optional<std::string> sec_websocket_protocol = websocket::read_sec_websocket_protocol(request);
2,024✔
1697

1698
        // Figure out whether there are any protocol versions supported by both
1699
        // the client and the server, and if so, choose the newest one of them.
1700
        MiscBuffers& misc_buffers = m_server.get_misc_buffers();
2,024✔
1701
        using ProtocolVersionRanges = MiscBuffers::ProtocolVersionRanges;
2,024✔
1702
        ProtocolVersionRanges& protocol_version_ranges = misc_buffers.protocol_version_ranges;
2,024✔
1703
        {
2,024✔
1704
            protocol_version_ranges.clear();
2,024✔
1705
            util::MemoryInputStream in;
2,024✔
1706
            in.imbue(std::locale::classic());
2,024✔
1707
            in.unsetf(std::ios_base::skipws);
2,024✔
1708
            std::string_view value;
2,024✔
1709
            if (sec_websocket_protocol)
2,024✔
1710
                value = *sec_websocket_protocol;
2,024✔
1711
            HttpListHeaderValueParser parser{value};
2,024✔
1712
            std::string_view elem;
2,024✔
1713
            while (parser.next(elem)) {
24,288✔
1714
                // FIXME: Use std::string_view::begins_with() in C++20.
1715
                const StringData protocol{elem};
22,262✔
1716
                std::string_view prefix;
22,262✔
1717
                if (protocol.begins_with(get_pbs_websocket_protocol_prefix()))
22,262✔
1718
                    prefix = get_pbs_websocket_protocol_prefix();
22,262✔
UNCOV
1719
                else if (protocol.begins_with(get_old_pbs_websocket_protocol_prefix()))
×
1720
                    prefix = get_old_pbs_websocket_protocol_prefix();
×
1721
                if (!prefix.empty()) {
22,262✔
1722
                    auto parse_version = [&](std::string_view str) {
22,264✔
1723
                        in.set_buffer(str.data(), str.data() + str.size());
22,264✔
1724
                        int version = 0;
22,264✔
1725
                        in >> version;
22,264✔
1726
                        if (REALM_LIKELY(in && in.eof() && version >= 0))
22,264✔
1727
                            return version;
22,264✔
UNCOV
1728
                        return -1;
×
1729
                    };
22,264✔
1730
                    int min, max;
22,262✔
1731
                    std::string_view range = elem.substr(prefix.size());
22,262✔
1732
                    auto i = range.find('-');
22,262✔
1733
                    if (i != std::string_view::npos) {
22,262✔
1734
                        min = parse_version(range.substr(0, i));
×
1735
                        max = parse_version(range.substr(i + 1));
×
1736
                    }
×
1737
                    else {
22,262✔
1738
                        min = parse_version(range);
22,262✔
1739
                        max = min;
22,262✔
1740
                    }
22,262✔
1741
                    if (REALM_LIKELY(min >= 0 && max >= 0 && min <= max)) {
22,264✔
1742
                        protocol_version_ranges.emplace_back(min, max); // Throws
22,264✔
1743
                        continue;
22,264✔
1744
                    }
22,264✔
1745
                    logger.error("Protocol version negotiation failed: Client sent malformed "
2,147,483,647✔
1746
                                 "specification of supported protocol versions: '%1'",
2,147,483,647✔
1747
                                 elem); // Throws
2,147,483,647✔
1748
                    handle_400_bad_request("Protocol version negotiation failed: Malformed "
2,147,483,647✔
1749
                                           "specification of supported protocol "
2,147,483,647✔
1750
                                           "versions\n"); // Throws
2,147,483,647✔
1751
                    return;
2,147,483,647✔
1752
                }
22,262✔
UNCOV
1753
                logger.warn("Unrecognized protocol token in HTTP response header "
×
UNCOV
1754
                            "Sec-WebSocket-Protocol: '%1'",
×
UNCOV
1755
                            elem); // Throws
×
UNCOV
1756
            }
×
1757
            if (protocol_version_ranges.empty()) {
2,026✔
1758
                logger.error("Protocol version negotiation failed: Client did not send a "
×
1759
                             "specification of supported protocol versions"); // Throws
×
1760
                handle_400_bad_request("Protocol version negotiation failed: Missing specification "
×
1761
                                       "of supported protocol versions\n"); // Throws
×
1762
                return;
×
1763
            }
×
1764
        }
2,026✔
1765
        {
2,026✔
1766
            ProtocolVersionRange server_range = m_server.get_protocol_version_range();
2,026✔
1767
            int server_min = server_range.first;
2,026✔
1768
            int server_max = server_range.second;
2,026✔
1769
            int best_match = 0;
2,026✔
1770
            int overall_client_min = std::numeric_limits<int>::max();
2,026✔
1771
            int overall_client_max = std::numeric_limits<int>::min();
2,026✔
1772
            for (const auto& range : protocol_version_ranges) {
22,264✔
1773
                int client_min = range.first;
22,264✔
1774
                int client_max = range.second;
22,264✔
1775
                if (client_max >= server_min && client_min <= server_max) {
22,264✔
1776
                    // Overlap
1777
                    int version = std::min(client_max, server_max);
22,264✔
1778
                    if (version > best_match) {
22,264✔
1779
                        best_match = version;
2,024✔
1780
                    }
2,024✔
1781
                }
22,264✔
1782
                if (client_min < overall_client_min)
22,264✔
1783
                    overall_client_min = client_min;
22,264✔
1784
                if (client_max > overall_client_max)
22,264✔
1785
                    overall_client_max = client_max;
2,024✔
1786
            }
22,264✔
1787
            Formatter& formatter = misc_buffers.formatter;
2,026✔
1788
            if (REALM_UNLIKELY(best_match == 0)) {
2,026✔
1789
                const char* elaboration = "No version supported by both client and server";
×
1790
                const char* identifier_hint = nullptr;
×
1791
                if (overall_client_max < server_min) {
×
1792
                    // Client is too old
1793
                    elaboration = "Client is too old for server";
×
1794
                    identifier_hint = "CLIENT_TOO_OLD";
×
1795
                }
×
1796
                else if (overall_client_min > server_max) {
×
1797
                    // Client is too new
1798
                    elaboration = "Client is too new for server";
×
1799
                    identifier_hint = "CLIENT_TOO_NEW";
×
1800
                }
×
1801
                auto format_ranges = [&](const auto& list) {
×
1802
                    bool nonfirst = false;
×
1803
                    for (auto range : list) {
×
1804
                        if (nonfirst)
×
1805
                            formatter << ", "; // Throws
×
1806
                        int min = range.first, max = range.second;
×
1807
                        REALM_ASSERT(min <= max);
×
1808
                        formatter << min;
×
1809
                        if (max != min)
×
1810
                            formatter << "-" << max;
×
1811
                        nonfirst = true;
×
1812
                    }
×
1813
                };
×
1814
                using Range = ProtocolVersionRange;
×
1815
                formatter.reset();
×
1816
                format_ranges(protocol_version_ranges); // Throws
×
1817
                logger.error("Protocol version negotiation failed: %1 "
×
1818
                             "(client supports: %2)",
×
1819
                             elaboration, std::string_view(formatter.data(), formatter.size())); // Throws
×
1820
                formatter.reset();
×
1821
                formatter << "Protocol version negotiation failed: "
×
1822
                             ""
×
1823
                          << elaboration << ".\n\n";                                   // Throws
×
1824
                formatter << "Server supports: ";                                      // Throws
×
1825
                format_ranges(std::initializer_list<Range>{{server_min, server_max}}); // Throws
×
1826
                formatter << "\n";                                                     // Throws
×
1827
                formatter << "Client supports: ";                                      // Throws
×
1828
                format_ranges(protocol_version_ranges);                                // Throws
×
1829
                formatter << "\n\n";                                                   // Throws
×
1830
                formatter << "REALM_SYNC_PROTOCOL_MISMATCH";                           // Throws
×
1831
                if (identifier_hint)
×
1832
                    formatter << ":" << identifier_hint;                      // Throws
×
1833
                formatter << "\n";                                            // Throws
×
1834
                handle_400_bad_request({formatter.data(), formatter.size()}); // Throws
×
1835
                return;
×
1836
            }
×
1837
            m_negotiated_protocol_version = best_match;
2,026✔
1838
            logger.debug("Received: Sync HTTP request (negotiated_protocol_version=%1)",
2,026✔
1839
                         m_negotiated_protocol_version); // Throws
2,026✔
1840
            formatter.reset();
2,026✔
1841
        }
2,026✔
1842

1843
        std::string sec_websocket_protocol_2;
×
1844
        {
2,026✔
1845
            std::string_view prefix =
2,026✔
1846
                m_negotiated_protocol_version < SyncConnection::PBS_FLX_MIGRATION_PROTOCOL_VERSION
2,026✔
1847
                    ? get_old_pbs_websocket_protocol_prefix()
2,026✔
1848
                    : get_pbs_websocket_protocol_prefix();
2,026✔
1849
            std::ostringstream out;
2,026✔
1850
            out.imbue(std::locale::classic());
2,026✔
1851
            out << prefix << m_negotiated_protocol_version; // Throws
2,026✔
1852
            sec_websocket_protocol_2 = std::move(out).str();
2,026✔
1853
        }
2,026✔
1854

1855
        std::error_code ec;
2,026✔
1856
        util::Optional<HTTPResponse> response =
2,026✔
1857
            websocket::make_http_response(request, sec_websocket_protocol_2, ec); // Throws
2,026✔
1858

1859
        if (ec) {
2,026✔
1860
            if (ec == websocket::HttpError::bad_request_header_upgrade) {
×
1861
                logger.error("There must be a header of the form 'Upgrade: websocket'");
×
1862
            }
×
1863
            else if (ec == websocket::HttpError::bad_request_header_connection) {
×
1864
                logger.error("There must be a header of the form 'Connection: Upgrade'");
×
1865
            }
×
1866
            else if (ec == websocket::HttpError::bad_request_header_websocket_version) {
×
1867
                logger.error("There must be a header of the form 'Sec-WebSocket-Version: 13'");
×
1868
            }
×
1869
            else if (ec == websocket::HttpError::bad_request_header_websocket_key) {
×
1870
                logger.error("The header Sec-WebSocket-Key is missing");
×
1871
            }
×
1872

1873
            logger.error("The HTTP request with the error is:\n%1", request);
×
1874
            logger.error("Check the proxy configuration and make sure that the "
×
1875
                         "HTTP request is a valid Websocket request.");
×
1876
            close_due_to_error(ec);
×
1877
            return;
×
1878
        }
×
1879
        REALM_ASSERT(response);
2,026✔
1880
        add_common_http_response_headers(*response);
2,026✔
1881

1882
        std::string user_agent;
2,026✔
1883
        {
2,026✔
1884
            auto i = request.headers.find(g_user_agent);
2,026✔
1885
            if (i != request.headers.end())
2,026✔
1886
                user_agent = i->second; // Throws (copy)
2,024✔
1887
        }
2,026✔
1888

1889
        auto handler = [protocol_version = m_negotiated_protocol_version, user_agent = std::move(user_agent),
2,026✔
1890
                        this](std::error_code ec) {
2,026✔
1891
            // If the operation is aborted, the socket object may have been destroyed.
1892
            if (ec != util::error::operation_aborted) {
2,024✔
1893
                if (ec) {
2,024✔
1894
                    write_error(ec);
×
1895
                    return;
×
1896
                }
×
1897

1898
                std::unique_ptr<SyncConnection> sync_conn = std::make_unique<SyncConnection>(
2,024✔
1899
                    m_server, m_id, std::move(m_socket), std::move(m_ssl_stream), std::move(m_read_ahead_buffer),
2,024✔
1900
                    protocol_version, std::move(user_agent), std::move(m_remote_endpoint),
2,024✔
1901
                    get_appservices_request_id()); // Throws
2,024✔
1902
                SyncConnection& sync_conn_ref = *sync_conn;
2,024✔
1903
                m_server.add_sync_connection(m_id, std::move(sync_conn));
2,024✔
1904
                m_server.remove_http_connection(m_id);
2,024✔
1905
                sync_conn_ref.initiate();
2,024✔
1906
            }
2,024✔
1907
        };
2,024✔
1908
        m_http_server.async_send_response(*response, std::move(handler));
2,026✔
1909
    }
2,026✔
1910

1911
    // Sends a response with status `http_status`, the common headers, a
    // "Connection: close" header and, if `body` is non-empty, `body` together
    // with a matching Content-Length header. After a successful send the
    // connection is terminated; a failed send is delegated to write_error().
    void handle_text_response(HTTPStatus http_status, std::string_view body)
    {
        std::string payload(body); // Throws

        HTTPResponse response;
        response.status = http_status;
        add_common_http_response_headers(response);
        response.headers["Connection"] = "close";

        if (!payload.empty()) {
            response.headers["Content-Length"] = util::to_string(payload.size());
            response.body = std::move(payload);
        }

        auto on_sent = [this](std::error_code ec) {
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
                return;
            if (REALM_LIKELY(!ec)) {
                terminate(Logger::Level::detail, "HTTP connection closed"); // Throws
                return;
            }
            write_error(ec);
        };
        m_http_server.async_send_response(response, std::move(on_sent));
    }
20✔
1936

1937
    void handle_400_bad_request(std::string_view body)
1938
    {
8✔
1939
        logger.detail("400 Bad Request");
8✔
1940
        handle_text_response(HTTPStatus::BadRequest, body); // Throws
8✔
1941
    }
8✔
1942

1943
    void handle_404_not_found(const HTTPRequest&)
1944
    {
12✔
1945
        logger.detail("404 Not Found"); // Throws
12✔
1946
        handle_text_response(HTTPStatus::NotFound,
12✔
1947
                             "Realm sync server\n\nPage not found\n"); // Throws
12✔
1948
    }
12✔
1949

1950
    void handle_503_service_unavailable(const HTTPRequest&, std::string_view message)
1951
    {
×
1952
        logger.debug("503 Service Unavailable");                       // Throws
×
1953
        handle_text_response(HTTPStatus::ServiceUnavailable, message); // Throws
×
1954
    }
×
1955

1956
    // Adds the headers shared by every response: always "Server", and
    // "X-Appservices-Request-Id" when the negotiated protocol version is below
    // SyncConnection::SERVER_LOG_PROTOCOL_VERSION (which includes the case
    // where no version has been negotiated and the value is still zero).
    void add_common_http_response_headers(HTTPResponse& response)
    {
        response.headers["Server"] = "RealmSync/" REALM_VERSION_STRING; // Throws

        const bool include_request_id =
            m_negotiated_protocol_version < SyncConnection::SERVER_LOG_PROTOCOL_VERSION;
        if (!include_request_id)
            return;

        // This isn't a real X-Appservices-Request-Id, but it should be enough to test with
        response.headers["X-Appservices-Request-Id"] = get_appservices_request_id();
    }
2,044✔
1964

1965
    void read_error(std::error_code ec)
1966
    {
16✔
1967
        REALM_ASSERT(ec != util::error::operation_aborted);
16✔
1968
        if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
16!
1969
            // Suicide
1970
            close_due_to_close_by_client(ec); // Throws
16✔
1971
            return;
16✔
1972
        }
16✔
1973
        if (ec == util::MiscExtErrors::delim_not_found) {
×
1974
            logger.error("Input message head delimited not found"); // Throws
×
1975
            close_due_to_error(ec);                                 // Throws
×
1976
            return;
×
1977
        }
×
1978

1979
        logger.error("Reading failed: %1", ec.message()); // Throws
×
1980

1981
        // Suicide
1982
        close_due_to_error(ec); // Throws
×
1983
    }
×
1984

1985
    void write_error(std::error_code ec)
1986
    {
×
1987
        REALM_ASSERT(ec != util::error::operation_aborted);
×
1988
        if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1989
            // Suicide
1990
            close_due_to_close_by_client(ec); // Throws
×
1991
            return;
×
1992
        }
×
1993
        logger.error("Writing failed: %1", ec.message()); // Throws
×
1994

1995
        // Suicide
1996
        close_due_to_error(ec); // Throws
×
1997
    }
×
1998

1999
    void close_due_to_close_by_client(std::error_code ec)
2000
    {
16✔
2001
        auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
16✔
2002
        // Suicide
2003
        terminate(log_level, "HTTP connection closed by client: %1", ec.message()); // Throws
16✔
2004
    }
16✔
2005

2006
    void close_due_to_error(std::error_code ec)
2007
    {
10✔
2008
        // Suicide
2009
        terminate(Logger::Level::error, "HTTP connection closed due to error: %1",
10✔
2010
                  ec.message()); // Throws
10✔
2011
    }
10✔
2012

2013
    // Builds the log prefix for the connection with identifier `id`, e.g.
    // "HTTP Connection[7]: ". The classic locale is used so that the number is
    // formatted without locale-specific digit grouping.
    static std::string make_logger_prefix(int_fast64_t id)
    {
        std::ostringstream out;
        out.imbue(std::locale::classic());
        out << "HTTP Connection[" << id << "]: "; // Throws
        // Same idiom as the protocol-token formatting elsewhere in this class:
        // lets the stream give up its buffer where the library supports it.
        return std::move(out).str(); // Throws
    }
3,262✔
2020
};
2021

2022

2023
class DownloadHistoryEntryHandler : public ServerHistory::HistoryEntryHandler {
2024
public:
2025
    std::size_t num_changesets = 0;
2026
    std::size_t accum_original_size = 0;
2027
    std::size_t accum_compacted_size = 0;
2028

2029
    DownloadHistoryEntryHandler(ServerProtocol& protocol, OutputBuffer& buffer, util::Logger& logger) noexcept
2030
        : m_protocol{protocol}
19,288✔
2031
        , m_buffer{buffer}
19,288✔
2032
        , m_logger{logger}
19,288✔
2033
    {
42,486✔
2034
    }
42,486✔
2035

2036
    void handle(version_type server_version, const HistoryEntry& entry, size_t original_size) override
2037
    {
43,454✔
2038
        version_type client_version = entry.remote_version;
43,454✔
2039
        ServerProtocol::ChangesetInfo info{server_version, client_version, entry, original_size};
43,454✔
2040
        m_protocol.insert_single_changeset_download_message(m_buffer, info, m_logger); // Throws
43,454✔
2041
        ++num_changesets;
43,454✔
2042
        accum_original_size += original_size;
43,454✔
2043
        accum_compacted_size += entry.changeset.size();
43,454✔
2044
    }
43,454✔
2045

2046
private:
2047
    ServerProtocol& m_protocol;
2048
    OutputBuffer& m_buffer;
2049
    util::Logger& m_logger;
2050
};
2051

2052

2053
// ============================ Session ============================
2054

2055
//                        Need cli-   Send     IDENT     UNBIND              ERROR
2056
//   Protocol             ent file    IDENT    message   message   Error     message
2057
//   state                identifier  message  received  received  occurred  sent
2058
// ---------------------------------------------------------------------------------
2059
//   AllocatingIdent      yes         yes      no        no        no        no
2060
//   SendIdent            no          yes      no        no        no        no
2061
//   WaitForIdent         no          no       no        no        no        no
2062
//   WaitForUnbind        maybe       no       yes       no        no        no
2063
//   SendError            maybe       maybe    maybe     no        yes       no
2064
//   WaitForUnbindErr     maybe       maybe    maybe     no        yes       yes
2065
//   SendUnbound          maybe       maybe    maybe     yes       maybe     no
2066
//
2067
//
2068
//   Condition                      Expression
2069
// ----------------------------------------------------------
2070
//   Need client file identifier    need_client_file_ident()
2071
//   Send IDENT message             must_send_ident_message()
2072
//   IDENT message received         ident_message_received()
2073
//   UNBIND message received        unbind_message_received()
2074
//   Error occurred                 error_occurred()
2075
//   ERROR message sent             m_error_message_sent
2076
//
2077
//
2078
//   Protocol
2079
//   state                Will send              Can receive
2080
// -----------------------------------------------------------------------
2081
//   AllocatingIdent      none                   UNBIND
2082
//   SendIdent            IDENT                  UNBIND
2083
//   WaitForIdent         none                   IDENT, UNBIND
2084
//   WaitForUnbind        DOWNLOAD, TRANSACT,    UPLOAD, TRANSACT, MARK,
2085
//                        MARK, ALLOC            ALLOC, UNBIND
2086
//   SendError            ERROR                  any
2087
//   WaitForUnbindErr     none                   any
2088
//   SendUnbound          UNBOUND                none
2089
//
2090
class Session final : private FileIdentReceiver {
2091
public:
2092
    util::PrefixLogger logger;
2093

2094
    Session(SyncConnection& conn, session_ident_type session_ident)
2095
        : logger{util::LogCategory::server, make_logger_prefix(session_ident), conn.logger_ptr} // Throws
2,862✔
2096
        , m_connection{conn}
2,862✔
2097
        , m_session_ident{session_ident}
2,862✔
2098
    {
5,638✔
2099
    }
5,638✔
2100

2101
    // Destroys the session. The session must not be enlisted for sending at
    // this point (asserted). It is then detached from its server file through
    // detach_from_server_file().
    ~Session() noexcept
    {
        REALM_ASSERT(!is_enlisted_to_send());
        detach_from_server_file();
    }
5,640✔
2106

2107
    // Returns the connection object that this session was created with.
    SyncConnection& get_connection() noexcept
    {
        return m_connection;
    }
42,504✔
2111

2112
    // Returns the `encryption_key` entry of the configuration of the server
    // that owns this session's connection.
    const Optional<std::array<char, 64>>& get_encryption_key()
    {
        const Server::Config& config = m_connection.get_server().get_config();
        return config.encryption_key;
    }
×
2116

2117
    // Returns the session identifier passed to the constructor.
    session_ident_type get_session_ident() const noexcept
    {
        return m_session_ident;
    }
160✔
2121

2122
    // Returns the protocol object of the connection (forwards to
    // SyncConnection::get_server_protocol()).
    ServerProtocol& get_server_protocol() noexcept
    {
        return m_connection.get_server_protocol();
    }
58,828✔
2126

2127
    bool need_client_file_ident() const noexcept
2128
    {
7,056✔
2129
        return (m_file_ident_request != 0);
7,056✔
2130
    }
7,056✔
2131

2132
    bool must_send_ident_message() const noexcept
2133
    {
4,340✔
2134
        return m_send_ident_message;
4,340✔
2135
    }
4,340✔
2136

2137
    bool ident_message_received() const noexcept
2138
    {
350,348✔
2139
        return m_client_file_ident != 0;
350,348✔
2140
    }
350,348✔
2141

2142
    bool unbind_message_received() const noexcept
2143
    {
353,348✔
2144
        return m_unbind_message_received;
353,348✔
2145
    }
353,348✔
2146

2147
    bool error_occurred() const noexcept
2148
    {
345,038✔
2149
        return int(m_error_code) != 0;
345,038✔
2150
    }
345,038✔
2151

2152
    bool relayed_alloc_request_in_progress() const noexcept
2153
    {
×
2154
        return (need_client_file_ident() || m_allocated_file_ident.ident != 0);
×
2155
    }
×
2156

2157
    // Returns the file identifier (always a nonzero value) of the client side
2158
    // file if ident_message_received() returns true. Otherwise it returns zero.
2159
    file_ident_type get_client_file_ident() const noexcept
    {
        // Remains zero until receive_ident_message() stores the identifier.
        return m_client_file_ident;
    }
×
2163

2164
    void initiate()
2165
    {
5,636✔
2166
        logger.detail("Session initiated", m_session_ident); // Throws
5,636✔
2167
    }
5,636✔
2168

2169
    void terminate()
2170
    {
4,658✔
2171
        logger.detail("Session terminated", m_session_ident); // Throws
4,658✔
2172
    }
4,658✔
2173

2174
    // Initiate the deactivation process, if it has not been initiated already
2175
    // by the client.
2176
    //
2177
    // IMPORTANT: This function must not be called with protocol versions
2178
    // earlier than 23.
2179
    //
2180
    // The deactivation process will eventually lead to termination of the
2181
    // session.
2182
    //
2183
    // The session will detach itself from the server file when the deactivation
2184
    // process is initiated, regardless of whether it is initiated by the
2185
    // client, or by calling this function.
2186
    void initiate_deactivation(ProtocolError error_code)
    {
        REALM_ASSERT(is_session_level_error(error_code));
        REALM_ASSERT(!error_occurred()); // Must only be called once

        if (REALM_UNLIKELY(unbind_message_received())) {
            // The client has already initiated the deactivation process by
            // sending UNBIND. Protocol state was SendUnbound, and remains
            // unchanged.
            return;
        }

        // Server initiated deactivation: let go of the server file, record
        // the error, and make sure the ERROR message gets a chance to be sent.
        detach_from_server_file();
        m_error_code = error_code;
        // Protocol state is now SendError
        ensure_enlisted_to_send();
    }
80✔
2202

2203
    bool is_enlisted_to_send() const noexcept
2204
    {
280,768✔
2205
        return m_next != nullptr;
280,768✔
2206
    }
280,768✔
2207

2208
    void ensure_enlisted_to_send() noexcept
2209
    {
54,660✔
2210
        if (!is_enlisted_to_send())
54,660✔
2211
            enlist_to_send();
53,508✔
2212
    }
54,660✔
2213

2214
    void enlist_to_send() noexcept
2215
    {
112,442✔
2216
        m_connection.enlist_to_send(this);
112,442✔
2217
    }
112,442✔
2218

2219
    // Overriding member function in FileIdentReceiver
2220
    void receive_file_ident(SaltedFileIdent file_ident) override final
    {
        // Protocol state must be AllocatingIdent or WaitForUnbind
        if (ident_message_received()) {
            // WaitForUnbind: no IDENT message may be pending.
            REALM_ASSERT(!m_send_ident_message);
        }
        else {
            // AllocatingIdent: a request must be outstanding, and the IDENT
            // message must still be due.
            REALM_ASSERT(need_client_file_ident());
            REALM_ASSERT(m_send_ident_message);
        }
        REALM_ASSERT(!unbind_message_received());
        REALM_ASSERT(!error_occurred());
        REALM_ASSERT(!m_error_message_sent);

        // The request is fulfilled; keep the identifier until it can be sent.
        m_file_ident_request = 0;
        m_allocated_file_ident = file_ident;

        // If the protocol state was AllocatingIdent, it is now SendIdent,
        // otherwise it continues to be WaitForUnbind.

        logger.debug("Acquired outbound salted file identifier (%1, %2)", file_ident.ident,
                     file_ident.salt); // Throws

        ensure_enlisted_to_send();
    }
1,358✔
2245

2246
    // Called by the associated connection object when this session is granted
2247
    // an opportunity to initiate the sending of a message.
2248
    //
2249
    // This function may lead to the destruction of the session object
2250
    // (suicide).
2251
    void send_message()
2252
    {
112,232✔
2253
        if (REALM_LIKELY(!unbind_message_received())) {
112,232✔
2254
            if (REALM_LIKELY(!error_occurred())) {
109,478✔
2255
                if (REALM_LIKELY(ident_message_received())) {
109,398✔
2256
                    // State is WaitForUnbind.
2257
                    bool relayed_alloc = (m_allocated_file_ident.ident != 0);
108,036✔
2258
                    if (REALM_LIKELY(!relayed_alloc)) {
108,036✔
2259
                        // Send DOWNLOAD or MARK.
2260
                        continue_history_scan(); // Throws
108,036✔
2261
                        // Session object may have been
2262
                        // destroyed at this point (suicide)
2263
                        return;
108,036✔
2264
                    }
108,036✔
2265
                    send_alloc_message(); // Throws
×
2266
                    return;
×
2267
                }
108,036✔
2268
                // State is SendIdent
2269
                send_ident_message(); // Throws
1,362✔
2270
                return;
1,362✔
2271
            }
109,398✔
2272
            // State is SendError
2273
            send_error_message(); // Throws
80✔
2274
            return;
80✔
2275
        }
109,478✔
2276
        // State is SendUnbound
2277
        send_unbound_message(); // Throws
2,754✔
2278
        terminate();            // Throws
2,754✔
2279
        m_connection.discard_session(m_session_ident);
2,754✔
2280
        // This session is now destroyed!
2281
    }
2,754✔
2282

2283
    bool receive_bind_message(std::string path, std::string signed_user_token, bool need_client_file_ident,
2284
                              bool is_subserver, ProtocolError& error)
2285
    {
5,634✔
2286
        if (logger.would_log(util::Logger::Level::info)) {
5,634✔
2287
            logger.detail("Received: BIND(server_path=%1, signed_user_token='%2', "
346✔
2288
                          "need_client_file_ident=%3, is_subserver=%4)",
346✔
2289
                          path, short_token_fmt(signed_user_token), int(need_client_file_ident),
346✔
2290
                          int(is_subserver)); // Throws
346✔
2291
        }
346✔
2292

2293
        ServerImpl& server = m_connection.get_server();
5,634✔
2294
        _impl::VirtualPathComponents virt_path_components =
5,634✔
2295
            _impl::parse_virtual_path(server.get_root_dir(), path); // Throws
5,634✔
2296

2297
        if (!virt_path_components.is_valid) {
5,634✔
2298
            logger.error("Bad virtual path (message_type='bind', path='%1', "
28✔
2299
                         "signed_user_token='%2')",
28✔
2300
                         path,
28✔
2301
                         short_token_fmt(signed_user_token)); // Throws
28✔
2302
            error = ProtocolError::illegal_realm_path;
28✔
2303
            return false;
28✔
2304
        }
28✔
2305

2306
        // The user has proper permissions at this stage.
2307

2308
        m_server_file = server.get_or_create_file(path); // Throws
5,606✔
2309

2310
        m_server_file->add_unidentified_session(this); // Throws
5,606✔
2311

2312
        logger.info("Client info: (path='%1', from=%2, protocol=%3) %4", path, m_connection.get_remote_endpoint(),
5,606✔
2313
                    m_connection.get_client_protocol_version(),
5,606✔
2314
                    m_connection.get_client_user_agent()); // Throws
5,606✔
2315

2316
        m_is_subserver = is_subserver;
5,606✔
2317
        if (REALM_LIKELY(!need_client_file_ident)) {
5,606✔
2318
            // Protocol state is now WaitForUnbind
2319
            return true;
3,124✔
2320
        }
3,124✔
2321

2322
        // FIXME: We must make a choice about client file ident for read only
2323
        // sessions. They should have a special read-only client file ident.
2324
        file_ident_type proxy_file = 0; // No proxy
2,482✔
2325
        ClientType client_type = (is_subserver ? ClientType::subserver : ClientType::regular);
2,482✔
2326
        m_file_ident_request = m_server_file->request_file_ident(*this, proxy_file, client_type); // Throws
2,482✔
2327
        m_send_ident_message = true;
2,482✔
2328
        // Protocol state is now AllocatingIdent
2329

2330
        return true;
2,482✔
2331
    }
5,606✔
2332

2333
    bool receive_ident_message(file_ident_type client_file_ident, salt_type client_file_ident_salt,
2334
                               version_type scan_server_version, version_type scan_client_version,
2335
                               version_type latest_server_version, salt_type latest_server_version_salt,
2336
                               ProtocolError& error)
2337
    {
4,340✔
2338
        // Protocol state must be WaitForIdent
2339
        REALM_ASSERT(!need_client_file_ident());
4,340✔
2340
        REALM_ASSERT(!m_send_ident_message);
4,340✔
2341
        REALM_ASSERT(!ident_message_received());
4,340✔
2342
        REALM_ASSERT(!unbind_message_received());
4,340✔
2343
        REALM_ASSERT(!error_occurred());
4,340✔
2344
        REALM_ASSERT(!m_error_message_sent);
4,340✔
2345

2346
        logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
4,340✔
2347
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
4,340✔
2348
                     "latest_server_version_salt=%6)",
4,340✔
2349
                     client_file_ident, client_file_ident_salt, scan_server_version, scan_client_version,
4,340✔
2350
                     latest_server_version, latest_server_version_salt); // Throws
4,340✔
2351

2352
        SaltedFileIdent client_file_ident_2 = {client_file_ident, client_file_ident_salt};
4,340✔
2353
        DownloadCursor download_progress = {scan_server_version, scan_client_version};
4,340✔
2354
        SaltedVersion server_version_2 = {latest_server_version, latest_server_version_salt};
4,340✔
2355
        ClientType client_type = (m_is_subserver ? ClientType::subserver : ClientType::regular);
4,340✔
2356
        UploadCursor upload_threshold = {0, 0};
4,340✔
2357
        version_type locked_server_version = 0;
4,340✔
2358
        BootstrapError error_2 =
4,340✔
2359
            m_server_file->bootstrap_client_session(client_file_ident_2, download_progress, server_version_2,
4,340✔
2360
                                                    client_type, upload_threshold, locked_server_version,
4,340✔
2361
                                                    logger); // Throws
4,340✔
2362
        switch (error_2) {
4,340✔
2363
            case BootstrapError::no_error:
4,304✔
2364
                break;
4,304✔
2365
            case BootstrapError::client_file_expired:
✔
2366
                logger.warn("Client (%1) expired", client_file_ident); // Throws
×
2367
                error = ProtocolError::client_file_expired;
×
2368
                return false;
×
2369
            case BootstrapError::bad_client_file_ident:
✔
2370
                logger.error("Bad client file ident (%1) in IDENT message",
×
2371
                             client_file_ident); // Throws
×
2372
                error = ProtocolError::bad_client_file_ident;
×
2373
                return false;
×
2374
            case BootstrapError::bad_client_file_ident_salt:
4✔
2375
                logger.error("Bad client file identifier salt (%1) in IDENT message",
4✔
2376
                             client_file_ident_salt); // Throws
4✔
2377
                error = ProtocolError::diverging_histories;
4✔
2378
                return false;
4✔
2379
            case BootstrapError::bad_download_server_version:
✔
2380
                logger.error("Bad download progress server version in IDENT message"); // Throws
×
2381
                error = ProtocolError::bad_server_version;
×
2382
                return false;
×
2383
            case BootstrapError::bad_download_client_version:
4✔
2384
                logger.error("Bad download progress client version in IDENT message"); // Throws
4✔
2385
                error = ProtocolError::bad_client_version;
4✔
2386
                return false;
4✔
2387
            case BootstrapError::bad_server_version:
20✔
2388
                logger.error("Bad server version (message_type='ident')"); // Throws
20✔
2389
                error = ProtocolError::bad_server_version;
20✔
2390
                return false;
20✔
2391
            case BootstrapError::bad_server_version_salt:
4✔
2392
                logger.error("Bad server version salt in IDENT message"); // Throws
4✔
2393
                error = ProtocolError::diverging_histories;
4✔
2394
                return false;
4✔
2395
            case BootstrapError::bad_client_type:
✔
2396
                logger.error("Bad client type (%1) in IDENT message", int(client_type)); // Throws
×
2397
                error = ProtocolError::bad_client_file_ident; // FIXME: Introduce new protocol-level error
×
2398
                                                              // `bad_client_type`.
2399
                return false;
×
2400
        }
4,340✔
2401

2402
        // Make sure there is no other session currently associcated with the
2403
        // same client-side file
2404
        if (Session* other_sess = m_server_file->get_identified_session(client_file_ident)) {
4,304✔
2405
            SyncConnection& other_conn = other_sess->get_connection();
×
2406
            // It is a protocol violation if the other session is associated
2407
            // with the same connection
2408
            if (&other_conn == &m_connection) {
×
2409
                logger.error("Client file already bound in other session associated with "
×
2410
                             "the same connection"); // Throws
×
2411
                error = ProtocolError::bound_in_other_session;
×
2412
                return false;
×
2413
            }
×
2414
            // When the other session is associated with a different connection
2415
            // (`other_conn`), the clash may be due to the server not yet having
2416
            // realized that the other connection has been closed by the
2417
            // client. If so, the other connention is a "zombie". In the
2418
            // interest of getting rid of zombie connections as fast as
2419
            // possible, we shall assume that a clash with a session in another
2420
            // connection is always due to that other connection being a
2421
            // zombie. And when such a situation is detected, we want to close
2422
            // the zombie connection immediately.
2423
            auto log_level = Logger::Level::detail;
×
2424
            other_conn.terminate(log_level,
×
2425
                                 "Sync connection closed (superseded session)"); // Throws
×
2426
        }
×
2427

2428
        logger.info("Bound to client file (client_file_ident=%1)", client_file_ident); // Throws
4,304✔
2429

2430
        send_log_message(util::Logger::Level::debug, util::format("Session %1 bound to client file ident %2",
4,304✔
2431
                                                                  m_session_ident, client_file_ident));
4,304✔
2432

2433
        m_server_file->identify_session(this, client_file_ident); // Throws
4,304✔
2434

2435
        m_client_file_ident = client_file_ident;
4,304✔
2436
        m_download_progress = download_progress;
4,304✔
2437
        m_upload_threshold = upload_threshold;
4,304✔
2438
        m_locked_server_version = locked_server_version;
4,304✔
2439

2440
        ServerImpl& server = m_connection.get_server();
4,304✔
2441
        const Server::Config& config = server.get_config();
4,304✔
2442
        m_disable_download = (config.disable_download_for.count(client_file_ident) != 0);
4,304✔
2443

2444
        if (REALM_UNLIKELY(config.session_bootstrap_callback)) {
4,304✔
2445
            config.session_bootstrap_callback(m_server_file->get_virt_path(),
×
2446
                                              client_file_ident); // Throws
×
2447
        }
×
2448

2449
        // Protocol  state is now WaitForUnbind
2450
        enlist_to_send();
4,304✔
2451
        return true;
4,304✔
2452
    }
4,304✔
2453

2454
    bool receive_upload_message(version_type progress_client_version, version_type progress_server_version,
2455
                                version_type locked_server_version, const UploadChangesets& upload_changesets,
2456
                                ProtocolError& error)
2457
    {
45,808✔
2458
        // Protocol state must be WaitForUnbind
2459
        REALM_ASSERT(!m_send_ident_message);
45,808✔
2460
        REALM_ASSERT(ident_message_received());
45,808✔
2461
        REALM_ASSERT(!unbind_message_received());
45,808✔
2462
        REALM_ASSERT(!error_occurred());
45,808✔
2463
        REALM_ASSERT(!m_error_message_sent);
45,808✔
2464

2465
        logger.detail("Received: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
45,808✔
2466
                      "locked_server_version=%3, num_changesets=%4)",
45,808✔
2467
                      progress_client_version, progress_server_version, locked_server_version,
45,808✔
2468
                      upload_changesets.size()); // Throws
45,808✔
2469

2470
        // We are unable to reproduce the cursor object for the upload progress
2471
        // when the protocol version is less than 29, because the client does
2472
        // not provide the required information. When the protocol version is
2473
        // less than 25, we can always get a consistent cursor by taking it from
2474
        // the changeset that was uploaded last, but in protocol versions 25,
2475
        // 26, 27, and 28, things are more complicated. Here, we receive new
2476
        // values for `last_integrated_server_version` which we cannot afford to
2477
        // ignore, but we do not know what client versions they correspond
2478
        // to. Fortunately, we can produce a cursor that works, and is mutually
2479
        // consistent with previous cursors, by simply bumping
2480
        // `upload_progress.client_version` when
2481
        // `upload_progress.last_integrated_server_version` grows.
2482
        //
2483
        // To see that this scheme works, consider the last changeset, A, that
2484
        // will have already been uploaded and integrated at the beginning of
2485
        // the next session, and the first changeset, B, that follows A in the
2486
        // client side history, and is not upload skippable (of local origin and
2487
        // nonempty). We then need to show that A will be skipped, if uploaded
2488
        // in the next session, but B will not.
2489
        //
2490
        // Let V be the client version produced by A, and let T be the value of
2491
        // `upload_progress.client_version` as determined in this session, which
2492
        // is used as threshold in the next session. Then we know that A is
2493
        // skipped during the next session if V is less than, or equal to T. If
2494
        // the protocol version is at least 29, the protocol requires that T is
2495
        // greater than, or equal to V. If the protocol version is less than 25,
2496
        // T will be equal to V. Finally, if the protocol version is 25, 26, 27,
2497
        // or 28, we construct T such that it is always greater than, or equal
2498
        // to V, so in all cases, A will be skipped during the next session.
2499
        //
2500
        // Let W be the client version on which B is based. We then know that B
2501
        // will be retained if, and only if W is greater than, or equal to T. If
2502
        // the protocol version is at least 29, we know that T is less than, or
2503
        // equal to W, since B is not integrated until the next session. If the
2504
        // protocol version is less than 25, we know that T is V. Since V must
2505
        // be less than, or equal to W, we again know that T is less than, or
2506
        // equal to W. Finally, if the protocol version is 25, 26, 27, or 28, we
2507
        // construct T such that it is equal to V + N, where N is the number of
2508
        // observed increments in `last_integrated_server_version` since the
2509
        // client version produced by A. For each of these observed increments,
2510
        // there must have been a distinct new client version, but all these
2511
        // client versions must be less than, or equal to W, since B is not
2512
        // integrated until the next session. Therefore, we know that T = V + N
2513
        // is less than, or equal to W. So, in all cases, B will not be skipped
2514
        // during the next session.
2515
        int protocol_version = m_connection.get_client_protocol_version();
45,808✔
2516
        static_cast<void>(protocol_version); // No protocol diversion (yet)
45,808✔
2517

2518
        UploadCursor upload_progress;
45,808✔
2519
        upload_progress = {progress_client_version, progress_server_version};
45,808✔
2520

2521
        // `upload_progress.client_version` must be nondecreasing across the
2522
        // session.
2523
        bool good_1 = (upload_progress.client_version >= m_upload_progress.client_version);
45,808✔
2524
        if (REALM_UNLIKELY(!good_1)) {
45,808✔
2525
            logger.error("Decreasing client version in upload progress (%1 < %2)", upload_progress.client_version,
×
2526
                         m_upload_progress.client_version); // Throws
×
2527
            error = ProtocolError::bad_client_version;
×
2528
            return false;
×
2529
        }
×
2530
        // `upload_progress.last_integrated_server_version` must be a version
2531
        // that the client can have heard about.
2532
        bool good_2 = (upload_progress.last_integrated_server_version <= m_download_progress.server_version);
45,808✔
2533
        if (REALM_UNLIKELY(!good_2)) {
45,808✔
2534
            logger.error("Bad last integrated server version in upload progress (%1 > %2)",
×
2535
                         upload_progress.last_integrated_server_version,
×
2536
                         m_download_progress.server_version); // Throws
×
2537
            error = ProtocolError::bad_server_version;
×
2538
            return false;
×
2539
        }
×
2540

2541
        // `upload_progress` must be consistent.
2542
        if (REALM_UNLIKELY(!is_consistent(upload_progress))) {
45,808✔
2543
            logger.error("Upload progress is inconsistent (%1, %2)", upload_progress.client_version,
×
2544
                         upload_progress.last_integrated_server_version); // Throws
×
2545
            error = ProtocolError::bad_server_version;
×
2546
            return false;
×
2547
        }
×
2548
        // `upload_progress` and `m_upload_threshold` must be mutually
2549
        // consistent.
2550
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_threshold))) {
45,808✔
2551
            logger.error("Upload progress (%1, %2) is mutually inconsistent with "
×
2552
                         "threshold (%3, %4)",
×
2553
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2554
                         m_upload_threshold.client_version,
×
2555
                         m_upload_threshold.last_integrated_server_version); // Throws
×
2556
            error = ProtocolError::bad_server_version;
×
2557
            return false;
×
2558
        }
×
2559
        // `upload_progress` and `m_upload_progress` must be mutually
2560
        // consistent.
2561
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_progress))) {
45,808✔
2562
            logger.error("Upload progress (%1, %2) is mutually inconsistent with previous "
×
2563
                         "upload progress (%3, %4)",
×
2564
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2565
                         m_upload_progress.client_version,
×
2566
                         m_upload_progress.last_integrated_server_version); // Throws
×
2567
            error = ProtocolError::bad_server_version;
×
2568
            return false;
×
2569
        }
×
2570

2571
        version_type locked_server_version_2 = locked_server_version;
45,808✔
2572

2573
        // `locked_server_version_2` must be nondecreasing over the lifetime of
2574
        // the client-side file.
2575
        if (REALM_UNLIKELY(locked_server_version_2 < m_locked_server_version)) {
45,808✔
2576
            logger.error("Decreasing locked server version (%1 < %2)", locked_server_version_2,
×
2577
                         m_locked_server_version); // Throws
×
2578
            error = ProtocolError::bad_server_version;
×
2579
            return false;
×
2580
        }
×
2581
        // `locked_server_version_2` must be a version that the client can have
2582
        // heard about.
2583
        if (REALM_UNLIKELY(locked_server_version_2 > m_download_progress.server_version)) {
45,808✔
2584
            logger.error("Bad locked server version (%1 > %2)", locked_server_version_2,
×
2585
                         m_download_progress.server_version); // Throws
×
2586
            error = ProtocolError::bad_server_version;
×
2587
            return false;
×
2588
        }
×
2589

2590
        std::size_t num_previously_integrated_changesets = 0;
45,808✔
2591
        if (!upload_changesets.empty()) {
45,808✔
2592
            UploadCursor up = m_upload_progress;
24,590✔
2593
            for (const ServerProtocol::UploadChangeset& uc : upload_changesets) {
36,910✔
2594
                // `uc.upload_cursor.client_version` must be increasing across
2595
                // all the changesets in this UPLOAD message, and all must be
2596
                // greater than upload_progress.client_version of previous
2597
                // UPLOAD message.
2598
                if (REALM_UNLIKELY(uc.upload_cursor.client_version <= up.client_version)) {
36,910✔
2599
                    logger.error("Nonincreasing client version in upload cursor of uploaded "
×
2600
                                 "changeset (%1 <= %2)",
×
2601
                                 uc.upload_cursor.client_version,
×
2602
                                 up.client_version); // Throws
×
2603
                    error = ProtocolError::bad_client_version;
×
2604
                    return false;
×
2605
                }
×
2606
                // `uc.upload_progress` must be consistent.
2607
                if (REALM_UNLIKELY(!is_consistent(uc.upload_cursor))) {
36,910✔
2608
                    logger.error("Upload cursor of uploaded changeset is inconsistent (%1, %2)",
×
2609
                                 uc.upload_cursor.client_version,
×
2610
                                 uc.upload_cursor.last_integrated_server_version); // Throws
×
2611
                    error = ProtocolError::bad_server_version;
×
2612
                    return false;
×
2613
                }
×
2614
                // `uc.upload_progress` must be mutually consistent with
2615
                // previous upload cursor.
2616
                if (REALM_UNLIKELY(!are_mutually_consistent(uc.upload_cursor, up))) {
36,910✔
2617
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2618
                                 "inconsistent with previous upload cursor (%3, %4)",
×
2619
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2620
                                 up.client_version, up.last_integrated_server_version); // Throws
×
2621
                    error = ProtocolError::bad_server_version;
×
2622
                    return false;
×
2623
                }
×
2624
                // `uc.upload_progress` must be mutually consistent with
2625
                // threshold, that is, for changesets that have not previously
2626
                // been integrated, it is important that the specified value of
2627
                // `last_integrated_server_version` is greater than, or equal to
2628
                // the reciprocal history base version.
2629
                bool consistent_with_threshold = are_mutually_consistent(uc.upload_cursor, m_upload_threshold);
36,910✔
2630
                if (REALM_UNLIKELY(!consistent_with_threshold)) {
36,910✔
2631
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2632
                                 "inconsistent with threshold (%3, %4)",
×
2633
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2634
                                 m_upload_threshold.client_version,
×
2635
                                 m_upload_threshold.last_integrated_server_version); // Throws
×
2636
                    error = ProtocolError::bad_server_version;
×
2637
                    return false;
×
2638
                }
×
2639
                bool previously_integrated = (uc.upload_cursor.client_version <= m_upload_threshold.client_version);
36,910✔
2640
                if (previously_integrated)
36,910✔
2641
                    ++num_previously_integrated_changesets;
2,442✔
2642
                up = uc.upload_cursor;
36,910✔
2643
            }
36,910✔
2644
            // `upload_progress.client_version` must be greater than, or equal
2645
            // to client versions produced by each of the changesets in this
2646
            // UPLOAD message.
2647
            if (REALM_UNLIKELY(up.client_version > upload_progress.client_version)) {
24,590✔
2648
                logger.error("Upload progress less than client version produced by uploaded "
×
2649
                             "changeset (%1 > %2)",
×
2650
                             up.client_version,
×
2651
                             upload_progress.client_version); // Throws
×
2652
                error = ProtocolError::bad_client_version;
×
2653
                return false;
×
2654
            }
×
2655
            // The upload cursor of last uploaded changeset must be mutually
2656
            // consistent with the reported upload progress.
2657
            if (REALM_UNLIKELY(!are_mutually_consistent(up, upload_progress))) {
24,590✔
2658
                logger.error("Upload cursor (%1, %2) of last uploaded changeset is mutually "
×
2659
                             "inconsistent with upload progress (%3, %4)",
×
2660
                             up.client_version, up.last_integrated_server_version, upload_progress.client_version,
×
2661
                             upload_progress.last_integrated_server_version); // Throws
×
2662
                error = ProtocolError::bad_server_version;
×
2663
                return false;
×
2664
            }
×
2665
        }
24,590✔
2666

2667
        // FIXME: Part of a very poor man's substitute for a proper backpressure
2668
        // scheme.
2669
        if (REALM_UNLIKELY(!m_server_file->can_add_changesets_from_downstream())) {
45,808✔
2670
            logger.debug("Terminating uploading session because buffer is full"); // Throws
×
2671
            // Using this exact error code, because it causes `try_again` flag
2672
            // to be set to true, which causes the client to wait for about 5
2673
            // minutes before trying to connect again.
2674
            error = ProtocolError::connection_closed;
×
2675
            return false;
×
2676
        }
×
2677

2678
        m_upload_progress = upload_progress;
45,808✔
2679

2680
        bool have_real_upload_progress = (upload_progress.client_version > m_upload_threshold.client_version);
45,808✔
2681
        bool bump_locked_server_version = (locked_server_version_2 > m_locked_server_version);
45,808✔
2682

2683
        std::size_t num_changesets_to_integrate = upload_changesets.size() - num_previously_integrated_changesets;
45,808✔
2684
        REALM_ASSERT(have_real_upload_progress || num_changesets_to_integrate == 0);
45,808✔
2685

2686
        bool have_anything_to_do = (have_real_upload_progress || bump_locked_server_version);
45,808✔
2687
        if (!have_anything_to_do)
45,808✔
2688
            return true;
332✔
2689

2690
        if (!have_real_upload_progress)
45,476✔
2691
            upload_progress = m_upload_threshold;
×
2692

2693
        if (num_previously_integrated_changesets > 0) {
45,476✔
2694
            logger.detail("Ignoring %1 previously integrated changesets",
718✔
2695
                          num_previously_integrated_changesets); // Throws
718✔
2696
        }
718✔
2697
        if (num_changesets_to_integrate > 0) {
45,476✔
2698
            logger.detail("Initiate integration of %1 remote changesets",
24,160✔
2699
                          num_changesets_to_integrate); // Throws
24,160✔
2700
        }
24,160✔
2701

2702
        REALM_ASSERT(m_server_file);
45,476✔
2703
        ServerFile& file = *m_server_file;
45,476✔
2704
        std::size_t offset = num_previously_integrated_changesets;
45,476✔
2705
        file.add_changesets_from_downstream(m_client_file_ident, upload_progress, locked_server_version_2,
45,476✔
2706
                                            upload_changesets.data() + offset, num_changesets_to_integrate); // Throws
45,476✔
2707

2708
        m_locked_server_version = locked_server_version_2;
45,476✔
2709
        return true;
45,476✔
2710
    }
45,808✔
2711

2712
    // Handles a MARK message from the client: records `request_ident` as the
    // outstanding download completion request and makes sure the session is
    // enlisted to send. Always returns true; the error out-parameter is unused.
    bool receive_mark_message(request_ident_type request_ident, ProtocolError&)
    {
        // Protocol state must be WaitForUnbind
        REALM_ASSERT(!m_send_ident_message);
        REALM_ASSERT(ident_message_received());
        REALM_ASSERT(!unbind_message_received());
        REALM_ASSERT(!error_occurred());
        REALM_ASSERT(!m_error_message_sent);

        logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

        // Remember the request before enlisting, so that it is visible to
        // whatever runs when this session gets its turn to send.
        m_download_completion_request = request_ident;
        ensure_enlisted_to_send();

        return true;
    }
12,152✔
2728

2729
    // Returns true if the deactivation process has been completed, at which
2730
    // point the caller (SyncConnection::receive_unbind_message()) should
2731
    // terminate the session.
2732
    //
2733
    // CAUTION: This function may commit suicide!
2734
    void receive_unbind_message()
2735
    {
2,796✔
2736
        // Protocol state may be anything but SendUnbound
2737
        REALM_ASSERT(!m_unbind_message_received);
2,796✔
2738

2739
        logger.detail("Received: UNBIND"); // Throws
2,796✔
2740

2741
        detach_from_server_file();
2,796✔
2742
        m_unbind_message_received = true;
2,796✔
2743

2744
        // Detect completion of the deactivation process
2745
        if (m_error_message_sent) {
2,796✔
2746
            // Deactivation process completed
2747
            terminate(); // Throws
26✔
2748
            m_connection.discard_session(m_session_ident);
26✔
2749
            // This session is now destroyed!
2750
            return;
26✔
2751
        }
26✔
2752

2753
        // Protocol state is now SendUnbound
2754
        ensure_enlisted_to_send();
2,770✔
2755
    }
2,770✔
2756

2757
    // Handles an ERROR message from the client. The message is only logged;
    // none of its arguments are used.
    void receive_error_message(session_ident_type, int, std::string_view)
    {
        // An ERROR message must not arrive after UNBIND.
        REALM_ASSERT(!m_unbind_message_received);

        logger.detail("Received: ERROR"); // Throws
    }
×
2763

2764
private:
2765
    SyncConnection& m_connection;
2766

2767
    const session_ident_type m_session_ident;
2768

2769
    // Not null if, and only if this session is in
2770
    // m_connection.m_sessions_enlisted_to_send.
2771
    Session* m_next = nullptr;
2772

2773
    // Becomes nonnull when the BIND message is received, if no error occurs. Is
2774
    // reset to null when the deactivation process is initiated, either when the
2775
    // UNBIND message is received, or when initiate_deactivation() is called.
2776
    util::bind_ptr<ServerFile> m_server_file;
2777

2778
    bool m_disable_download = false;
2779
    bool m_is_subserver = false;
2780

2781
    using file_ident_request_type = ServerFile::file_ident_request_type;
2782

2783
    // When nonzero, this session has an outstanding request for a client file
2784
    // identifier.
2785
    file_ident_request_type m_file_ident_request = 0;
2786

2787
    // Payload for next outgoing ALLOC message.
2788
    SaltedFileIdent m_allocated_file_ident = {0, 0};
2789

2790
    // Zero until the session receives an IDENT message from the client.
2791
    file_ident_type m_client_file_ident = 0;
2792

2793
    // Zero until initiate_deactivation() is called.
2794
    ProtocolError m_error_code = {};
2795

2796
    // The current point of progression of the download process. Set to (<server
2797
    // version>, <client version>) of the IDENT message when the IDENT message
2798
    // is received. At the time of return from continue_history_scan(), it
2799
    // points to the latest server version such that all preceding changesets in
2800
    // the server-side history have been downloaded, are currently being
2801
    // downloaded, or are *download excluded*.
2802
    DownloadCursor m_download_progress = {0, 0};
2803

2804
    request_ident_type m_download_completion_request = 0;
2805

2806
    // Records the progress of the upload process. Used to check that the client
2807
    // uploads changesets in order. Also, when m_upload_progress >
2808
    // m_upload_threshold, m_upload_progress works as a cache of the persisted
2809
    // version of the upload progress.
2810
    UploadCursor m_upload_progress = {0, 0};
2811

2812
    // Initialized on reception of the IDENT message. Specifies the actual
2813
    // upload progress (as recorded on the server-side) at the beginning of the
2814
    // session, and it remains fixed throughout the session.
2815
    //
2816
    // m_upload_threshold includes the progress resulting from the received
2817
    // changesets that have not yet been integrated (only relevant for
2818
    // synchronous backup).
2819
    UploadCursor m_upload_threshold = {0, 0};
2820

2821
    // Works partially as a cache of the persisted value, and partially as a way
2822
    // of checking that the client respects that it can never decrease.
2823
    version_type m_locked_server_version = 0;
2824

2825
    bool m_send_ident_message = false;
2826
    bool m_unbind_message_received = false;
2827
    bool m_error_message_sent = false;
2828

2829
    /// m_one_download_message_sent denotes whether at least one DOWNLOAD message
2830
    /// has been sent in the current session. The variable is used to ensure
2831
    /// that a DOWNLOAD message is always sent in a session. The received
2832
    /// DOWNLOAD message is needed by the client to ensure that its current
2833
    /// download progress is up to date.
2834
    bool m_one_download_message_sent = false;
2835

2836
    static std::string make_logger_prefix(session_ident_type session_ident)
2837
    {
5,640✔
2838
        std::ostringstream out;
5,640✔
2839
        out.imbue(std::locale::classic());
5,640✔
2840
        out << "Session[" << session_ident << "]: "; // Throws
5,640✔
2841
        return out.str();                            // Throws
5,640✔
2842
    }
5,640✔
2843

2844
    // Scan the history for changesets to be downloaded.
2845
    // If the history is longer than the end point of the previous scan,
2846
    // a DOWNLOAD message will be sent.
2847
    // A MARK message is sent if no DOWNLOAD message is sent, and the client has
2848
    // requested to be notified about download completion.
2849
    // In case neither a DOWNLOAD nor a MARK is sent, no message is sent.
2850
    //
2851
    // This function may lead to the destruction of the session object
2852
    // (suicide).
2853
    void continue_history_scan()
2854
    {
108,034✔
2855
        // Protocol state must be WaitForUnbind
2856
        REALM_ASSERT(!m_send_ident_message);
108,034✔
2857
        REALM_ASSERT(ident_message_received());
108,034✔
2858
        REALM_ASSERT(!unbind_message_received());
108,034✔
2859
        REALM_ASSERT(!error_occurred());
108,034✔
2860
        REALM_ASSERT(!m_error_message_sent);
108,034✔
2861
        REALM_ASSERT(!is_enlisted_to_send());
108,034✔
2862

2863
        SaltedVersion last_server_version = m_server_file->get_salted_sync_version();
108,034✔
2864
        REALM_ASSERT(last_server_version.version >= m_download_progress.server_version);
108,034✔
2865

2866
        ServerImpl& server = m_connection.get_server();
108,034✔
2867
        const Server::Config& config = server.get_config();
108,034✔
2868
        if (REALM_UNLIKELY(m_disable_download))
108,034✔
2869
            return;
×
2870

2871
        bool have_more_to_scan =
108,034✔
2872
            (last_server_version.version > m_download_progress.server_version || !m_one_download_message_sent);
108,034✔
2873
        if (have_more_to_scan) {
108,034✔
2874
            m_server_file->register_client_access(m_client_file_ident);     // Throws
42,484✔
2875
            const ServerHistory& history = m_server_file->access().history; // Throws
42,484✔
2876
            const char* body;
42,484✔
2877
            std::size_t uncompressed_body_size;
42,484✔
2878
            std::size_t compressed_body_size = 0;
42,484✔
2879
            bool body_is_compressed = false;
42,484✔
2880
            version_type end_version = last_server_version.version;
42,484✔
2881
            DownloadCursor download_progress;
42,484✔
2882
            UploadCursor upload_progress = {0, 0};
42,484✔
2883
            std::uint_fast64_t downloadable_bytes = 0;
42,484✔
2884
            std::size_t num_changesets;
42,484✔
2885
            std::size_t accum_original_size;
42,484✔
2886
            std::size_t accum_compacted_size;
42,484✔
2887
            ServerProtocol& protocol = get_server_protocol();
42,484✔
2888
            bool disable_download_compaction = config.disable_download_compaction;
42,484✔
2889
            bool enable_cache = (config.enable_download_bootstrap_cache && m_download_progress.server_version == 0 &&
42,484!
2890
                                 m_upload_progress.client_version == 0 && m_upload_threshold.client_version == 0);
42,484!
2891
            DownloadCache& cache = m_server_file->get_download_cache();
42,484✔
2892
            bool fetch_from_cache = (enable_cache && cache.body && end_version == cache.end_version);
42,484!
2893
            if (fetch_from_cache) {
42,484✔
2894
                body = cache.body.get();
×
2895
                uncompressed_body_size = cache.uncompressed_body_size;
×
2896
                compressed_body_size = cache.compressed_body_size;
×
2897
                body_is_compressed = cache.body_is_compressed;
×
2898
                download_progress = cache.download_progress;
×
2899
                downloadable_bytes = cache.downloadable_bytes;
×
2900
                num_changesets = cache.num_changesets;
×
2901
                accum_original_size = cache.accum_original_size;
×
2902
                accum_compacted_size = cache.accum_compacted_size;
×
2903
            }
×
2904
            else {
42,484✔
2905
                // Discard the old cached DOWNLOAD body before generating a new
2906
                // one to be cached. This can make a big difference because the
2907
                // size of that body can be very large (10GiB has been seen in a
2908
                // real-world case).
2909
                if (enable_cache)
42,484✔
2910
                    cache.body = {};
×
2911

2912
                OutputBuffer& out = server.get_misc_buffers().download_message;
42,484✔
2913
                out.reset();
42,484✔
2914
                download_progress = m_download_progress;
42,484✔
2915
                auto fetch_and_compress = [&](std::size_t max_download_size) {
42,486✔
2916
                    DownloadHistoryEntryHandler handler{protocol, out, logger};
42,486✔
2917
                    std::uint_fast64_t cumulative_byte_size_current;
42,486✔
2918
                    std::uint_fast64_t cumulative_byte_size_total;
42,486✔
2919
                    bool not_expired = history.fetch_download_info(
42,486✔
2920
                        m_client_file_ident, download_progress, end_version, upload_progress, handler,
42,486✔
2921
                        cumulative_byte_size_current, cumulative_byte_size_total, disable_download_compaction,
42,486✔
2922
                        max_download_size); // Throws
42,486✔
2923
                    REALM_ASSERT(upload_progress.client_version >= download_progress.last_integrated_client_version);
42,486✔
2924
                    SyncConnection& conn = get_connection();
42,486✔
2925
                    if (REALM_UNLIKELY(!not_expired)) {
42,486✔
2926
                        logger.debug("History scanning failed: Client file entry "
×
2927
                                     "expired during session"); // Throws
×
2928
                        conn.protocol_error(ProtocolError::client_file_expired, this);
×
2929
                        // Session object may have been destroyed at this point
2930
                        // (suicide).
2931
                        return false;
×
2932
                    }
×
2933

2934
                    downloadable_bytes = cumulative_byte_size_total - cumulative_byte_size_current;
42,486✔
2935
                    uncompressed_body_size = out.size();
42,486✔
2936
                    BinaryData uncompressed = {out.data(), uncompressed_body_size};
42,486✔
2937
                    body = uncompressed.data();
42,486✔
2938
                    std::size_t max_uncompressed = 1024;
42,486✔
2939
                    if (uncompressed.size() > max_uncompressed) {
42,486✔
2940
                        compression::CompressMemoryArena& arena = server.get_compress_memory_arena();
4,194✔
2941
                        std::vector<char>& buffer = server.get_misc_buffers().compress;
4,194✔
2942
                        compression::allocate_and_compress(arena, uncompressed, buffer); // Throws
4,194✔
2943
                        if (buffer.size() < uncompressed.size()) {
4,194✔
2944
                            body = buffer.data();
4,194✔
2945
                            compressed_body_size = buffer.size();
4,194✔
2946
                            body_is_compressed = true;
4,194✔
2947
                        }
4,194✔
2948
                    }
4,194✔
2949
                    num_changesets = handler.num_changesets;
42,486✔
2950
                    accum_original_size = handler.accum_original_size;
42,486✔
2951
                    accum_compacted_size = handler.accum_compacted_size;
42,486✔
2952
                    return true;
42,486✔
2953
                };
42,486✔
2954
                if (enable_cache) {
42,484✔
2955
                    std::size_t max_download_size = std::numeric_limits<size_t>::max();
×
2956
                    if (!fetch_and_compress(max_download_size)) { // Throws
×
2957
                        // Session object may have been destroyed at this point
2958
                        // (suicide).
2959
                        return;
×
2960
                    }
×
2961
                    REALM_ASSERT(upload_progress.client_version == 0);
×
2962
                    std::size_t body_size = (body_is_compressed ? compressed_body_size : uncompressed_body_size);
×
2963
                    cache.body = std::make_unique<char[]>(body_size); // Throws
×
2964
                    std::copy(body, body + body_size, cache.body.get());
×
2965
                    cache.uncompressed_body_size = uncompressed_body_size;
×
2966
                    cache.compressed_body_size = compressed_body_size;
×
2967
                    cache.body_is_compressed = body_is_compressed;
×
2968
                    cache.end_version = end_version;
×
2969
                    cache.download_progress = download_progress;
×
2970
                    cache.downloadable_bytes = downloadable_bytes;
×
2971
                    cache.num_changesets = num_changesets;
×
2972
                    cache.accum_original_size = accum_original_size;
×
2973
                    cache.accum_compacted_size = accum_compacted_size;
×
2974
                }
×
2975
                else {
42,484✔
2976
                    std::size_t max_download_size = config.max_download_size;
42,484✔
2977
                    if (!fetch_and_compress(max_download_size)) { // Throws
42,484✔
2978
                        // Session object may have been destroyed at this point
2979
                        // (suicide).
2980
                        return;
×
2981
                    }
×
2982
                }
42,484✔
2983
            }
42,484✔
2984

2985
            OutputBuffer& out = m_connection.get_output_buffer();
42,484✔
2986
            protocol.make_download_message(
42,484✔
2987
                m_connection.get_client_protocol_version(), out, m_session_ident, download_progress.server_version,
42,484✔
2988
                download_progress.last_integrated_client_version, last_server_version.version,
42,484✔
2989
                last_server_version.salt, upload_progress.client_version,
42,484✔
2990
                upload_progress.last_integrated_server_version, downloadable_bytes, num_changesets, body,
42,484✔
2991
                uncompressed_body_size, compressed_body_size, body_is_compressed, logger); // Throws
42,484✔
2992

2993
            if (!disable_download_compaction) {
42,486✔
2994
                std::size_t saved = accum_original_size - accum_compacted_size;
42,486✔
2995
                double saved_2 = (accum_original_size == 0 ? 0 : std::round(saved * 100.0 / accum_original_size));
42,486✔
2996
                logger.detail("Download compaction: Saved %1 bytes (%2%%)", saved, saved_2); // Throws
42,486✔
2997
            }
42,486✔
2998

2999
            m_download_progress = download_progress;
42,484✔
3000
            logger.debug("Setting of m_download_progress.server_version = %1",
42,484✔
3001
                         m_download_progress.server_version); // Throws
42,484✔
3002
            send_download_message();
42,484✔
3003
            m_one_download_message_sent = true;
42,484✔
3004

3005
            enlist_to_send();
42,484✔
3006
        }
42,484✔
3007
        else if (m_download_completion_request) {
65,550✔
3008
            // Send a MARK message
3009
            request_ident_type request_ident = m_download_completion_request;
12,142✔
3010
            send_mark_message(request_ident);  // Throws
12,142✔
3011
            m_download_completion_request = 0; // Request handled
12,142✔
3012
            enlist_to_send();
12,142✔
3013
        }
12,142✔
3014
    }
108,034✔
3015

3016
    void send_ident_message()
3017
    {
1,358✔
3018
        // Protocol state must be SendIdent
3019
        REALM_ASSERT(!need_client_file_ident());
1,358✔
3020
        REALM_ASSERT(m_send_ident_message);
1,358✔
3021
        REALM_ASSERT(!ident_message_received());
1,358✔
3022
        REALM_ASSERT(!unbind_message_received());
1,358✔
3023
        REALM_ASSERT(!error_occurred());
1,358✔
3024
        REALM_ASSERT(!m_error_message_sent);
1,358✔
3025

3026
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
1,358✔
3027

3028
        file_ident_type client_file_ident = m_allocated_file_ident.ident;
1,358✔
3029
        salt_type client_file_ident_salt = m_allocated_file_ident.salt;
1,358✔
3030

3031
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident,
1,358✔
3032
                     client_file_ident_salt); // Throws
1,358✔
3033

3034
        ServerProtocol& protocol = get_server_protocol();
1,358✔
3035
        OutputBuffer& out = m_connection.get_output_buffer();
1,358✔
3036
        int protocol_version = m_connection.get_client_protocol_version();
1,358✔
3037
        protocol.make_ident_message(protocol_version, out, m_session_ident, client_file_ident,
1,358✔
3038
                                    client_file_ident_salt); // Throws
1,358✔
3039
        m_connection.initiate_write_output_buffer();         // Throws
1,358✔
3040

3041
        m_allocated_file_ident.ident = 0; // Consumed
1,358✔
3042
        m_send_ident_message = false;
1,358✔
3043
        // Protocol state is now WaitForStateRequest or WaitForIdent
3044
    }
1,358✔
3045

3046
    void send_download_message()
3047
    {
42,486✔
3048
        m_connection.initiate_write_output_buffer(); // Throws
42,486✔
3049
    }
42,486✔
3050

3051
    // Builds a MARK message for `request_ident` in the connection's output
    // buffer and initiates writing of that buffer.
    void send_mark_message(request_ident_type request_ident)
    {
        logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws

        ServerProtocol& protocol = get_server_protocol();
        OutputBuffer& buffer = m_connection.get_output_buffer();
        protocol.make_mark_message(buffer, m_session_ident, request_ident); // Throws

        m_connection.initiate_write_output_buffer(); // Throws
    }
12,142✔
3060

3061
    void send_alloc_message()
3062
    {
×
3063
        // Protocol state must be WaitForUnbind
3064
        REALM_ASSERT(!m_send_ident_message);
×
3065
        REALM_ASSERT(ident_message_received());
×
3066
        REALM_ASSERT(!unbind_message_received());
×
3067
        REALM_ASSERT(!error_occurred());
×
3068
        REALM_ASSERT(!m_error_message_sent);
×
3069

3070
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
×
3071

3072
        // Relayed allocations are only allowed from protocol version 23 (old protocol).
3073
        REALM_ASSERT(false);
×
3074

3075
        file_ident_type file_ident = m_allocated_file_ident.ident;
×
3076

3077
        logger.debug("Sending: ALLOC(file_ident=%1)", file_ident); // Throws
×
3078

3079
        ServerProtocol& protocol = get_server_protocol();
×
3080
        OutputBuffer& out = m_connection.get_output_buffer();
×
3081
        protocol.make_alloc_message(out, m_session_ident, file_ident); // Throws
×
3082
        m_connection.initiate_write_output_buffer();                   // Throws
×
3083

3084
        m_allocated_file_ident.ident = 0; // Consumed
×
3085

3086
        // Other messages may be waiting to be sent.
3087
        enlist_to_send();
×
3088
    }
×
3089

3090
    void send_unbound_message()
3091
    {
2,758✔
3092
        // Protocol state must be SendUnbound
3093
        REALM_ASSERT(unbind_message_received());
2,758✔
3094
        REALM_ASSERT(!m_error_message_sent);
2,758✔
3095

3096
        logger.debug("Sending: UNBOUND"); // Throws
2,758✔
3097

3098
        ServerProtocol& protocol = get_server_protocol();
2,758✔
3099
        OutputBuffer& out = m_connection.get_output_buffer();
2,758✔
3100
        protocol.make_unbound_message(out, m_session_ident); // Throws
2,758✔
3101
        m_connection.initiate_write_output_buffer();         // Throws
2,758✔
3102
    }
2,758✔
3103

3104
    void send_error_message()
3105
    {
80✔
3106
        // Protocol state must be SendError
3107
        REALM_ASSERT(!unbind_message_received());
80✔
3108
        REALM_ASSERT(error_occurred());
80✔
3109
        REALM_ASSERT(!m_error_message_sent);
80✔
3110

3111
        REALM_ASSERT(is_session_level_error(m_error_code));
80✔
3112

3113
        ProtocolError error_code = m_error_code;
80✔
3114
        const char* message = get_protocol_error_message(int(error_code));
80✔
3115
        std::size_t message_size = std::strlen(message);
80✔
3116
        bool try_again = determine_try_again(error_code);
80✔
3117

3118
        logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3)", int(error_code), message_size,
80✔
3119
                      try_again); // Throws
80✔
3120

3121
        ServerProtocol& protocol = get_server_protocol();
80✔
3122
        OutputBuffer& out = m_connection.get_output_buffer();
80✔
3123
        int protocol_version = m_connection.get_client_protocol_version();
80✔
3124
        protocol.make_error_message(protocol_version, out, error_code, message, message_size, try_again,
80✔
3125
                                    m_session_ident); // Throws
80✔
3126
        m_connection.initiate_write_output_buffer();  // Throws
80✔
3127

3128
        m_error_message_sent = true;
80✔
3129
        // Protocol state is now WaitForUnbindErr
3130
    }
80✔
3131

3132
    // Delivers a log message on behalf of this session.
    //
    // If the client's protocol version is older than
    // SyncConnection::SERVER_LOG_PROTOCOL_VERSION, the message is written to
    // the session's own logger instead of being sent. Otherwise it is handed to
    // the connection together with this session's identifier.
    //
    // `level`   - log level of the message.
    // `message` - the message text, taken verbatim.
    void send_log_message(util::Logger::Level level, const std::string&& message)
    {
        if (m_connection.get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
            // The text must be passed as an argument, not as the format
            // string: otherwise any '%' occurring in it would be parsed as a
            // placeholder by the logger.
            logger.log(level, "%1", message); // Throws
            return;
        }

        // NOTE: `message` is a const rvalue reference, so this std::move()
        // yields a const rvalue (it cannot actually be moved from).
        m_connection.send_log_message(level, std::move(message), m_session_ident);
    }
4,306✔
3140

3141
    // Idempotent
3142
    void detach_from_server_file() noexcept
3143
    {
8,512✔
3144
        if (!m_server_file)
8,512✔
3145
            return;
2,904✔
3146
        ServerFile& file = *m_server_file;
5,608✔
3147
        if (ident_message_received()) {
5,608✔
3148
            file.remove_identified_session(m_client_file_ident);
4,308✔
3149
        }
4,308✔
3150
        else {
1,300✔
3151
            file.remove_unidentified_session(this);
1,300✔
3152
        }
1,300✔
3153
        if (m_file_ident_request != 0)
5,608✔
3154
            file.cancel_file_ident_request(m_file_ident_request);
1,130✔
3155
        m_server_file.reset();
5,608✔
3156
    }
5,608✔
3157

3158
    friend class SessionQueue;
3159
};
3160

3161

3162
// ============================ SessionQueue implementation ============================
3163

3164
// Appends `sess` to the back of the queue. The queue is a circular singly
// linked list in which `m_back->m_next` is the front element. The session must
// not already be linked into a queue (`m_next` must be null).
void SessionQueue::push_back(Session* sess) noexcept
{
    REALM_ASSERT(!sess->m_next);

    // The new back element points at the front element. In an empty queue the
    // new element is its own front.
    Session* front = (m_back ? m_back->m_next : sess);
    sess->m_next = front;
    if (m_back)
        m_back->m_next = sess;

    m_back = sess;
}
112,448✔
3176

3177

3178
// Removes and returns the session at the front of the queue, or returns null
// if the queue is empty. The `m_next` link of the returned session is reset
// to null.
Session* SessionQueue::pop_front() noexcept
{
    if (!m_back)
        return nullptr;

    Session* front = m_back->m_next;
    if (front == m_back) {
        // That was the only element.
        m_back = nullptr;
    }
    else {
        // Unlink the front element from the circle.
        m_back->m_next = front->m_next;
    }
    front->m_next = nullptr;
    return front;
}
162,258✔
3193

3194

3195
void SessionQueue::clear() noexcept
3196
{
3,240✔
3197
    if (m_back) {
3,240✔
3198
        Session* sess = m_back;
128✔
3199
        for (;;) {
208✔
3200
            Session* next = sess->m_next;
208✔
3201
            sess->m_next = nullptr;
208✔
3202
            if (next == m_back)
208✔
3203
                break;
128✔
3204
            sess = next;
80✔
3205
        }
80✔
3206
        m_back = nullptr;
128✔
3207
    }
128✔
3208
}
3,240✔
3209

3210

3211
// ============================ ServerFile implementation ============================
3212

3213
ServerFile::ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path,
3214
                       std::string real_path, bool disable_sync_to_disk)
3215
    : logger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.logger_ptr}               // Throws
436✔
3216
    , wlogger{util::LogCategory::server, "ServerFile[" + virt_path + "]: ", server.get_worker().logger_ptr} // Throws
436✔
3217
    , m_server{server}
436✔
3218
    , m_file{cache, real_path, virt_path, false, disable_sync_to_disk} // Throws
436✔
3219
    , m_worker_file{server.get_worker().get_file_access_cache(), real_path, virt_path, true, disable_sync_to_disk}
436✔
3220
{
1,120✔
3221
}
1,120✔
3222

3223

3224
// At destruction time no session may still be registered with the file, and
// `m_file_ident_request` must be zero.
ServerFile::~ServerFile() noexcept
{
    REALM_ASSERT(m_unidentified_sessions.empty());
    REALM_ASSERT(m_identified_sessions.empty());
    REALM_ASSERT(m_file_ident_request == 0);
}
1,120✔
3230

3231

3232
void ServerFile::initialize()
3233
{
1,120✔
3234
    const ServerHistory& history = access().history; // Throws
1,120✔
3235
    file_ident_type partial_file_ident = 0;
1,120✔
3236
    version_type partial_progress_reference_version = 0;
1,120✔
3237
    bool has_upstream_sync_status;
1,120✔
3238
    history.get_status(m_version_info, has_upstream_sync_status, partial_file_ident,
1,120✔
3239
                       partial_progress_reference_version); // Throws
1,120✔
3240
    REALM_ASSERT(!has_upstream_sync_status);
1,120✔
3241
    REALM_ASSERT(partial_file_ident == 0);
1,120✔
3242
}
1,120✔
3243

3244

3245
// Currently a no-op.
void ServerFile::activate()
{
    // Intentionally empty.
}
1,120✔
3246

3247

3248
// This function must be called only after a completed invocation of
3249
// initialize(). Both functinos must only ever be called by the network event
3250
// loop thread.
3251
void ServerFile::register_client_access(file_ident_type) {}
92,264✔
3252

3253

3254
auto ServerFile::request_file_ident(FileIdentReceiver& receiver, file_ident_type proxy_file, ClientType client_type)
3255
    -> file_ident_request_type
3256
{
2,488✔
3257
    auto request = ++m_last_file_ident_request;
2,488✔
3258
    m_file_ident_requests[request] = {&receiver, proxy_file, client_type}; // Throws
2,488✔
3259

3260
    on_work_added(); // Throws
2,488✔
3261
    return request;
2,488✔
3262
}
2,488✔
3263

3264

3265
// Cancels a pending file identifier request by clearing its receiver. The
// request entry itself stays registered. The request must exist and must not
// have been cancelled already.
void ServerFile::cancel_file_ident_request(file_ident_request_type request) noexcept
{
    const auto entry = m_file_ident_requests.find(request);
    REALM_ASSERT(entry != m_file_ident_requests.end());

    FileIdentRequestInfo& pending = entry->second;
    REALM_ASSERT(pending.receiver);
    pending.receiver = nullptr;
}
1,130✔
3273

3274

3275
// Adds `sess` to the set of unidentified sessions. The session must not
// already be a member of that set (asserted).
void ServerFile::add_unidentified_session(Session* sess)
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 0);

    m_unidentified_sessions.insert(sess); // Throws
}
5,608✔
3280

3281

3282
// Moves `sess` from the set of unidentified sessions to the map of identified
// sessions, keyed by `client_file_ident`.
//
// Asserts that the session is currently unidentified and that no session is
// registered under `client_file_ident` yet.
void ServerFile::identify_session(Session* sess, file_ident_type client_file_ident)
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 0);

    // Insert first: this is the step marked as throwing, and the session is
    // still in the unidentified set at that point.
    m_identified_sessions[client_file_ident] = sess; // Throws
    m_unidentified_sessions.erase(sess);
}
4,308✔
3290

3291

3292
// Removes `sess` from the set of unidentified sessions. The session must be a
// member of that set (asserted).
void ServerFile::remove_unidentified_session(Session* sess) noexcept
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);

    m_unidentified_sessions.erase(sess);
}
1,304✔
3297

3298

3299
// Removes the session registered under `client_file_ident` from the map of
// identified sessions. Such an entry must exist (asserted).
void ServerFile::remove_identified_session(file_ident_type client_file_ident) noexcept
{
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 1);

    m_identified_sessions.erase(client_file_ident);
}
4,308✔
3304

3305

3306
// Looks up the session registered under `client_file_ident`.
//
// Returns the session pointer, or null if there is no such entry.
Session* ServerFile::get_identified_session(file_ident_type client_file_ident) noexcept
{
    const auto i = m_identified_sessions.find(client_file_ident);
    return (i != m_identified_sessions.end()) ? i->second : nullptr;
}
4,304✔
3313

3314
bool ServerFile::can_add_changesets_from_downstream() const noexcept
3315
{
45,810✔
3316
    return (m_blocked_changesets_from_downstream_byte_size < m_server.get_max_upload_backlog());
45,810✔
3317
}
45,810✔
3318

3319

3320
void ServerFile::add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
3321
                                                version_type locked_server_version, const UploadChangeset* changesets,
3322
                                                std::size_t num_changesets)
3323
{
45,476✔
3324
    register_client_access(client_file_ident); // Throws
45,476✔
3325

3326
    bool dirty = false;
45,476✔
3327

3328
    IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident]; // Throws
45,476✔
3329
    std::size_t num_bytes = 0;
45,476✔
3330
    for (std::size_t i = 0; i < num_changesets; ++i) {
79,948✔
3331
        const UploadChangeset& uc = changesets[i];
34,472✔
3332
        auto& changesets = list.changesets;
34,472✔
3333
        changesets.emplace_back(client_file_ident, uc.origin_timestamp, uc.origin_file_ident, uc.upload_cursor,
34,472✔
3334
                                uc.changeset); // Throws
34,472✔
3335
        num_bytes += uc.changeset.size();
34,472✔
3336
        dirty = true;
34,472✔
3337
    }
34,472✔
3338

3339
    REALM_ASSERT(upload_progress.client_version >= list.upload_progress.client_version);
45,476✔
3340
    REALM_ASSERT(are_mutually_consistent(upload_progress, list.upload_progress));
45,476✔
3341
    if (upload_progress.client_version > list.upload_progress.client_version) {
45,476✔
3342
        list.upload_progress = upload_progress;
45,468✔
3343
        dirty = true;
45,468✔
3344
    }
45,468✔
3345

3346
    REALM_ASSERT(locked_server_version >= list.locked_server_version);
45,476✔
3347
    if (locked_server_version > list.locked_server_version) {
45,476✔
3348
        list.locked_server_version = locked_server_version;
39,506✔
3349
        dirty = true;
39,506✔
3350
    }
39,506✔
3351

3352
    if (REALM_LIKELY(dirty)) {
45,476✔
3353
        if (num_changesets > 0) {
45,470✔
3354
            on_changesets_from_downstream_added(num_changesets, num_bytes); // Throws
24,160✔
3355
        }
24,160✔
3356
        else {
21,310✔
3357
            on_work_added(); // Throws
21,310✔
3358
        }
21,310✔
3359
    }
45,470✔
3360
}
45,476✔
3361

3362

3363
// Bootstraps a new session for the client file `client_file_ident`.
//
// Returns BootstrapError::bad_server_version right away if the client refers
// to a server version beyond `m_version_info.sync_version`. Otherwise the
// request is forwarded to ServerHistory::bootstrap_client_session(), which
// produces `upload_progress` and `locked_server_version`, and whose result is
// returned.
//
// On success, both out-values are then overridden by the values held in any
// list of buffered changesets for the same client file: first the list in
// `m_work.changesets_from_downstream`, then the one in
// `m_changesets_from_downstream`.
BootstrapError ServerFile::bootstrap_client_session(SaltedFileIdent client_file_ident,
                                                    DownloadCursor download_progress, SaltedVersion server_version,
                                                    ClientType client_type, UploadCursor& upload_progress,
                                                    version_type& locked_server_version, Logger& logger)
{
    // The Realm file may contain a later snapshot than the one reflected by
    // `m_sync_version`, but if so, the client cannot "legally" know about it.
    if (server_version.version > m_version_info.sync_version.version)
        return BootstrapError::bad_server_version;

    const ServerHistory& hist = access().history; // Throws
    BootstrapError error = hist.bootstrap_client_session(client_file_ident, download_progress, server_version,
                                                         client_type, upload_progress, locked_server_version,
                                                         logger); // Throws

    // FIXME: Rather than taking previously buffered changesets from the same
    // client file into account when determining the upload progress, and then
    // allowing for an error during the integration of those changesets to be
    // reported to, and terminate the new session, consider to instead postpone
    // the bootstrapping of the new session until all previously buffered
    // changesets from same client file have been fully processed.

    if (error != BootstrapError::no_error)
        return error;

    register_client_access(client_file_ident.ident); // Throws

    // If upload, or releasing of server versions progressed further during
    // previous sessions than the persisted points, take that into account.
    const auto adopt_buffered_progress = [&](const IntegratableChangesetList& list) {
        REALM_ASSERT(list.upload_progress.client_version >= upload_progress.client_version);
        upload_progress = list.upload_progress;
        REALM_ASSERT(list.locked_server_version >= locked_server_version);
        locked_server_version = list.locked_server_version;
    };

    auto i = m_work.changesets_from_downstream.find(client_file_ident.ident);
    if (i != m_work.changesets_from_downstream.end())
        adopt_buffered_progress(i->second);

    auto j = m_changesets_from_downstream.find(client_file_ident.ident);
    if (j != m_changesets_from_downstream.end())
        adopt_buffered_progress(j->second);

    return error;
}
4,340✔
3410

3411
// NOTE: This function is executed by the worker thread
3412
void ServerFile::worker_process_work_unit(WorkerState& state)
3413
{
38,722✔
3414
    SteadyTimePoint start_time = steady_clock_now();
38,722✔
3415
    milliseconds_type parallel_time = 0;
38,722✔
3416

3417
    Work& work = m_work;
38,722✔
3418
    wlogger.debug("Work unit execution started"); // Throws
38,722✔
3419

3420
    if (work.has_primary_work) {
38,722✔
3421
        if (REALM_UNLIKELY(!m_work.file_ident_alloc_slots.empty()))
38,722✔
3422
            worker_allocate_file_identifiers(); // Throws
1,412✔
3423

3424
        if (!m_work.changesets_from_downstream.empty())
38,722✔
3425
            worker_integrate_changes_from_downstream(state); // Throws
37,316✔
3426
    }
38,722✔
3427

3428
    wlogger.debug("Work unit execution completed"); // Throws
38,722✔
3429

3430
    milliseconds_type time = steady_duration(start_time);
38,722✔
3431
    milliseconds_type seq_time = time - parallel_time;
38,722✔
3432
    m_server.m_seq_time.fetch_add(seq_time, std::memory_order_relaxed);
38,722✔
3433
    m_server.m_par_time.fetch_add(parallel_time, std::memory_order_relaxed);
38,722✔
3434

3435
    // Pass control back to the network event loop thread
3436
    network::Service& service = m_server.get_service();
38,722✔
3437
    service.post([this](Status) {
38,722✔
3438
        // FIXME: The safety of capturing `this` here, relies on the fact
3439
        // that ServerFile objects currently are not destroyed until the
3440
        // server object is destroyed.
3441
        group_postprocess_stage_1(); // Throws
38,336✔
3442
        // Suicide may have happened at this point
3443
    }); // Throws
38,336✔
3444
}
38,722✔
3445

3446

3447
void ServerFile::on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes)
3448
{
24,160✔
3449
    m_num_changesets_from_downstream += num_changesets;
24,160✔
3450

3451
    if (num_bytes > 0) {
24,160✔
3452
        m_blocked_changesets_from_downstream_byte_size += num_bytes;
24,160✔
3453
        get_server().inc_byte_size_for_pending_downstream_changesets(num_bytes); // Throws
24,160✔
3454
    }
24,160✔
3455

3456
    on_work_added(); // Throws
24,160✔
3457
}
24,160✔
3458

3459

3460
void ServerFile::on_work_added()
3461
{
47,962✔
3462
    if (m_has_blocked_work)
47,962✔
3463
        return;
9,092✔
3464
    m_has_blocked_work = true;
38,870✔
3465
    // Reference file
3466
    if (m_has_work_in_progress)
38,870✔
3467
        return;
13,226✔
3468
    group_unblock_work(); // Throws
25,644✔
3469
}
25,644✔
3470

3471

3472
void ServerFile::group_unblock_work()
3473
{
38,792✔
3474
    REALM_ASSERT(!m_has_work_in_progress);
38,792✔
3475
    if (REALM_LIKELY(!m_server.is_sync_stopped())) {
38,792✔
3476
        unblock_work(); // Throws
38,792✔
3477
        const Work& work = m_work;
38,792✔
3478
        if (REALM_LIKELY(work.has_primary_work)) {
38,792✔
3479
            logger.trace("Work unit unblocked"); // Throws
38,750✔
3480
            m_has_work_in_progress = true;
38,750✔
3481
            Worker& worker = m_server.get_worker();
38,750✔
3482
            worker.enqueue(this); // Throws
38,750✔
3483
        }
38,750✔
3484
    }
38,792✔
3485
}
38,792✔
3486

3487

3488
void ServerFile::unblock_work()
3489
{
38,792✔
3490
    REALM_ASSERT(m_has_blocked_work);
38,792✔
3491

3492
    m_work.reset();
38,792✔
3493

3494
    // Discard requests for file identifiers whose receiver is no longer
3495
    // waiting.
3496
    {
38,792✔
3497
        auto i = m_file_ident_requests.begin();
38,792✔
3498
        auto end = m_file_ident_requests.end();
38,792✔
3499
        while (i != end) {
41,280✔
3500
            auto j = i++;
2,488✔
3501
            const FileIdentRequestInfo& info = j->second;
2,488✔
3502
            if (!info.receiver)
2,488✔
3503
                m_file_ident_requests.erase(j);
1,002✔
3504
        }
2,488✔
3505
    }
38,792✔
3506
    std::size_t n = m_file_ident_requests.size();
38,792✔
3507
    if (n > 0) {
38,792✔
3508
        m_work.file_ident_alloc_slots.resize(n); // Throws
1,428✔
3509
        std::size_t i = 0;
1,428✔
3510
        for (const auto& pair : m_file_ident_requests) {
1,486✔
3511
            const FileIdentRequestInfo& info = pair.second;
1,486✔
3512
            FileIdentAllocSlot& slot = m_work.file_ident_alloc_slots[i];
1,486✔
3513
            slot.proxy_file = info.proxy_file;
1,486✔
3514
            slot.client_type = info.client_type;
1,486✔
3515
            ++i;
1,486✔
3516
        }
1,486✔
3517
        m_work.has_primary_work = true;
1,428✔
3518
    }
1,428✔
3519

3520
    // FIXME: `ServerFile::m_changesets_from_downstream` and
3521
    // `Work::changesets_from_downstream` should be renamed to something else,
3522
    // as it may contain kinds of data other than changesets.
3523

3524
    using std::swap;
38,792✔
3525
    swap(m_changesets_from_downstream, m_work.changesets_from_downstream);
38,792✔
3526
    m_work.have_changesets_from_downstream = (m_num_changesets_from_downstream > 0);
38,792✔
3527
    bool has_changesets = !m_work.changesets_from_downstream.empty();
38,792✔
3528
    if (has_changesets) {
38,792✔
3529
        m_work.has_primary_work = true;
37,332✔
3530
    }
37,332✔
3531

3532
    // Keep track of the size of pending changesets
3533
    REALM_ASSERT(m_unblocked_changesets_from_downstream_byte_size == 0);
38,792✔
3534
    m_unblocked_changesets_from_downstream_byte_size = m_blocked_changesets_from_downstream_byte_size;
38,792✔
3535
    m_blocked_changesets_from_downstream_byte_size = 0;
38,792✔
3536

3537
    m_num_changesets_from_downstream = 0;
38,792✔
3538
    m_has_blocked_work = false;
38,792✔
3539
}
38,792✔
3540

3541

3542
void ServerFile::resume_download() noexcept
3543
{
23,220✔
3544
    for (const auto& entry : m_identified_sessions) {
38,304✔
3545
        Session& sess = *entry.second;
38,304✔
3546
        sess.ensure_enlisted_to_send();
38,304✔
3547
    }
38,304✔
3548
}
23,220✔
3549

3550

3551
void ServerFile::recognize_external_change()
3552
{
4,800✔
3553
    VersionInfo prev_version_info = m_version_info;
4,800✔
3554
    const ServerHistory& history = access().history;       // Throws
4,800✔
3555
    bool has_upstream_status;                              // Dummy
4,800✔
3556
    sync::file_ident_type partial_file_ident;              // Dummy
4,800✔
3557
    sync::version_type partial_progress_reference_version; // Dummy
4,800✔
3558
    history.get_status(m_version_info, has_upstream_status, partial_file_ident,
4,800✔
3559
                       partial_progress_reference_version); // Throws
4,800✔
3560

3561
    REALM_ASSERT(m_version_info.realm_version >= prev_version_info.realm_version);
4,800✔
3562
    REALM_ASSERT(m_version_info.sync_version.version >= prev_version_info.sync_version.version);
4,800✔
3563
    if (m_version_info.sync_version.version > prev_version_info.sync_version.version) {
4,800✔
3564
        REALM_ASSERT(m_version_info.realm_version > prev_version_info.realm_version);
4,800✔
3565
        resume_download();
4,800✔
3566
    }
4,800✔
3567
}
4,800✔
3568

3569

3570
// NOTE: This function is executed by the worker thread
3571
void ServerFile::worker_allocate_file_identifiers()
3572
{
1,412✔
3573
    Work& work = m_work;
1,412✔
3574
    REALM_ASSERT(!work.file_ident_alloc_slots.empty());
1,412✔
3575
    ServerHistory& hist = worker_access().history;                                      // Throws
1,412✔
3576
    hist.allocate_file_identifiers(m_work.file_ident_alloc_slots, m_work.version_info); // Throws
1,412✔
3577
    m_work.produced_new_realm_version = true;
1,412✔
3578
}
1,412✔
3579

3580

3581
// Returns true when, and only when this function produces a new sync version
3582
// (adds a new entry to the sync history).
3583
//
3584
// NOTE: This function is executed by the worker thread
3585
bool ServerFile::worker_integrate_changes_from_downstream(WorkerState& state)
3586
{
37,314✔
3587
    REALM_ASSERT(!m_work.changesets_from_downstream.empty());
37,314✔
3588

3589
    std::unique_ptr<ServerHistory> hist_ptr;
37,314✔
3590
    DBRef sg_ptr;
37,314✔
3591
    ServerHistory& hist = get_client_file_history(state, hist_ptr, sg_ptr);
37,314✔
3592
    bool backup_whole_realm = false;
37,314✔
3593
    bool produced_new_realm_version = hist.integrate_client_changesets(
37,314✔
3594
        m_work.changesets_from_downstream, m_work.version_info, backup_whole_realm, m_work.integration_result,
37,314✔
3595
        wlogger); // Throws
37,314✔
3596
    bool produced_new_sync_version = !m_work.integration_result.integrated_changesets.empty();
37,314✔
3597
    REALM_ASSERT(!produced_new_sync_version || produced_new_realm_version);
37,314✔
3598
    if (produced_new_realm_version) {
37,314✔
3599
        m_work.produced_new_realm_version = true;
37,298✔
3600
        if (produced_new_sync_version) {
37,298✔
3601
            m_work.produced_new_sync_version = true;
18,432✔
3602
        }
18,432✔
3603
    }
37,298✔
3604
    return produced_new_sync_version;
37,314✔
3605
}
37,314✔
3606

3607
// Returns the history object to be used by the worker for this file.
//
// When `state.use_file_cache` is set, the history of worker_access() is
// returned and the two out-parameters are left untouched. Otherwise a new
// history is created and stored in `hist_ptr`, a DB is opened on
// `m_worker_file.realm_path` with that history and stored in `sg_ptr`, the
// sync agent is claimed on it, and the new history is returned.
ServerHistory& ServerFile::get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
                                                   DBRef& sg_ptr)
{
    if (state.use_file_cache)
        return worker_access().history; // Throws

    hist_ptr = m_server.make_history_for_path();                           // Throws
    DBOptions options = m_worker_file.make_shared_group_options();         // Throws
    sg_ptr = DB::create(*hist_ptr, m_worker_file.realm_path, options);     // Throws
    sg_ptr->claim_sync_agent();                                            // Throws
    return *hist_ptr;
}
37,316✔
3619

3620

3621
// When worker thread finishes work unit.
3622
void ServerFile::group_postprocess_stage_1()
3623
{
38,336✔
3624
    REALM_ASSERT(m_has_work_in_progress);
38,336✔
3625

3626
    group_finalize_work_stage_1(); // Throws
38,336✔
3627
    group_finalize_work_stage_2(); // Throws
38,336✔
3628
    group_postprocess_stage_2();   // Throws
38,336✔
3629
}
38,336✔
3630

3631

3632
void ServerFile::group_postprocess_stage_2()
3633
{
38,334✔
3634
    REALM_ASSERT(m_has_work_in_progress);
38,334✔
3635
    group_postprocess_stage_3(); // Throws
38,334✔
3636
    // Suicide may have happened at this point
3637
}
38,334✔
3638

3639

3640
// When all files, including the reference file, have been backed up.
3641
void ServerFile::group_postprocess_stage_3()
3642
{
38,334✔
3643
    REALM_ASSERT(m_has_work_in_progress);
38,334✔
3644
    m_has_work_in_progress = false;
38,334✔
3645

3646
    logger.trace("Work unit postprocessing complete"); // Throws
38,334✔
3647
    if (m_has_blocked_work)
38,334✔
3648
        group_unblock_work(); // Throws
13,148✔
3649
}
38,334✔
3650

3651

3652
void ServerFile::finalize_work_stage_1()
3653
{
38,336✔
3654
    if (m_unblocked_changesets_from_downstream_byte_size > 0) {
38,336✔
3655
        // Report the byte size of completed downstream changesets.
3656
        std::size_t byte_size = m_unblocked_changesets_from_downstream_byte_size;
18,440✔
3657
        get_server().dec_byte_size_for_pending_downstream_changesets(byte_size); // Throws
18,440✔
3658
        m_unblocked_changesets_from_downstream_byte_size = 0;
18,440✔
3659
    }
18,440✔
3660

3661
    // Deal with errors (bad changesets) pertaining to downstream clients
3662
    std::size_t num_changesets_removed = 0;
38,336✔
3663
    std::size_t num_bytes_removed = 0;
38,336✔
3664
    for (const auto& entry : m_work.integration_result.excluded_client_files) {
38,336✔
3665
        file_ident_type client_file_ident = entry.first;
20✔
3666
        ExtendedIntegrationError error = entry.second;
20✔
3667
        ProtocolError error_2 = ProtocolError::other_session_error;
20✔
3668
        switch (error) {
20✔
3669
            case ExtendedIntegrationError::client_file_expired:
✔
3670
                logger.debug("Changeset integration failed: Client file entry "
×
3671
                             "expired during session"); // Throws
×
3672
                error_2 = ProtocolError::client_file_expired;
×
3673
                break;
×
3674
            case ExtendedIntegrationError::bad_origin_file_ident:
✔
3675
                error_2 = ProtocolError::bad_origin_file_ident;
×
3676
                break;
×
3677
            case ExtendedIntegrationError::bad_changeset:
20✔
3678
                error_2 = ProtocolError::bad_changeset;
20✔
3679
                break;
20✔
3680
        }
20✔
3681
        auto i = m_identified_sessions.find(client_file_ident);
20✔
3682
        if (i != m_identified_sessions.end()) {
20✔
3683
            Session& sess = *i->second;
20✔
3684
            SyncConnection& conn = sess.get_connection();
20✔
3685
            conn.protocol_error(error_2, &sess); // Throws
20✔
3686
        }
20✔
3687
        const IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident];
20✔
3688
        std::size_t num_changesets = list.changesets.size();
20✔
3689
        std::size_t num_bytes = 0;
20✔
3690
        for (const IntegratableChangeset& ic : list.changesets)
20✔
3691
            num_bytes += ic.changeset.size();
×
3692
        logger.info("Excluded %1 changesets of combined byte size %2 for client file %3", num_changesets, num_bytes,
20✔
3693
                    client_file_ident); // Throws
20✔
3694
        num_changesets_removed += num_changesets;
20✔
3695
        num_bytes_removed += num_bytes;
20✔
3696
        m_changesets_from_downstream.erase(client_file_ident);
20✔
3697
    }
20✔
3698

3699
    REALM_ASSERT(num_changesets_removed <= m_num_changesets_from_downstream);
38,336✔
3700
    REALM_ASSERT(num_bytes_removed <= m_blocked_changesets_from_downstream_byte_size);
38,336✔
3701

3702
    if (num_changesets_removed == 0)
38,336✔
3703
        return;
38,336✔
3704

3705
    m_num_changesets_from_downstream -= num_changesets_removed;
×
3706

3707
    // The byte size of the blocked changesets must be decremented.
3708
    if (num_bytes_removed > 0) {
×
3709
        m_blocked_changesets_from_downstream_byte_size -= num_bytes_removed;
×
3710
        get_server().dec_byte_size_for_pending_downstream_changesets(num_bytes_removed); // Throws
×
3711
    }
×
3712
}
×
3713

3714

3715
void ServerFile::finalize_work_stage_2()
3716
{
38,336✔
3717
    // Expose new snapshot to remote peers
3718
    REALM_ASSERT(m_work.produced_new_realm_version || m_work.version_info.realm_version == 0);
38,336✔
3719
    if (m_work.version_info.realm_version > m_version_info.realm_version) {
38,336✔
3720
        REALM_ASSERT(m_work.version_info.sync_version.version >= m_version_info.sync_version.version);
38,316✔
3721
        m_version_info = m_work.version_info;
38,316✔
3722
    }
38,316✔
3723

3724
    bool resume_download_and_upload = m_work.produced_new_sync_version;
38,336✔
3725

3726
    // Deliver allocated file identifiers to requesters
3727
    REALM_ASSERT(m_file_ident_requests.size() >= m_work.file_ident_alloc_slots.size());
38,336✔
3728
    auto begin = m_file_ident_requests.begin();
38,336✔
3729
    auto i = begin;
38,336✔
3730
    for (const FileIdentAllocSlot& slot : m_work.file_ident_alloc_slots) {
38,336✔
3731
        FileIdentRequestInfo& info = i->second;
1,458✔
3732
        REALM_ASSERT(info.proxy_file == slot.proxy_file);
1,458✔
3733
        REALM_ASSERT(info.client_type == slot.client_type);
1,458✔
3734
        if (FileIdentReceiver* receiver = info.receiver) {
1,458✔
3735
            info.receiver = nullptr;
1,358✔
3736
            receiver->receive_file_ident(slot.file_ident); // Throws
1,358✔
3737
        }
1,358✔
3738
        ++i;
1,458✔
3739
    }
1,458✔
3740
    m_file_ident_requests.erase(begin, i);
38,336✔
3741

3742
    // Resume download to downstream clients
3743
    if (resume_download_and_upload) {
38,336✔
3744
        resume_download();
18,420✔
3745
    }
18,420✔
3746
}
38,336✔
3747

3748
// ============================ Worker implementation ============================
3749

3750
Worker::Worker(ServerImpl& server)
3751
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::server, "Worker: ", server.logger_ptr)}
544✔
3752
    // Throws
3753
    , logger(*logger_ptr)
544✔
3754
    , m_server{server}
544✔
3755
    , m_file_access_cache{server.get_config().max_open_files, logger, *this, server.get_config().encryption_key}
544✔
3756
{
1,192✔
3757
    util::seed_prng_nondeterministically(m_random); // Throws
1,192✔
3758
}
1,192✔
3759

3760

3761
// Appends `file` to the work queue and notifies all waiters on `m_cond`. Both
// steps happen while `m_mutex` is held.
void Worker::enqueue(ServerFile* file)
{
    util::LockGuard lock{m_mutex};

    m_queue.push_back(file); // Throws
    m_cond.notify_all();
}
38,754✔
3767

3768

3769
std::mt19937_64& Worker::server_history_get_random() noexcept
3770
{
2,508✔
3771
    return m_random;
2,508✔
3772
}
2,508✔
3773

3774

3775
void Worker::run()
3776
{
1,136✔
3777
    for (;;) {
39,858✔
3778
        ServerFile* file = nullptr;
39,858✔
3779
        {
39,858✔
3780
            util::LockGuard lock{m_mutex};
39,858✔
3781
            for (;;) {
79,032✔
3782
                if (REALM_UNLIKELY(m_stop))
79,032✔
3783
                    return;
1,136✔
3784
                if (!m_queue.empty()) {
77,896✔
3785
                    file = m_queue.front();
38,722✔
3786
                    m_queue.pop_front();
38,722✔
3787
                    break;
38,722✔
3788
                }
38,722✔
3789
                m_cond.wait(lock);
39,174✔
3790
            }
39,174✔
3791
        }
39,858✔
3792
        file->worker_process_work_unit(m_state); // Throws
38,722✔
3793
    }
38,722✔
3794
}
1,136✔
3795

3796

3797
void Worker::stop() noexcept
3798
{
1,136✔
3799
    util::LockGuard lock{m_mutex};
1,136✔
3800
    m_stop = true;
1,136✔
3801
    m_cond.notify_all();
1,136✔
3802
}
1,136✔
3803

3804

3805
// ============================ ServerImpl implementation ============================
3806

3807
ServerImpl::ServerImpl(const std::string& root_dir, util::Optional<sync::PKey> pkey, Server::Config config)
3808
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::server, std::move(config.logger))}
544✔
3809
    , logger{*logger_ptr}
544✔
3810
    , m_config{std::move(config)}
544✔
3811
    , m_max_upload_backlog{determine_max_upload_backlog(config)}
544✔
3812
    , m_root_dir{root_dir} // Throws
544✔
3813
    , m_access_control{std::move(pkey)}
544✔
3814
    , m_protocol_version_range{determine_protocol_version_range(config)}                 // Throws
544✔
3815
    , m_file_access_cache{m_config.max_open_files, logger, *this, config.encryption_key} // Throws
544✔
3816
    , m_worker{*this}                                                                    // Throws
544✔
3817
    , m_acceptor{get_service()}
544✔
3818
    , m_server_protocol{}       // Throws
544✔
3819
    , m_compress_memory_arena{} // Throws
544✔
3820
{
1,192✔
3821
    if (m_config.ssl) {
1,192✔
3822
        m_ssl_context = std::make_unique<network::ssl::Context>();                // Throws
24✔
3823
        m_ssl_context->use_certificate_chain_file(m_config.ssl_certificate_path); // Throws
24✔
3824
        m_ssl_context->use_private_key_file(m_config.ssl_certificate_key_path);   // Throws
24✔
3825
    }
24✔
3826
}
1,192✔
3827

3828

3829
// Destructor. Asserts (also in release builds) that the server is not
// destroyed while `m_running` is set.
ServerImpl::~ServerImpl() noexcept
{
    // The flag is copied into a descriptively named local so that the name
    // appears in the asserted expression.
    const bool server_destroyed_while_still_running = m_running;
    REALM_ASSERT_RELEASE(!server_destroyed_while_still_running);
}
1,192✔
3834

3835

3836
void ServerImpl::start()
3837
{
1,192✔
3838
    logger.info("Realm sync server started (%1)", REALM_VER_CHUNK); // Throws
1,192✔
3839
    logger.info("Supported protocol versions: %1-%2 (%3-%4 configured)",
1,192✔
3840
                ServerImplBase::get_oldest_supported_protocol_version(), get_current_protocol_version(),
1,192✔
3841
                m_protocol_version_range.first,
1,192✔
3842
                m_protocol_version_range.second); // Throws
1,192✔
3843
    logger.info("Platform: %1", util::get_platform_info());
1,192✔
3844
    bool is_debug_build = false;
1,192✔
3845
#if REALM_DEBUG
1,192✔
3846
    is_debug_build = true;
1,192✔
3847
#endif
1,192✔
3848
    {
1,192✔
3849
        const char* lead_text = "Build mode";
1,192✔
3850
        if (is_debug_build) {
1,192✔
3851
            logger.info("%1: Debug", lead_text); // Throws
1,192✔
3852
        }
1,192✔
3853
        else {
×
3854
            logger.info("%1: Release", lead_text); // Throws
×
3855
        }
×
3856
    }
1,192✔
3857
    if (is_debug_build) {
1,192✔
3858
        logger.warn("Build mode is Debug! CAN SEVERELY IMPACT PERFORMANCE - "
1,192✔
3859
                    "NOT RECOMMENDED FOR PRODUCTION"); // Throws
1,192✔
3860
    }
1,192✔
3861
    logger.info("Directory holding persistent state: %1", m_root_dir);        // Throws
1,192✔
3862
    logger.info("Maximum number of open files: %1", m_config.max_open_files); // Throws
1,192✔
3863
    {
1,192✔
3864
        const char* lead_text = "Encryption";
1,192✔
3865
        if (m_config.encryption_key) {
1,192✔
3866
            logger.info("%1: Yes", lead_text); // Throws
4✔
3867
        }
4✔
3868
        else {
1,188✔
3869
            logger.info("%1: No", lead_text); // Throws
1,188✔
3870
        }
1,188✔
3871
    }
1,192✔
3872
    logger.info("Log level: %1", logger.get_level_threshold()); // Throws
1,192✔
3873
    {
1,192✔
3874
        const char* lead_text = "Disable sync to disk";
1,192✔
3875
        if (m_config.disable_sync_to_disk) {
1,192✔
3876
            logger.info("%1: All files", lead_text); // Throws
488✔
3877
        }
488✔
3878
        else {
704✔
3879
            logger.info("%1: No", lead_text); // Throws
704✔
3880
        }
704✔
3881
    }
1,192✔
3882
    if (m_config.disable_sync_to_disk) {
1,192✔
3883
        logger.warn("Testing/debugging feature 'disable sync to disk' enabled - "
488✔
3884
                    "never do this in production!"); // Throws
488✔
3885
    }
488✔
3886
    logger.info("Download compaction: %1",
1,192✔
3887
                (m_config.disable_download_compaction ? "No" : "Yes")); // Throws
1,192✔
3888
    logger.info("Download bootstrap caching: %1",
1,192✔
3889
                (m_config.enable_download_bootstrap_cache ? "Yes" : "No"));                // Throws
1,192✔
3890
    logger.info("Max download size: %1 bytes", m_config.max_download_size);                // Throws
1,192✔
3891
    logger.info("Max upload backlog: %1 bytes", m_max_upload_backlog);                     // Throws
1,192✔
3892
    logger.info("HTTP request timeout: %1 ms", m_config.http_request_timeout);             // Throws
1,192✔
3893
    logger.info("HTTP response timeout: %1 ms", m_config.http_response_timeout);           // Throws
1,192✔
3894
    logger.info("Connection reaper timeout: %1 ms", m_config.connection_reaper_timeout);   // Throws
1,192✔
3895
    logger.info("Connection reaper interval: %1 ms", m_config.connection_reaper_interval); // Throws
1,192✔
3896
    logger.info("Connection soft close timeout: %1 ms", m_config.soft_close_timeout);      // Throws
1,192✔
3897
    logger.debug("Authorization header name: %1", m_config.authorization_header_name);     // Throws
1,192✔
3898

3899
    m_realm_names = _impl::find_realm_files(m_root_dir); // Throws
1,192✔
3900

3901
    initiate_connection_reaper_timer(m_config.connection_reaper_interval); // Throws
1,192✔
3902

3903
    listen(); // Throws
1,192✔
3904
}
1,192✔
3905

3906

3907
void ServerImpl::run()
3908
{
1,136✔
3909
    auto ta = util::make_temp_assign(m_running, true);
1,136✔
3910

3911
    {
1,136✔
3912
        auto worker_thread = util::make_thread_exec_guard(m_worker, *this); // Throws
1,136✔
3913
        std::string name;
1,136✔
3914
        if (util::Thread::get_name(name)) {
1,136✔
3915
            name += "-worker";
620✔
3916
            worker_thread.start_with_signals_blocked(name); // Throws
620✔
3917
        }
620✔
3918
        else {
516✔
3919
            worker_thread.start_with_signals_blocked(); // Throws
516✔
3920
        }
516✔
3921

3922
        m_service.run(); // Throws
1,136✔
3923

3924
        worker_thread.stop_and_rethrow(); // Throws
1,136✔
3925
    }
1,136✔
3926

3927
    logger.info("Realm sync server stopped");
1,136✔
3928
}
1,136✔
3929

3930

3931
void ServerImpl::stop() noexcept
3932
{
2,046✔
3933
    util::LockGuard lock{m_mutex};
2,046✔
3934
    if (m_stopped)
2,046✔
3935
        return;
854✔
3936
    m_stopped = true;
1,192✔
3937
    m_wait_or_service_stopped_cond.notify_all();
1,192✔
3938
    m_service.stop();
1,192✔
3939
}
1,192✔
3940

3941

3942
void ServerImpl::inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3943
{
24,160✔
3944
    m_pending_changesets_from_downstream_byte_size += byte_size;
24,160✔
3945
    logger.debug("Byte size for pending downstream changesets incremented by "
24,160✔
3946
                 "%1 to reach a total of %2",
24,160✔
3947
                 byte_size,
24,160✔
3948
                 m_pending_changesets_from_downstream_byte_size); // Throws
24,160✔
3949
}
24,160✔
3950

3951

3952
void ServerImpl::dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3953
{
18,440✔
3954
    REALM_ASSERT(byte_size <= m_pending_changesets_from_downstream_byte_size);
18,440✔
3955
    m_pending_changesets_from_downstream_byte_size -= byte_size;
18,440✔
3956
    logger.debug("Byte size for pending downstream changesets decremented by "
18,440✔
3957
                 "%1 to reach a total of %2",
18,440✔
3958
                 byte_size,
18,440✔
3959
                 m_pending_changesets_from_downstream_byte_size); // Throws
18,440✔
3960
}
18,440✔
3961

3962

3963
std::mt19937_64& ServerImpl::server_history_get_random() noexcept
3964
{
1,120✔
3965
    return get_random();
1,120✔
3966
}
1,120✔
3967

3968

3969
void ServerImpl::listen()
3970
{
1,192✔
3971
    network::Resolver resolver{get_service()};
1,192✔
3972
    network::Resolver::Query query(m_config.listen_address, m_config.listen_port,
1,192✔
3973
                                   network::Resolver::Query::passive | network::Resolver::Query::address_configured);
1,192✔
3974
    network::Endpoint::List endpoints = resolver.resolve(query); // Throws
1,192✔
3975

3976
    auto i = endpoints.begin();
1,192✔
3977
    auto end = endpoints.end();
1,192✔
3978
    for (;;) {
1,192✔
3979
        std::error_code ec;
1,192✔
3980
        m_acceptor.open(i->protocol(), ec);
1,192✔
3981
        if (!ec) {
1,192✔
3982
            using SocketBase = network::SocketBase;
1,192✔
3983
            m_acceptor.set_option(SocketBase::reuse_address(m_config.reuse_address), ec);
1,192✔
3984
            if (!ec) {
1,192✔
3985
                m_acceptor.bind(*i, ec);
1,192✔
3986
                if (!ec)
1,192✔
3987
                    break;
1,192✔
3988
            }
1,192✔
3989
            m_acceptor.close();
×
3990
        }
×
3991
        if (i + 1 == end) {
×
3992
            for (auto i2 = endpoints.begin(); i2 != i; ++i2) {
×
3993
                // FIXME: We don't have the error code for previous attempts, so
3994
                // can't print a nice message.
3995
                logger.error("Failed to bind to %1:%2", i2->address(),
×
3996
                             i2->port()); // Throws
×
3997
            }
×
3998
            logger.error("Failed to bind to %1:%2: %3", i->address(), i->port(),
×
3999
                         ec.message()); // Throws
×
4000
            throw std::runtime_error("Could not create a listening socket: All endpoints failed");
×
4001
        }
×
4002
    }
×
4003

4004
    m_acceptor.listen(m_config.listen_backlog);
1,192✔
4005

4006
    network::Endpoint local_endpoint = m_acceptor.local_endpoint();
1,192✔
4007
    const char* ssl_mode = (m_ssl_context ? "TLS" : "non-TLS");
1,192✔
4008
    logger.info("Listening on %1:%2 (max backlog is %3, %4)", local_endpoint.address(), local_endpoint.port(),
1,192✔
4009
                m_config.listen_backlog, ssl_mode); // Throws
1,192✔
4010

4011
    initiate_accept();
1,192✔
4012
}
1,192✔
4013

4014

4015
void ServerImpl::initiate_accept()
4016
{
3,262✔
4017
    auto handler = [this](std::error_code ec) {
3,262✔
4018
        if (ec != util::error::operation_aborted)
2,070✔
4019
            handle_accept(ec);
2,070✔
4020
    };
2,070✔
4021
    bool is_ssl = bool(m_ssl_context);
3,262✔
4022
    m_next_http_conn.reset(new HTTPConnection(*this, ++m_next_conn_id, is_ssl));                            // Throws
3,262✔
4023
    m_acceptor.async_accept(m_next_http_conn->get_socket(), m_next_http_conn_endpoint, std::move(handler)); // Throws
3,262✔
4024
}
3,262✔
4025

4026

4027
void ServerImpl::handle_accept(std::error_code ec)
4028
{
2,070✔
4029
    if (ec) {
2,070✔
4030
        if (ec != util::error::connection_aborted) {
×
4031
            REALM_ASSERT(ec != util::error::operation_aborted);
×
4032

4033
            // We close the reserved files to get a few extra file descriptors.
4034
            for (size_t i = 0; i < sizeof(m_reserved_files) / sizeof(m_reserved_files[0]); ++i) {
×
4035
                m_reserved_files[i].reset();
×
4036
            }
×
4037

4038
            // FIXME: There are probably errors that need to be treated
4039
            // specially, and not cause the server to "crash".
4040

4041
            if (ec == make_basic_system_error_code(EMFILE)) {
×
4042
                logger.error("Failed to accept a connection due to the file descriptor limit, "
×
4043
                             "consider increasing the limit in your system config"); // Throws
×
4044
                throw OutOfFilesError(ec);
×
4045
            }
×
4046
            else {
×
4047
                throw std::system_error(ec);
×
4048
            }
×
4049
        }
×
4050
        logger.debug("Skipping aborted connection"); // Throws
×
4051
    }
×
4052
    else {
2,070✔
4053
        HTTPConnection& conn = *m_next_http_conn;
2,070✔
4054
        if (m_config.tcp_no_delay)
2,070✔
4055
            conn.get_socket().set_option(network::SocketBase::no_delay(true));  // Throws
1,736✔
4056
        m_http_connections.emplace(conn.get_id(), std::move(m_next_http_conn)); // Throws
2,070✔
4057
        Formatter& formatter = m_misc_buffers.formatter;
2,070✔
4058
        formatter.reset();
2,070✔
4059
        formatter << "[" << m_next_http_conn_endpoint.address() << "]:" << m_next_http_conn_endpoint.port(); // Throws
2,070✔
4060
        std::string remote_endpoint = {formatter.data(), formatter.size()};                                  // Throws
2,070✔
4061
        conn.initiate(std::move(remote_endpoint));                                                           // Throws
2,070✔
4062
    }
2,070✔
4063
    initiate_accept(); // Throws
2,070✔
4064
}
2,070✔
4065

4066

4067
void ServerImpl::remove_http_connection(std::int_fast64_t conn_id) noexcept
4068
{
2,070✔
4069
    m_http_connections.erase(conn_id);
2,070✔
4070
}
2,070✔
4071

4072

4073
void ServerImpl::add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn)
4074
{
2,024✔
4075
    m_sync_connections.emplace(connection_id, std::move(sync_conn));
2,024✔
4076
}
2,024✔
4077

4078

4079
void ServerImpl::remove_sync_connection(int_fast64_t connection_id)
4080
{
1,176✔
4081
    m_sync_connections.erase(connection_id);
1,176✔
4082
}
1,176✔
4083

4084

4085
// Posts a task to the event loop that overwrites
// `m_config.connection_reaper_timeout` with `timeout`.
void ServerImpl::set_connection_reaper_timeout(milliseconds_type timeout)
{
    auto apply_timeout = [this, timeout](Status) {
        m_config.connection_reaper_timeout = timeout;
    };
    get_service().post(std::move(apply_timeout));
}
4✔
4091

4092

4093
void ServerImpl::close_connections()
4094
{
20✔
4095
    get_service().post([this](Status) {
20✔
4096
        do_close_connections(); // Throws
20✔
4097
    });
20✔
4098
}
20✔
4099

4100

4101
bool ServerImpl::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4102
{
72✔
4103
    return _impl::map_virt_to_real_realm_path(m_root_dir, virt_path, real_path); // Throws
72✔
4104
}
72✔
4105

4106

4107
void ServerImpl::recognize_external_change(const std::string& virt_path)
4108
{
4,800✔
4109
    std::string virt_path_2 = virt_path; // Throws (copy)
4,800✔
4110
    get_service().post([this, virt_path = std::move(virt_path_2)](Status) {
4,800✔
4111
        do_recognize_external_change(virt_path); // Throws
4,800✔
4112
    });                                          // Throws
4,800✔
4113
}
4,800✔
4114

4115

4116
void ServerImpl::stop_sync_and_wait_for_backup_completion(
4117
    util::UniqueFunction<void(bool did_backup)> completion_handler, milliseconds_type timeout)
4118
{
×
4119
    logger.info("stop_sync_and_wait_for_backup_completion() called with "
×
4120
                "timeout = %1",
×
4121
                timeout); // Throws
×
4122

4123
    get_service().post([this, completion_handler = std::move(completion_handler), timeout](Status) mutable {
×
4124
        do_stop_sync_and_wait_for_backup_completion(std::move(completion_handler),
×
4125
                                                    timeout); // Throws
×
4126
    });
×
4127
}
×
4128

4129

4130
// (Re)creates the connection reaper timer and arms it for `timeout`
// milliseconds. When the wait completes without being aborted, dead
// connections are reaped and the timer is armed again with the same timeout.
void ServerImpl::initiate_connection_reaper_timer(milliseconds_type timeout)
{
    m_connection_reaper_timer.emplace(get_service());

    auto on_expired = [this, timeout](Status status) {
        if (status == ErrorCodes::OperationAborted)
            return;
        reap_connections();                        // Throws
        initiate_connection_reaper_timer(timeout); // Throws
    };
    m_connection_reaper_timer->async_wait(std::chrono::milliseconds(timeout), std::move(on_expired)); // Throws
}
1,346✔
4140

4141

4142
void ServerImpl::reap_connections()
4143
{
154✔
4144
    logger.debug("Discarding dead connections"); // Throws
154✔
4145
    SteadyTimePoint now = steady_clock_now();
154✔
4146
    {
154✔
4147
        auto end = m_http_connections.end();
154✔
4148
        auto i = m_http_connections.begin();
154✔
4149
        while (i != end) {
156✔
4150
            HTTPConnection& conn = *i->second;
2✔
4151
            ++i;
2✔
4152
            // Suicide
4153
            conn.terminate_if_dead(now); // Throws
2✔
4154
        }
2✔
4155
    }
154✔
4156
    {
154✔
4157
        auto end = m_sync_connections.end();
154✔
4158
        auto i = m_sync_connections.begin();
154✔
4159
        while (i != end) {
304✔
4160
            SyncConnection& conn = *i->second;
150✔
4161
            ++i;
150✔
4162
            // Suicide
4163
            conn.terminate_if_dead(now); // Throws
150✔
4164
        }
150✔
4165
    }
154✔
4166
}
154✔
4167

4168

4169
void ServerImpl::do_close_connections()
4170
{
20✔
4171
    for (auto& entry : m_sync_connections) {
20✔
4172
        SyncConnection& conn = *entry.second;
20✔
4173
        conn.initiate_soft_close(); // Throws
20✔
4174
    }
20✔
4175
}
20✔
4176

4177

4178
void ServerImpl::do_recognize_external_change(const std::string& virt_path)
4179
{
4,800✔
4180
    auto i = m_files.find(virt_path);
4,800✔
4181
    if (i == m_files.end())
4,800✔
4182
        return;
×
4183
    ServerFile& file = *i->second;
4,800✔
4184
    file.recognize_external_change();
4,800✔
4185
}
4,800✔
4186

4187

4188
void ServerImpl::do_stop_sync_and_wait_for_backup_completion(
4189
    util::UniqueFunction<void(bool did_complete)> completion_handler, milliseconds_type timeout)
4190
{
×
4191
    static_cast<void>(timeout);
×
4192
    if (m_sync_stopped)
×
4193
        return;
×
4194
    do_close_connections(); // Throws
×
4195
    m_sync_stopped = true;
×
4196
    bool completion_reached = false;
×
4197
    completion_handler(completion_reached); // Throws
×
4198
}
×
4199

4200

4201
// ============================ SyncConnection implementation ============================
4202

4203
// Empties the send-enlistment list first and the session map second, so the
// list is already empty by the time the sessions themselves are released.
SyncConnection::~SyncConnection() noexcept
{
    m_sessions_enlisted_to_send.clear();

    m_sessions.clear();
}
2,024✔
4208

4209

4210
void SyncConnection::initiate()
4211
{
2,024✔
4212
    m_last_activity_at = steady_clock_now();
2,024✔
4213
    logger.debug("Sync Connection initiated");
2,024✔
4214
    m_websocket.initiate_server_websocket_after_handshake();
2,024✔
4215
    send_log_message(util::Logger::Level::info, "Client connection established with server", 0,
2,024✔
4216
                     m_appservices_request_id);
2,024✔
4217
}
2,024✔
4218

4219

4220
template <class... Params>
4221
void SyncConnection::terminate(Logger::Level log_level, const char* log_message, Params... log_params)
4222
{
1,176✔
4223
    terminate_sessions();                              // Throws
1,176✔
4224
    logger.log(log_level, log_message, log_params...); // Throws
1,176✔
4225
    m_websocket.stop();
1,176✔
4226
    m_ssl_stream.reset();
1,176✔
4227
    m_socket.reset();
1,176✔
4228
    // Suicide
4229
    m_server.remove_sync_connection(m_id);
1,176✔
4230
}
1,176✔
4231

4232

4233
// Terminates this connection if the time since the last recorded activity has
// reached the applicable limit: `soft_close_timeout` while the connection is
// closing, `connection_reaper_timeout` otherwise. Nothing is accessed on this
// object after terminate() has been called.
void SyncConnection::terminate_if_dead(SteadyTimePoint now)
{
    const milliseconds_type idle_time = steady_duration(m_last_activity_at, now);
    const Server::Config& config = m_server.get_config();

    if (m_is_closing) {
        if (idle_time >= config.soft_close_timeout) {
            // Suicide
            terminate(Logger::Level::detail,
                      "Sync connection closed (timeout during soft close)"); // Throws
        }
        return;
    }

    if (idle_time >= config.connection_reaper_timeout) {
        // Suicide
        terminate(Logger::Level::detail,
                  "Sync connection closed (no heartbeat)"); // Throws
    }
}
150✔
4252

4253

4254
// Appends `sess` to the list of sessions waiting to send and fires the send
// trigger. Preconditions (asserted): the send trigger exists, the connection
// is not closing, and the session is not already enlisted.
void SyncConnection::enlist_to_send(Session* sess) noexcept
{
    REALM_ASSERT(m_send_trigger);
    REALM_ASSERT(!m_is_closing);
    REALM_ASSERT(!sess->is_enlisted_to_send());

    m_sessions_enlisted_to_send.push_back(sess);

    m_send_trigger->trigger();
}
112,442✔
4262

4263

4264
// Logs `status` as an error and raises the matching protocol error:
// SyncProtocolInvariantFailed -> bad_syntax, LimitExceeded -> limits_exceeded,
// anything else -> other_error.
void SyncConnection::handle_protocol_error(Status status)
{
    logger.error("%1", status);

    ProtocolError error = ProtocolError::other_error;
    switch (status.code()) {
        case ErrorCodes::SyncProtocolInvariantFailed:
            error = ProtocolError::bad_syntax;
            break;
        case ErrorCodes::LimitExceeded:
            error = ProtocolError::limits_exceeded;
            break;
        default:
            break;
    }
    protocol_error(error); // Throws
}
×
4279

4280
void SyncConnection::receive_bind_message(session_ident_type session_ident, std::string path,
4281
                                          std::string signed_user_token, bool need_client_file_ident,
4282
                                          bool is_subserver)
4283
{
5,638✔
4284
    auto p = m_sessions.emplace(session_ident, nullptr); // Throws
5,638✔
4285
    bool was_inserted = p.second;
5,638✔
4286
    if (REALM_UNLIKELY(!was_inserted)) {
5,638✔
4287
        logger.error("Overlapping reuse of session identifier %1 in BIND message",
×
4288
                     session_ident);                           // Throws
×
4289
        protocol_error(ProtocolError::reuse_of_session_ident); // Throws
×
4290
        return;
×
4291
    }
×
4292
    try {
5,638✔
4293
        p.first->second.reset(new Session(*this, session_ident)); // Throws
5,638✔
4294
    }
5,638✔
4295
    catch (...) {
5,638✔
4296
        m_sessions.erase(p.first);
×
4297
        throw;
×
4298
    }
×
4299

4300
    Session& sess = *p.first->second;
5,638✔
4301
    sess.initiate(); // Throws
5,638✔
4302
    ProtocolError error;
5,638✔
4303
    bool success =
5,638✔
4304
        sess.receive_bind_message(std::move(path), std::move(signed_user_token), need_client_file_ident, is_subserver,
5,638✔
4305
                                  error); // Throws
5,638✔
4306
    if (REALM_UNLIKELY(!success))         // Throws
5,638✔
4307
        protocol_error(error, &sess);     // Throws
28✔
4308
}
5,638✔
4309

4310

4311
void SyncConnection::receive_ident_message(session_ident_type session_ident, file_ident_type client_file_ident,
4312
                                           salt_type client_file_ident_salt, version_type scan_server_version,
4313
                                           version_type scan_client_version, version_type latest_server_version,
4314
                                           salt_type latest_server_version_salt)
4315
{
4,356✔
4316
    auto i = m_sessions.find(session_ident);
4,356✔
4317
    if (REALM_UNLIKELY(i == m_sessions.end())) {
4,356✔
4318
        bad_session_ident("IDENT", session_ident); // Throws
×
4319
        return;
×
4320
    }
×
4321
    Session& sess = *i->second;
4,356✔
4322
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
4,356✔
4323
        message_after_unbind("IDENT", session_ident); // Throws
×
4324
        return;
×
4325
    }
×
4326
    if (REALM_UNLIKELY(sess.error_occurred())) {
4,356✔
4327
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4328
        // messages, other than UNBIND, must be ignored.
4329
        return;
16✔
4330
    }
16✔
4331
    if (REALM_UNLIKELY(sess.must_send_ident_message())) {
4,340✔
4332
        logger.error("Received IDENT message before IDENT message was sent"); // Throws
×
4333
        protocol_error(ProtocolError::bad_message_order);                     // Throws
×
4334
        return;
×
4335
    }
×
4336
    if (REALM_UNLIKELY(sess.ident_message_received())) {
4,340✔
4337
        logger.error("Received second IDENT message for session"); // Throws
×
4338
        protocol_error(ProtocolError::bad_message_order);          // Throws
×
4339
        return;
×
4340
    }
×
4341

4342
    ProtocolError error = {};
4,340✔
4343
    bool success = sess.receive_ident_message(client_file_ident, client_file_ident_salt, scan_server_version,
4,340✔
4344
                                              scan_client_version, latest_server_version, latest_server_version_salt,
4,340✔
4345
                                              error); // Throws
4,340✔
4346
    if (REALM_UNLIKELY(!success))                     // Throws
4,340✔
4347
        protocol_error(error, &sess);                 // Throws
32✔
4348
}
4,340✔
4349

4350
void SyncConnection::receive_upload_message(session_ident_type session_ident, version_type progress_client_version,
4351
                                            version_type progress_server_version, version_type locked_server_version,
4352
                                            const UploadChangesets& upload_changesets)
4353
{
45,808✔
4354
    auto i = m_sessions.find(session_ident);
45,808✔
4355
    if (REALM_UNLIKELY(i == m_sessions.end())) {
45,808✔
4356
        bad_session_ident("UPLOAD", session_ident); // Throws
×
4357
        return;
×
4358
    }
×
4359
    Session& sess = *i->second;
45,808✔
4360
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
45,808✔
4361
        message_after_unbind("UPLOAD", session_ident); // Throws
×
4362
        return;
×
4363
    }
×
4364
    if (REALM_UNLIKELY(sess.error_occurred())) {
45,808✔
4365
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4366
        // messages, other than UNBIND, must be ignored.
4367
        return;
×
4368
    }
×
4369
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
45,808✔
4370
        message_before_ident("UPLOAD", session_ident); // Throws
×
4371
        return;
×
4372
    }
×
4373

4374
    ProtocolError error = {};
45,808✔
4375
    bool success = sess.receive_upload_message(progress_client_version, progress_server_version,
45,808✔
4376
                                               locked_server_version, upload_changesets, error); // Throws
45,808✔
4377
    if (REALM_UNLIKELY(!success))                                                                // Throws
45,808✔
4378
        protocol_error(error, &sess);                                                            // Throws
×
4379
}
45,808✔
4380

4381

4382
// Handles a MARK message for the session identified by `session_ident`.
//
// The message is rejected with a protocol error if the session is unknown, has
// already received UNBIND, or has not yet received IDENT. It is silently
// ignored if the session is in an error state. Otherwise it is passed to the
// session, and any error the session reports is raised as a protocol error for
// that session.
void SyncConnection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    auto i = m_sessions.find(session_ident);
    if (REALM_UNLIKELY(i == m_sessions.end())) {
        bad_session_ident("MARK", session_ident); // Throws
        return;
    }
    Session& sess = *i->second;
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
        message_after_unbind("MARK", session_ident); // Throws
        return;
    }
    if (REALM_UNLIKELY(sess.error_occurred())) {
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
        // messages, other than UNBIND, must be ignored.
        return;
    }
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
        message_before_ident("MARK", session_ident); // Throws
        return;
    }

    // Value-initialised, as in receive_ident_message() and
    // receive_upload_message(), so the out-parameter never carries an
    // indeterminate value.
    ProtocolError error = {};
    bool success = sess.receive_mark_message(request_ident, error); // Throws
    if (REALM_UNLIKELY(!success))                                   // Throws
        protocol_error(error, &sess);                               // Throws
}
12,154✔
4409

4410

4411
// Handles an UNBIND message for the session identified by `session_ident`.
// Unknown sessions and sessions that already received UNBIND produce a
// protocol error; otherwise the message is passed to the session.
void SyncConnection::receive_unbind_message(session_ident_type session_ident)
{
    const auto it = m_sessions.find(session_ident); // Throws
    if (REALM_UNLIKELY(it == m_sessions.end())) {
        bad_session_ident("UNBIND", session_ident); // Throws
        return;
    }

    Session& session = *it->second;
    if (REALM_UNLIKELY(session.unbind_message_received())) {
        message_after_unbind("UNBIND", session_ident); // Throws
        return;
    }

    session.receive_unbind_message(); // Throws
    // NOTE: The session might have gotten destroyed at this time!
}
2,796✔
4427

4428

4429
// Handles a PING: remembers its timestamp, flags that a PONG must be sent, and
// starts sending immediately unless a send is already in progress.
void SyncConnection::receive_ping(milliseconds_type timestamp, milliseconds_type rtt)
{
    logger.debug("Received: PING(timestamp=%1, rtt=%2)", timestamp, rtt); // Throws

    m_last_ping_timestamp = timestamp;
    m_send_pong = true;

    if (m_is_sending)
        return;
    send_next_message();
}
104✔
4437

4438

4439
void SyncConnection::receive_error_message(session_ident_type session_ident, int error_code,
4440
                                           std::string_view error_body)
4441
{
×
4442
    logger.debug("Received: ERROR(error_code=%1, message_size=%2, session_ident=%3)", error_code, error_body.size(),
×
4443
                 session_ident); // Throws
×
4444
    auto i = m_sessions.find(session_ident);
×
4445
    if (REALM_UNLIKELY(i == m_sessions.end())) {
×
4446
        bad_session_ident("ERROR", session_ident);
×
4447
        return;
×
4448
    }
×
4449
    Session& sess = *i->second;
×
4450
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
×
4451
        message_after_unbind("ERROR", session_ident); // Throws
×
4452
        return;
×
4453
    }
×
4454

4455
    sess.receive_error_message(session_ident, error_code, error_body); // Throws
×
4456
}
×
4457

4458
void SyncConnection::send_log_message(util::Logger::Level level, const std::string&& message,
4459
                                      session_ident_type sess_ident, std::optional<std::string> co_id)
4460
{
6,332✔
4461
    if (get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
6,332✔
4462
        return logger.log(level, message.c_str());
×
4463
    }
×
4464

4465
    LogMessage log_msg{sess_ident, level, std::move(message), std::move(co_id)};
6,332✔
4466
    {
6,332✔
4467
        std::lock_guard lock(m_log_mutex);
6,332✔
4468
        m_log_messages.push(std::move(log_msg));
6,332✔
4469
    }
6,332✔
4470
    m_send_trigger->trigger();
6,332✔
4471
}
6,332✔
4472

4473

4474
void SyncConnection::bad_session_ident(const char* message_type, session_ident_type session_ident)
4475
{
×
4476
    logger.error("Bad session identifier in %1 message, session_ident = %2", message_type,
×
4477
                 session_ident);                      // Throws
×
4478
    protocol_error(ProtocolError::bad_session_ident); // Throws
×
4479
}
×
4480

4481

4482
void SyncConnection::message_after_unbind(const char* message_type, session_ident_type session_ident)
4483
{
×
4484
    logger.error("Received %1 message after UNBIND message, session_ident = %2", message_type,
×
4485
                 session_ident);                      // Throws
×
4486
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4487
}
×
4488

4489

4490
void SyncConnection::message_before_ident(const char* message_type, session_ident_type session_ident)
4491
{
×
4492
    logger.error("Received %1 message before IDENT message, session_ident = %2", message_type,
×
4493
                 session_ident);                      // Throws
×
4494
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4495
}
×
4496

4497

4498
void SyncConnection::handle_message_received(const char* data, size_t size)
4499
{
70,904✔
4500
    // parse_message_received() parses the message and calls the
4501
    // proper handler on the SyncConnection object (this).
4502
    get_server_protocol().parse_message_received<SyncConnection>(*this, std::string_view(data, size));
70,904✔
4503
    return;
70,904✔
4504
}
70,904✔
4505

4506

4507
void SyncConnection::handle_ping_received(const char* data, size_t size)
4508
{
×
4509
    // parse_message_received() parses the message and calls the
4510
    // proper handler on the SyncConnection object (this).
4511
    get_server_protocol().parse_ping_received<SyncConnection>(*this, std::string_view(data, size));
×
4512
    return;
×
4513
}
×
4514

4515

4516
void SyncConnection::send_next_message()
4517
{
108,950✔
4518
    REALM_ASSERT(!m_is_sending);
108,950✔
4519
    REALM_ASSERT(!m_sending_pong);
108,950✔
4520
    if (m_send_pong) {
108,950✔
4521
        send_pong(m_last_ping_timestamp);
104✔
4522
        if (m_sending_pong)
104✔
4523
            return;
104✔
4524
    }
104✔
4525
    for (;;) {
162,258✔
4526
        Session* sess = m_sessions_enlisted_to_send.pop_front();
162,258✔
4527
        if (!sess) {
162,258✔
4528
            // No sessions were enlisted to send
4529
            if (REALM_LIKELY(!m_is_closing))
50,028✔
4530
                break; // Check to see if there are any log messages to go out
50,010✔
4531
            // Send a connection level ERROR
4532
            REALM_ASSERT(!is_session_level_error(m_error_code));
18✔
4533
            initiate_write_error(m_error_code, m_error_session_ident); // Throws
18✔
4534
            return;
18✔
4535
        }
50,028✔
4536
        sess->send_message(); // Throws
112,230✔
4537
        // NOTE: The session might have gotten destroyed at this time!
4538

4539
        // At this point, `m_is_sending` is true if, and only if the session
4540
        // chose to send a message. If it chose to not send a message, we must
4541
        // loop back and give the next session in `m_sessions_enlisted_to_send`
4542
        // a chance.
4543
        if (m_is_sending)
112,230✔
4544
            return;
58,830✔
4545
    }
112,230✔
4546
    {
49,998✔
4547
        std::lock_guard lock(m_log_mutex);
49,998✔
4548
        if (!m_log_messages.empty()) {
49,998✔
4549
            send_log_message(m_log_messages.front());
6,160✔
4550
            m_log_messages.pop();
6,160✔
4551
        }
6,160✔
4552
    }
49,998✔
4553
    // Otherwise, nothing to do
4554
}
49,998✔
4555

4556

4557
void SyncConnection::initiate_write_output_buffer()
4558
{
64,988✔
4559
    auto handler = [this](std::error_code ec, size_t) {
64,988✔
4560
        if (!ec) {
64,972✔
4561
            handle_write_output_buffer();
64,828✔
4562
        }
64,828✔
4563
    };
64,972✔
4564

4565
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
64,988✔
4566
                                   std::move(handler)); // Throws
64,988✔
4567
    m_is_sending = true;
64,988✔
4568
}
64,988✔
4569

4570

4571
void SyncConnection::initiate_pong_output_buffer()
4572
{
104✔
4573
    auto handler = [this](std::error_code ec, size_t) {
104✔
4574
        if (!ec) {
104✔
4575
            handle_pong_output_buffer();
104✔
4576
        }
104✔
4577
    };
104✔
4578

4579
    REALM_ASSERT(!m_is_sending);
104✔
4580
    REALM_ASSERT(!m_sending_pong);
104✔
4581
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
104✔
4582
                                   std::move(handler)); // Throws
104✔
4583

4584
    m_is_sending = true;
104✔
4585
    m_sending_pong = true;
104✔
4586
}
104✔
4587

4588

4589
// Clears the "PONG pending" flag, serialises a PONG carrying `timestamp` into
// the output buffer, and starts writing it.
void SyncConnection::send_pong(milliseconds_type timestamp)
{
    REALM_ASSERT(m_send_pong);
    REALM_ASSERT(!m_sending_pong);
    m_send_pong = false;

    logger.debug("Sending: PONG(timestamp=%1)", timestamp); // Throws

    OutputBuffer& buffer = get_output_buffer();
    get_server_protocol().make_pong(buffer, timestamp); // Throws

    initiate_pong_output_buffer(); // Throws
}
104✔
4601

4602
void SyncConnection::send_log_message(const LogMessage& log_msg)
4603
{
6,158✔
4604
    OutputBuffer& out = get_output_buffer();
6,158✔
4605
    get_server_protocol().make_log_message(out, log_msg.level, log_msg.message, log_msg.sess_ident,
6,158✔
4606
                                           log_msg.co_id); // Throws
6,158✔
4607

4608
    initiate_write_output_buffer(); // Throws
6,158✔
4609
}
6,158✔
4610

4611

4612
void SyncConnection::handle_write_output_buffer()
4613
{
64,828✔
4614
    release_output_buffer();
64,828✔
4615
    m_is_sending = false;
64,828✔
4616
    send_next_message(); // Throws
64,828✔
4617
}
64,828✔
4618

4619

4620
void SyncConnection::handle_pong_output_buffer()
4621
{
104✔
4622
    release_output_buffer();
104✔
4623
    REALM_ASSERT(m_is_sending);
104✔
4624
    REALM_ASSERT(m_sending_pong);
104✔
4625
    m_is_sending = false;
104✔
4626
    m_sending_pong = false;
104✔
4627
    send_next_message(); // Throws
104✔
4628
}
104✔
4629

4630

4631
// Serialises an ERROR message for `error_code` / `session_ident` into the
// output buffer and starts writing it. handle_write_error() is invoked when
// the write completes, whether or not it succeeded.
void SyncConnection::initiate_write_error(ProtocolError error_code, session_ident_type session_ident)
{
    const int error_code_int = int(error_code);
    const char* message = get_protocol_error_message(error_code_int);
    const std::size_t message_size = std::strlen(message);
    const bool try_again = determine_try_again(error_code);

    logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3, session_ident=%4)", error_code_int,
                  message_size, try_again, session_ident); // Throws

    OutputBuffer& buffer = get_output_buffer();
    const int protocol_version = get_client_protocol_version();
    get_server_protocol().make_error_message(protocol_version, buffer, error_code, message, message_size, try_again,
                                             session_ident); // Throws

    auto on_written = [this](std::error_code ec, size_t) {
        handle_write_error(ec); // Throws
    };
    m_websocket.async_write_binary(buffer.data(), buffer.size(), std::move(on_written));
    m_is_sending = true;
}
20✔
4651

4652

4653
void SyncConnection::handle_write_error(std::error_code ec)
4654
{
20✔
4655
    m_is_sending = false;
20✔
4656
    REALM_ASSERT(m_is_closing);
20✔
4657
    if (!m_ssl_stream) {
20✔
4658
        m_socket->shutdown(network::Socket::shutdown_send, ec);
20✔
4659
        if (ec && ec != make_basic_system_error_code(ENOTCONN))
20!
4660
            throw std::system_error(ec);
×
4661
    }
20✔
4662
}
20✔
4663

4664

4665
// For connection level errors, `sess` is ignored. For session level errors, a
// session must be specified.
//
// If a session is specified, that session object will have been detached from
// the ServerFile object (and possibly destroyed) upon return from
// protocol_error().
//
// If a session is specified for a protocol level error, that session object
// will have been destroyed upon return from protocol_error(). For session level
// errors, the specified session will have been destroyed upon return from
// protocol_error() if, and only if, the negotiated protocol version is less
// than 23.
4677
// Reacts to a protocol error detected on this connection.
//
// `error_code` selects the handling:
//  - session level error: `sess` must be non-null; the session is asked to
//    deactivate with `error_code` and the connection stays open.
//  - connection level error: a soft close of the whole connection is
//    initiated with a session identifier of zero.
//
// Must not be called while the connection is already closing. If `sess` is
// given, it must be registered in `m_sessions`.
void SyncConnection::protocol_error(ProtocolError error_code, Session* sess)
{
    REALM_ASSERT(!m_is_closing);
    const bool session_level = is_session_level_error(error_code);
    REALM_ASSERT(!session_level || sess);
    REALM_ASSERT(!sess || m_sessions.count(sess->get_session_ident()) == 1);
    if (logger.would_log(util::Logger::Level::debug)) {
        const char* message = get_protocol_error_message(int(error_code));
        // Session level errors are reported through the session's own logger.
        Logger& logger_2 = (session_level ? sess->logger : logger);
        logger_2.debug("Protocol error: %1 (error_code=%2)", message, int(error_code)); // Throws
    }
    if (session_level) {
        sess->initiate_deactivation(error_code); // Throws
        return;
    }
    // Only connection level errors reach this point, so the soft close is
    // never tied to a particular session.
    const session_ident_type session_ident = 0; // Not session specific
    do_initiate_soft_close(error_code, session_ident); // Throws
}
×
4695

4696

4697
// Puts the connection into the "closing" state: records the error to be
// reported, drops all pending outgoing work, terminates every session, and
// finally fires the send trigger.
//
// Preconditions (asserted): `error_code` has a known message, is not a session
// level error, `session_ident` is zero, a send trigger exists, and the
// connection is not already closing.
void SyncConnection::do_initiate_soft_close(ProtocolError error_code, session_ident_type session_ident)
{
    REALM_ASSERT(get_protocol_error_message(int(error_code)));

    // With recent versions of the protocol (when the version is greater than,
    // or equal to 23), this function will only be called for connection level
    // errors, never for session specific errors. However, for the purpose of
    // emulating earlier protocol versions, this function might be called for
    // session specific errors too.
    const bool session_level = is_session_level_error(error_code);
    REALM_ASSERT(session_level == (session_ident != 0));
    REALM_ASSERT(!session_level);

    REALM_ASSERT(m_send_trigger);
    REALM_ASSERT(!m_is_closing);

    // Record the closing state together with the error that caused it.
    m_is_closing = true;
    m_error_code = error_code;
    m_error_session_ident = session_ident;

    // Don't waste time and effort sending any other messages
    m_send_pong = false;
    m_sessions_enlisted_to_send.clear();

    m_receiving_session = nullptr;

    terminate_sessions(); // Throws

    m_send_trigger->trigger();
}
20✔
4726

4727

4728
void SyncConnection::close_due_to_close_by_client(std::error_code ec)
4729
{
666✔
4730
    auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
666✔
4731
    // Suicide
4732
    terminate(log_level, "Sync connection closed by client: %1", ec.message()); // Throws
666✔
4733
}
666✔
4734

4735

4736
void SyncConnection::close_due_to_error(std::error_code ec)
4737
{
506✔
4738
    // Suicide
4739
    terminate(Logger::Level::error, "Sync connection closed due to error: %1",
506✔
4740
              ec.message()); // Throws
506✔
4741
}
506✔
4742

4743

4744
void SyncConnection::terminate_sessions()
4745
{
1,196✔
4746
    for (auto& entry : m_sessions) {
1,872✔
4747
        Session& sess = *entry.second;
1,872✔
4748
        sess.terminate(); // Throws
1,872✔
4749
    }
1,872✔
4750
    m_sessions_enlisted_to_send.clear();
1,196✔
4751
    m_sessions.clear();
1,196✔
4752
}
1,196✔
4753

4754

4755
void SyncConnection::initiate_soft_close()
4756
{
20✔
4757
    if (!m_is_closing) {
20✔
4758
        session_ident_type session_ident = 0;                                    // Not session specific
20✔
4759
        do_initiate_soft_close(ProtocolError::connection_closed, session_ident); // Throws
20✔
4760
    }
20✔
4761
}
20✔
4762

4763

4764
// Removes the entry keyed by `session_ident` from the session map.
void SyncConnection::discard_session(session_ident_type session_ident) noexcept
{
    m_sessions.erase(session_ident);
}
2,786✔
4768

4769
} // anonymous namespace
4770

4771

4772
// ============================ sync::Server implementation ============================
4773

4774
// Concrete implementation type behind sync::Server. It adds nothing to
// ServerImpl beyond a constructor that forwards its arguments.
class Server::Implementation : public ServerImpl {
public:
    Implementation(const std::string& root_dir, util::Optional<PKey> pkey, Server::Config config)
        : ServerImpl{root_dir, std::move(pkey), std::move(config)} // Throws
    {
    }

    virtual ~Implementation() = default;
};
4782

4783

4784
Server::Server(const std::string& root_dir, util::Optional<sync::PKey> pkey, Config config)
4785
    : m_impl{new Implementation{root_dir, std::move(pkey), std::move(config)}} // Throws
544✔
4786
{
1,192✔
4787
}
1,192✔
4788

4789

4790
Server::Server(Server&& serv) noexcept
4791
    : m_impl{std::move(serv.m_impl)}
4792
{
×
4793
}
×
4794

4795

4796
// Defined in this translation unit, after the definition of Implementation.
Server::~Server() noexcept = default;
1,192✔
4797

4798

4799
void Server::start()
4800
{
508✔
4801
    m_impl->start(); // Throws
508✔
4802
}
508✔
4803

4804

4805
void Server::start(const std::string& listen_address, const std::string& listen_port, bool reuse_address)
4806
{
684✔
4807
    m_impl->start(listen_address, listen_port, reuse_address); // Throws
684✔
4808
}
684✔
4809

4810

4811
// Returns the endpoint reported by the implementation's listen_endpoint().
network::Endpoint Server::listen_endpoint() const
{
    return m_impl->listen_endpoint(); // Throws
}
1,226✔
4815

4816

4817
void Server::run()
4818
{
1,136✔
4819
    m_impl->run(); // Throws
1,136✔
4820
}
1,136✔
4821

4822

4823
void Server::stop() noexcept
4824
{
2,046✔
4825
    m_impl->stop();
2,046✔
4826
}
2,046✔
4827

4828

4829
uint_fast64_t Server::errors_seen() const noexcept
4830
{
684✔
4831
    return m_impl->errors_seen;
684✔
4832
}
684✔
4833

4834

4835
void Server::stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
4836
                                                      milliseconds_type timeout)
4837
{
×
4838
    m_impl->stop_sync_and_wait_for_backup_completion(std::move(completion_handler), timeout); // Throws
×
4839
}
×
4840

4841

4842
// Forwards `timeout` to the implementation's set_connection_reaper_timeout().
void Server::set_connection_reaper_timeout(milliseconds_type timeout)
{
    m_impl->set_connection_reaper_timeout(timeout);
}
4✔
4846

4847

4848
void Server::close_connections()
4849
{
20✔
4850
    m_impl->close_connections();
20✔
4851
}
20✔
4852

4853

4854
bool Server::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4855
{
72✔
4856
    return m_impl->map_virtual_to_real_path(virt_path, real_path); // Throws
72✔
4857
}
72✔
4858

4859

4860
void Server::recognize_external_change(const std::string& virt_path)
4861
{
4,800✔
4862
    m_impl->recognize_external_change(virt_path); // Throws
4,800✔
4863
}
4,800✔
4864

4865

4866
// Forwards both timer references to the implementation's
// get_workunit_timers().
void Server::get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
{
    m_impl->get_workunit_timers(parallel_section, sequential_section);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc