• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1771

20 Oct 2023 08:58AM UTC coverage: 91.567% (-0.009%) from 91.576%
1771

push

Evergreen

web-flow
Fix blocked DB::open on multiprocess access on exFAT filesystem (#6959)

Fix double file lock and DB::open being blocked with multiple concurrent realm access on fat32/exfat file systems.

When a file is truncated on fat32/exfat, its uid becomes available for other files. With multiple processes opening and truncating the same set of files, this can lead to a situation where, within one process, get_unique_id returns the same value for different files if the timing is right. This breaks proper initialization of static data for interprocess mutexes, so that subsequent locks will hang by trying to lock essentially the same file twice.

94304 of 173552 branches covered (0.0%)

59 of 82 new or added lines in 5 files covered. (71.95%)

53 existing lines in 13 files now uncovered.

230544 of 251776 relevant lines covered (91.57%)

6594884.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.42
/src/realm/sync/noinst/server/server.cpp
1
#include <realm/sync/noinst/server/server.hpp>
2

3
#include <realm/binary_data.hpp>
4
#include <realm/impl/simulated_failure.hpp>
5
#include <realm/object_id.hpp>
6
#include <realm/string_data.hpp>
7
#include <realm/sync/changeset.hpp>
8
#include <realm/sync/trigger.hpp>
9
#include <realm/sync/impl/clamped_hex_dump.hpp>
10
#include <realm/sync/impl/clock.hpp>
11
#include <realm/sync/network/http.hpp>
12
#include <realm/sync/network/network_ssl.hpp>
13
#include <realm/sync/network/websocket.hpp>
14
#include <realm/sync/noinst/client_history_impl.hpp>
15
#include <realm/sync/noinst/protocol_codec.hpp>
16
#include <realm/sync/noinst/server/access_control.hpp>
17
#include <realm/sync/noinst/server/server_dir.hpp>
18
#include <realm/sync/noinst/server/server_file_access_cache.hpp>
19
#include <realm/sync/noinst/server/server_impl_base.hpp>
20
#include <realm/sync/transform.hpp>
21
#include <realm/util/base64.hpp>
22
#include <realm/util/bind_ptr.hpp>
23
#include <realm/util/buffer_stream.hpp>
24
#include <realm/util/circular_buffer.hpp>
25
#include <realm/util/compression.hpp>
26
#include <realm/util/file.hpp>
27
#include <realm/util/json_parser.hpp>
28
#include <realm/util/load_file.hpp>
29
#include <realm/util/memory_stream.hpp>
30
#include <realm/util/optional.hpp>
31
#include <realm/util/platform_info.hpp>
32
#include <realm/util/random.hpp>
33
#include <realm/util/safe_int_ops.hpp>
34
#include <realm/util/scope_exit.hpp>
35
#include <realm/util/scratch_allocator.hpp>
36
#include <realm/util/thread.hpp>
37
#include <realm/util/thread_exec_guard.hpp>
38
#include <realm/util/value_reset_guard.hpp>
39
#include <realm/version.hpp>
40

41
#include <algorithm>
42
#include <atomic>
43
#include <cctype>
44
#include <chrono>
45
#include <cmath>
46
#include <condition_variable>
47
#include <cstdint>
48
#include <cstdio>
49
#include <cstring>
50
#include <functional>
51
#include <locale>
52
#include <map>
53
#include <memory>
54
#include <queue>
55
#include <sstream>
56
#include <stdexcept>
57
#include <thread>
58
#include <vector>
59

60
// NOTE: The protocol specification is in `/doc/protocol.md`
61

62

63
// FIXME: Verify that session identifier spoofing cannot be used to get access
64
// to sessions belonging to other network connections in any way.
65
// FIXME: Seems that server must close connection with zero sessions after a
66
// certain timeout.
67

68

69
using namespace realm;
70
using namespace realm::sync;
71
using namespace realm::util;
72

73
// clang-format off
74
using ServerHistory         = _impl::ServerHistory;
75
using ServerProtocol        = _impl::ServerProtocol;
76
using ServerFileAccessCache = _impl::ServerFileAccessCache;
77
using ServerImplBase = _impl::ServerImplBase;
78

79
using IntegratableChangeset = ServerHistory::IntegratableChangeset;
80
using IntegratableChangesetList = ServerHistory::IntegratableChangesetList;
81
using IntegratableChangesets = ServerHistory::IntegratableChangesets;
82
using IntegrationResult = ServerHistory::IntegrationResult;
83
using BootstrapError = ServerHistory::BootstrapError;
84
using ExtendedIntegrationError = ServerHistory::ExtendedIntegrationError;
85
using ClientType = ServerHistory::ClientType;
86
using FileIdentAllocSlot = ServerHistory::FileIdentAllocSlot;
87
using FileIdentAllocSlots = ServerHistory::FileIdentAllocSlots;
88

89
using UploadChangeset = ServerProtocol::UploadChangeset;
90
// clang-format on
91

92

93
using UploadChangesets = std::vector<UploadChangeset>;
94

95
using EventLoopMetricsHandler = network::Service::EventLoopMetricsHandler;
96

97

98
static_assert(std::numeric_limits<session_ident_type>::digits >= 63, "Bad session identifier type");
99
static_assert(std::numeric_limits<file_ident_type>::digits >= 63, "Bad file identifier type");
100
static_assert(std::numeric_limits<version_type>::digits >= 63, "Bad version type");
101
static_assert(std::numeric_limits<timestamp_type>::digits >= 63, "Bad timestamp type");
102

103

104
namespace {
105

106
// Three-state scheduling status. `done` is pinned to zero; `pending` and
// `in_progress` take the two following values.
enum class SchedStatus {
    done = 0,
    pending,
    in_progress,
};
107

108
// Websocket protocol prefix kept for older PBS sync clients (prior to
// protocol v8). Only used by the Sync Server.
constexpr std::string_view get_old_pbs_websocket_protocol_prefix() noexcept
{
    constexpr std::string_view prefix{"com.mongodb.realm-sync/"};
    return prefix;
}
×
113

114
// Abbreviates `str` for display. Strings of at most `cutoff` characters are
// returned unchanged; longer ones are reduced to "..." followed by their last
// `cutoff` characters.
std::string short_token_fmt(const std::string& str, size_t cutoff = 30)
{
    const size_t length = str.size();
    if (length <= cutoff)
        return str;

    // Keep only the tail of the string.
    return "..." + str.substr(length - cutoff);
}
420✔
123

124

125
// Iterates over the elements of a comma-separated list held in a string view.
//
// Each call to next() yields the next element with leading and trailing
// white space (tab, LF, CR, space) removed. Elements that are empty after
// trimming are skipped. The parser only stores a view, so the characters it
// refers to must remain valid for as long as the parser and the yielded
// elements are in use.
class HttpListHeaderValueParser {
public:
    HttpListHeaderValueParser(std::string_view string) noexcept
        : m_string{string}
    {
    }

    // Stores the next non-empty element in `elem` and returns true, or returns
    // false (leaving `elem` untouched) once the input is exhausted.
    bool next(std::string_view& elem) noexcept
    {
        while (m_pos < m_string.size()) {
            const size_type begin = m_pos;
            const size_type comma = m_string.find(',', begin);
            const bool found_comma = (comma != std::string_view::npos);
            const size_type end = found_comma ? comma : m_string.size();

            // Continue after the comma next time, or at the end of input if
            // this was the last segment.
            m_pos = found_comma ? comma + 1 : end;

            // Exclude leading and trailing white space
            std::string_view candidate = m_string.substr(begin, end - begin);
            while (!candidate.empty() && is_http_lws(candidate.front()))
                candidate.remove_prefix(1);
            while (!candidate.empty() && is_http_lws(candidate.back()))
                candidate.remove_suffix(1);

            if (!candidate.empty()) {
                elem = candidate;
                return true;
            }
        }
        return false;
    }

private:
    using size_type = std::string_view::size_type;
    const std::string_view m_string;
    size_type m_pos = 0;

    // True for the characters treated as linear white space by this parser.
    static bool is_http_lws(char ch) noexcept
    {
        switch (ch) {
            case '\t':
            case '\n':
            case '\r':
            case ' ':
                return true;
            default:
                return false;
        }
    }
};
167

168

169
// Clock used for measuring elapsed time: the high resolution clock when it is
// steady, otherwise std::chrono::steady_clock.
using SteadyClock = std::conditional_t<std::chrono::high_resolution_clock::is_steady,
                                       std::chrono::high_resolution_clock, std::chrono::steady_clock>;
using SteadyTimePoint = SteadyClock::time_point;

// Returns the current time point of `SteadyClock`.
SteadyTimePoint steady_clock_now() noexcept
{
    const SteadyTimePoint now = SteadyClock::now();
    return now;
}
151,244✔
177

178
// Returns the time elapsed from `start_time` to `end_time`, truncated to whole
// milliseconds. When `end_time` is omitted, the current steady clock time is
// used.
milliseconds_type steady_duration(SteadyTimePoint start_time, SteadyTimePoint end_time = steady_clock_now()) noexcept
{
    using std::chrono::milliseconds;
    const milliseconds elapsed = std::chrono::duration_cast<milliseconds>(end_time - start_time);
    return milliseconds_type(elapsed.count());
}
38,346✔
184

185

186
// Returns true only when `error_code` is ProtocolError::connection_closed;
// every other error code yields false.
bool determine_try_again(ProtocolError error_code) noexcept
{
    const bool is_connection_closed = (ProtocolError::connection_closed == error_code);
    return is_connection_closed;
}
96✔
190

191

192
class ServerFile;
193
class ServerImpl;
194
class HTTPConnection;
195
class SyncConnection;
196
class Session;
197

198

199
using Formatter = util::ResettableExpandableBufferOutputStream;
200
using OutputBuffer = util::ResettableExpandableBufferOutputStream;
201

202
using ProtocolVersionRange = std::pair<int, int>;
203

204
class MiscBuffers {
205
public:
206
    Formatter formatter;
207
    OutputBuffer download_message;
208

209
    using ProtocolVersionRanges = std::vector<ProtocolVersionRange>;
210
    ProtocolVersionRanges protocol_version_ranges;
211

212
    std::vector<char> compress;
213

214
    MiscBuffers()
215
    {
7,968✔
216
        formatter.imbue(std::locale::classic());
7,968✔
217
        download_message.imbue(std::locale::classic());
7,968✔
218
    }
7,968✔
219
};
220

221

222
struct DownloadCache {
223
    std::unique_ptr<char[]> body;
224
    std::size_t uncompressed_body_size;
225
    std::size_t compressed_body_size;
226
    bool body_is_compressed;
227
    version_type end_version;
228
    DownloadCursor download_progress;
229
    std::uint_fast64_t downloadable_bytes;
230
    std::size_t num_changesets;
231
    std::size_t accum_original_size;
232
    std::size_t accum_compacted_size;
233
};
234

235

236
// An unblocked work unit is comprised of one Work object for each of the files
237
// that contribute work to the work unit, generally one reference file and a
238
// number of partial files.
239
class Work {
240
public:
241
    // In general, primary work is all forms of modifying work, including file
242
    // deletion.
243
    bool has_primary_work = false;
244

245
    // Only for reference files
246
    bool might_produce_new_sync_version = false;
247

248
    bool produced_new_realm_version = false;
249
    bool produced_new_sync_version = false;
250
    bool expired_reference_version = false;
251

252
    // True if, and only if changesets_from_downstream contains at least one
253
    // changeset.
254
    bool have_changesets_from_downstream = false;
255

256
    FileIdentAllocSlots file_ident_alloc_slots;
257
    std::vector<std::unique_ptr<char[]>> changeset_buffers;
258
    IntegratableChangesets changesets_from_downstream;
259

260
    VersionInfo version_info;
261

262
    // Result of integration of changesets from downstream clients
263
    IntegrationResult integration_result;
264

265
    void reset() noexcept
266
    {
38,378✔
267
        has_primary_work = false;
38,378✔
268

19,216✔
269
        might_produce_new_sync_version = false;
38,378✔
270

19,216✔
271
        produced_new_realm_version = false;
38,378✔
272
        produced_new_sync_version = false;
38,378✔
273
        expired_reference_version = false;
38,378✔
274
        have_changesets_from_downstream = false;
38,378✔
275

19,216✔
276
        file_ident_alloc_slots.clear();
38,378✔
277
        changeset_buffers.clear();
38,378✔
278
        changesets_from_downstream.clear();
38,378✔
279

19,216✔
280
        version_info = {};
38,378✔
281
        integration_result = {};
38,378✔
282
    }
38,378✔
283
};
284

285

286
class WorkerState {
287
public:
288
    FileIdentAllocSlots file_ident_alloc_slots;
289
    util::ScratchMemory scratch_memory;
290
    bool use_file_cache = true;
291
    std::unique_ptr<ServerHistory> reference_hist;
292
    DBRef reference_sg;
293
};
294

295

296
// ============================ SessionQueue ============================
297

298
class SessionQueue {
299
public:
300
    void push_back(Session*) noexcept;
301
    Session* pop_front() noexcept;
302
    void clear() noexcept;
303

304
private:
305
    Session* m_back = nullptr;
306
};
307

308

309
// ============================ FileIdentReceiver ============================
310

311
class FileIdentReceiver {
312
public:
313
    virtual void receive_file_ident(SaltedFileIdent) = 0;
314

315
protected:
316
    ~FileIdentReceiver() {}
5,644✔
317
};
318

319

320
// ============================ WorkerBox =============================
321

322
class WorkerBox {
323
public:
324
    using JobType = util::UniqueFunction<void(WorkerState&)>;
325
    void add_work(WorkerState& state, JobType job)
326
    {
×
327
        std::unique_lock<std::mutex> lock(m_mutex);
×
328
        if (m_jobs.size() >= m_queue_limit) {
×
329
            // Once we have many queued jobs, it is better to use this thread to run a new job
×
330
            // than to queue it.
×
331
            run_a_job(lock, state, job);
×
332
        }
×
333
        else {
×
334
            // Create worker threads on demand (if all existing threads are active):
×
335
            if (m_threads.size() < m_max_num_threads && m_active >= m_threads.size()) {
×
336
                m_threads.emplace_back([this]() {
×
337
                    WorkerState state;
×
338
                    state.use_file_cache = false;
×
339
                    JobType the_job;
×
340
                    std::unique_lock<std::mutex> lock(m_mutex);
×
341
                    for (;;) {
×
342
                        while (m_jobs.empty() && !m_finish_up)
×
343
                            m_changes.wait(lock);
×
344
                        if (m_finish_up)
×
345
                            break; // terminate thread
×
346
                        the_job = std::move(m_jobs.back());
×
347
                        m_jobs.pop_back();
×
348
                        run_a_job(lock, state, the_job);
×
349
                        m_changes.notify_all();
×
350
                    }
×
351
                });
×
352
            }
×
353

×
354
            // Submit the job for execution:
×
355
            m_jobs.emplace_back(std::move(job));
×
356
            m_changes.notify_all();
×
357
        }
×
358
    }
×
359

360
    // You should call wait_completion() before trying to destroy a WorkerBox to get proper
361
    // propagation of exceptions.
362
    void wait_completion(WorkerState& state)
363
    {
×
364
        std::unique_lock<std::mutex> lock(m_mutex);
×
365
        while (!m_jobs.empty() || m_active > 0) {
×
366
            if (!m_jobs.empty()) { // if possible, make this thread participate in running m_jobs
×
367
                JobType the_job = std::move(m_jobs.back());
×
368
                m_jobs.pop_back();
×
369
                run_a_job(lock, state, the_job);
×
370
            }
×
371
            else {
×
372
                m_changes.wait(lock);
×
373
            }
×
374
        }
×
375
        if (m_epr) {
×
376
            std::rethrow_exception(m_epr);
×
377
        }
×
378
    }
×
379

380
    WorkerBox(unsigned int num_threads)
381
    {
×
382
        m_queue_limit = num_threads * 10; // fudge factor for job size variation
×
383
        m_max_num_threads = num_threads;
×
384
    }
×
385

386
    ~WorkerBox()
387
    {
×
388
        {
×
389
            std::unique_lock<std::mutex> lock(m_mutex);
×
390
            m_finish_up = true;
×
391
            m_changes.notify_all();
×
392
        }
×
393
        for (auto& e : m_threads)
×
394
            e.join();
×
395
    }
×
396

397
private:
398
    std::mutex m_mutex;
399
    std::condition_variable m_changes;
400
    std::vector<std::thread> m_threads;
401
    std::vector<JobType> m_jobs;
402
    unsigned int m_active = 0;
403
    bool m_finish_up = false;
404
    unsigned int m_queue_limit = 0;
405
    unsigned int m_max_num_threads = 0;
406
    std::exception_ptr m_epr;
407

408
    void run_a_job(std::unique_lock<std::mutex>& lock, WorkerState& state, JobType& job)
409
    {
×
410
        ++m_active;
×
411
        lock.unlock();
×
412
        try {
×
413
            job(state);
×
414
            lock.lock();
×
415
        }
×
416
        catch (...) {
×
417
            lock.lock();
×
418
            if (!m_epr)
×
419
                m_epr = std::current_exception();
×
420
        }
×
421
        --m_active;
×
422
    }
×
423
};
424

425

426
// ============================ ServerFile ============================
427

428
class ServerFile : public util::RefCountBase {
429
public:
430
    util::PrefixLogger logger;
431

432
    // Logger to be used by the worker thread
433
    util::PrefixLogger wlogger;
434

435
    ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path, std::string real_path,
436
               bool disable_sync_to_disk);
437
    ~ServerFile() noexcept;
438

439
    void initialize();
440
    void activate();
441

442
    ServerImpl& get_server() noexcept
443
    {
40,274✔
444
        return m_server;
40,274✔
445
    }
40,274✔
446

447
    const std::string& get_real_path() const noexcept
448
    {
×
449
        return m_file.realm_path;
×
450
    }
×
451

452
    const std::string& get_virt_path() const noexcept
453
    {
×
454
        return m_file.virt_path;
×
455
    }
×
456

457
    ServerFileAccessCache::File& access()
458
    {
50,174✔
459
        return m_file.access(); // Throws
50,174✔
460
    }
50,174✔
461

462
    ServerFileAccessCache::File& worker_access()
463
    {
38,274✔
464
        return m_worker_file.access(); // Throws
38,274✔
465
    }
38,274✔
466

467
    version_type get_realm_version() const noexcept
468
    {
×
469
        return m_version_info.realm_version;
×
470
    }
×
471

472
    version_type get_sync_version() const noexcept
473
    {
×
474
        return m_version_info.sync_version.version;
×
475
    }
×
476

477
    SaltedVersion get_salted_sync_version() const noexcept
478
    {
102,898✔
479
        return m_version_info.sync_version;
102,898✔
480
    }
102,898✔
481

482
    DownloadCache& get_download_cache() noexcept;
483

484
    void register_client_access(file_ident_type client_file_ident);
485

486
    using file_ident_request_type = std::int_fast64_t;
487

488
    // Initiate a request for a new client file identifier.
489
    //
490
    // Unless the request is cancelled, the identifier will be delivered to the
491
    // receiver by way of an invocation of
492
    // FileIdentReceiver::receive_file_ident().
493
    //
494
    // FileIdentReceiver::receive_file_ident() is guaranteed to not be called
495
    // until after request_file_ident() has returned (no callback reentrance).
496
    //
497
    // New client file identifiers will be delivered to receivers in the order
498
    // that they were requested.
499
    //
500
    // The returned value is a nonzero integer that can be used to cancel the
501
    // request before the file identifier is delivered using
502
    // cancel_file_ident_request().
503
    auto request_file_ident(FileIdentReceiver&, file_ident_type proxy_file, ClientType) -> file_ident_request_type;
504

505
    // Cancel the specified file identifier request.
506
    //
507
    // It is an error to call this function after the identifier has been
508
    // delivered.
509
    void cancel_file_ident_request(file_ident_request_type) noexcept;
510

511
    void add_unidentified_session(Session*);
512
    void identify_session(Session*, file_ident_type client_file_ident);
513

514
    void remove_unidentified_session(Session*) noexcept;
515
    void remove_identified_session(file_ident_type client_file_ident) noexcept;
516

517
    Session* get_identified_session(file_ident_type client_file_ident) noexcept;
518

519
    bool can_add_changesets_from_downstream() const noexcept;
520
    void add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
521
                                        version_type locked_server_version, const UploadChangeset*,
522
                                        std::size_t num_changesets);
523

524
    // bootstrap_client_session calls the function of same name in server_history
525
    // but corrects the upload_progress with information from pending
526
    // integratable changesets. A situation can occur where a client terminates
527
    // a session and starts a new session and re-uploads changesets that are known
528
    // by the ServerFile object but not by the ServerHistory.
529
    BootstrapError bootstrap_client_session(SaltedFileIdent client_file_ident, DownloadCursor download_progress,
530
                                            SaltedVersion server_version, ClientType client_type,
531
                                            UploadCursor& upload_progress, version_type& locked_server_version,
532
                                            Logger&);
533

534
    // NOTE: This function is executed by the worker thread
535
    void worker_process_work_unit(WorkerState&);
536

537
    void recognize_external_change();
538

539
private:
540
    ServerImpl& m_server;
541
    ServerFileAccessCache::Slot m_file;
542

543
    // In general, `m_version_info` refers to the last snapshot of the Realm
544
    // file that is supposed to be visible to remote peers engaging in regular
545
    // Realm file synchronization.
546
    VersionInfo m_version_info;
547

548
    file_ident_request_type m_last_file_ident_request = 0;
549

550
    // The set of sessions whose client file identifier is not yet known, i.e.,
551
    // those for which an IDENT message has not yet been received,
552
    std::set<Session*> m_unidentified_sessions;
553

554
    // A map of the sessions whose client file identifier is known, i.e, those
555
    // for which an IDENT message has been received.
556
    std::map<file_ident_type, Session*> m_identified_sessions;
557

558
    // Used when a file used as partial view wants to allocate a client file
559
    // identifier from the reference Realm.
560
    file_ident_request_type m_file_ident_request = 0;
561

562
    struct FileIdentRequestInfo {
563
        FileIdentReceiver* receiver;
564
        file_ident_type proxy_file;
565
        ClientType client_type;
566
    };
567

568
    // When nonempty, it counts towards outstanding blocked work (see
569
    // `m_has_blocked_work`).
570
    std::map<file_ident_request_type, FileIdentRequestInfo> m_file_ident_requests;
571

572
    // Changesets received from the downstream clients, and waiting to be
573
    // integrated, as well as information about the clients progress in terms of
574
    // integrating changesets received from the server. When nonempty, it counts
575
    // towards outstanding blocked work (see `m_has_blocked_work`).
576
    //
577
    // At any given time, the set of changesets from a particular client-side
578
    // file may be comprised of changesets received via distinct sessions.
579
    //
580
    // See also `m_num_changesets_from_downstream`.
581
    IntegratableChangesets m_changesets_from_downstream;
582

583
    // Keeps track of the number of changesets in `m_changesets_from_downstream`.
584
    //
585
    // Its purpose is also to initialize
586
    // `Work::have_changesets_from_downstream`.
587
    std::size_t m_num_changesets_from_downstream = 0;
588

589
    // The total size, in bytes, of the changesets that were received from
590
    // clients, are targeting this file, and are currently part of the blocked
591
    // work unit.
592
    //
593
    // Together with `m_unblocked_changesets_from_downstream_byte_size`, its
594
    // purpose is to allow the server to keep track of the accumulated size of
595
    // changesets being processed, or waiting to be processed (metric
596
    // `upload.pending.bytes`) (see
597
    // ServerImpl::inc_byte_size_for_pending_downstream_changesets()).
598
    //
599
    // Its purpose is also to enable the "very poor man's" backpressure solution
600
    // (see can_add_changesets_from_downstream()).
601
    std::size_t m_blocked_changesets_from_downstream_byte_size = 0;
602

603
    // Same as `m_blocked_changesets_from_downstream_byte_size` but for the
604
    // currently unblocked work unit.
605
    std::size_t m_unblocked_changesets_from_downstream_byte_size = 0;
606

607
    // When nonempty, it counts towards outstanding blocked work (see
608
    // `m_has_blocked_work`).
609
    std::vector<std::string> m_permission_changes;
610

611
    // True iff this file, or any of its associated partial files (when
612
    // applicable), has a nonzero amount of outstanding work that is currently
613
    // held back from being passed to the worker thread because a previously
614
    // accumulated chunk of work related to this file is currently in progress.
615
    bool m_has_blocked_work = false;
616

617
    // A file, that is not a partial file, is considered *exposed to the worker
618
    // thread* from the point in time where it is submitted to the worker
619
    // (Worker::enqueue()) and up until the point in time where
620
    // group_postprocess_stage_1() starts to execute. A partial file is
621
    // considered *exposed to the worker thread* precisely when the associated
622
    // reference file is exposed to the worker thread, but only if it was in
623
    // `m_reference_file->m_work.partial_files` at the point in time where the
624
    // reference file was passed to the worker.
625
    //
626
    // While this file is exposed to the worker thread, all members of `m_work`
627
    // other than `changesets_from_downstream` may be accessed and modified by
628
    // the worker thread only.
629
    //
630
    // While this file is exposed to the worker thread,
631
    // `m_work.changesets_from_downstream` may be accessed by all threads, but
632
    // must not be modified by any thread. This special status of
633
    // `m_work.changesets_from_downstream` is required to allow
634
    // ServerFile::bootstrap_client_session() to read from it at any time.
635
    Work m_work;
636

637
    // For reference files, set to true when work is unblocked, and reset back
638
    // to false when the work finalization process completes
639
    // (group_postprocess_stage_3()). Always zero for partial files.
640
    bool m_has_work_in_progress = 0;
641

642
    // This one must only be accessed by the worker thread.
643
    //
644
    // More specifically, `m_worker_file.access()` must only be called by the
645
    // worker thread, and if it was ever called, it must be closed by the worker
646
    // thread before the ServerFile object is destroyed, if destruction happens
647
    // before the destruction of the server object itself.
648
    ServerFileAccessCache::Slot m_worker_file;
649

650
    std::vector<std::int_fast64_t> m_deleting_connections;
651

652
    DownloadCache m_download_cache;
653

654
    void on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes);
655
    void on_work_added();
656
    void group_unblock_work();
657
    void unblock_work();
658

659
    /// Resume history scanning in all sessions bound to this file. To be called
660
    /// after a successfull integration of a changeset.
661
    void resume_download() noexcept;
662

663
    // NOTE: These functions are executed by the worker thread
664
    void worker_allocate_file_identifiers();
665
    bool worker_integrate_changes_from_downstream(WorkerState&);
666
    ServerHistory& get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
667
                                           DBRef& sg_ptr);
668
    ServerHistory& get_reference_file_history(WorkerState& state);
669
    void group_postprocess_stage_1();
670
    void group_postprocess_stage_2();
671
    void group_postprocess_stage_3();
672
    void group_finalize_work_stage_1();
673
    void group_finalize_work_stage_2();
674
    void finalize_work_stage_1();
675
    void finalize_work_stage_2();
676
};
677

678

679
// Returns a mutable reference to the download cache owned by this object.
inline DownloadCache& ServerFile::get_download_cache() noexcept
{
    DownloadCache& cache = m_download_cache;
    return cache;
}
40,212✔
683

684
inline void ServerFile::group_finalize_work_stage_1()
685
{
37,926✔
686
    finalize_work_stage_1(); // Throws
37,926✔
687
}
37,926✔
688

689
inline void ServerFile::group_finalize_work_stage_2()
690
{
37,926✔
691
    finalize_work_stage_2(); // Throws
37,926✔
692
}
37,926✔
693

694

695
// ============================ Worker ============================
696

697
// All write transaction on server-side Realm files performed on behalf of the
698
// server, must be performed by the worker thread, not the network event loop
699
// thread. This is to ensure that the network event loop thread never gets
700
// blocked waiting for the worker thread to end a long running write
701
// transaction.
702
//
703
// FIXME: Currently, the event loop thread does perform a number of write
704
// transactions, but only on subtier nodes of a star topology server cluster.
705
class Worker : public ServerHistory::Context {
706
public:
707
    util::PrefixLogger logger;
708

709
    explicit Worker(ServerImpl&);
710

711
    ServerFileAccessCache& get_file_access_cache() noexcept;
712

713
    void enqueue(ServerFile*);
714

715
    // Overriding members of ServerHistory::Context
716
    std::mt19937_64& server_history_get_random() noexcept override final;
717
    sync::Transformer& get_transformer() override final;
718
    util::Buffer<char>& get_transform_buffer() override final;
719

720
private:
721
    ServerImpl& m_server;
722
    std::mt19937_64 m_random;
723
    const std::unique_ptr<Transformer> m_transformer;
724
    util::Buffer<char> m_transform_buffer;
725
    ServerFileAccessCache m_file_access_cache;
726

727
    util::Mutex m_mutex;
728
    util::CondVar m_cond; // Protected by `m_mutex`
729

730
    bool m_stop = false; // Protected by `m_mutex`
731

732
    util::CircularBuffer<ServerFile*> m_queue; // Protected by `m_mutex`
733

734
    WorkerState m_state;
735

736
    void run();
737
    void stop() noexcept;
738

739
    friend class util::ThreadExecGuardWithParent<Worker, ServerImpl>;
740
};
741

742

743
// Returns a mutable reference to the file access cache owned by this worker.
inline ServerFileAccessCache& Worker::get_file_access_cache() noexcept
{
    ServerFileAccessCache& cache = m_file_access_cache;
    return cache;
}
1,088✔
747

748

749
// ============================ ServerImpl ============================
750

751
class ServerImpl : public ServerImplBase, public ServerHistory::Context {
752
public:
753
    std::uint_fast64_t errors_seen = 0;
754

755
    std::atomic<milliseconds_type> m_par_time;
756
    std::atomic<milliseconds_type> m_seq_time;
757

758
    util::Mutex last_client_accesses_mutex;
759

760
    const std::shared_ptr<util::Logger> logger_ptr;
761
    util::Logger& logger;
762

763
    // Returns a mutable reference to the server's network::Service
    // (`m_service`).
    network::Service& get_service() noexcept
764
    {
78,900✔
765
        return m_service;
78,900✔
766
    }
78,900✔
767

768
    // Const overload: returns a read-only reference to `m_service`.
    const network::Service& get_service() const noexcept
769
    {
×
770
        return m_service;
×
771
    }
×
772

773
    // Returns a mutable reference to the server's random number engine
    // (`m_random`).
    std::mt19937_64& get_random() noexcept
774
    {
63,462✔
775
        return m_random;
63,462✔
776
    }
63,462✔
777

778
    // Returns a read-only reference to the server configuration (`m_config`).
    const Server::Config& get_config() const noexcept
779
    {
122,962✔
780
        return m_config;
122,962✔
781
    }
122,962✔
782

783
    // Returns the effective upload backlog limit (`m_max_upload_backlog`).
    std::size_t get_max_upload_backlog() const noexcept
784
    {
43,870✔
785
        return m_max_upload_backlog;
43,870✔
786
    }
43,870✔
787

788
    // Returns a read-only reference to the server's root directory path
    // (`m_root_dir`).
    const std::string& get_root_dir() const noexcept
789
    {
5,644✔
790
        return m_root_dir;
5,644✔
791
    }
5,644✔
792

793
    // Returns the SSL context by dereferencing `m_ssl_context`. No null check
    // is performed here.
    // NOTE(review): presumably only called when an SSL context has been
    // created — confirm at the call sites.
    network::ssl::Context& get_ssl_context() noexcept
794
    {
48✔
795
        return *m_ssl_context;
48✔
796
    }
48✔
797

798
    // Returns a read-only reference to `m_access_control`.
    const AccessControl& get_access_control() const noexcept
799
    {
×
800
        return m_access_control;
×
801
    }
×
802

803
    // Returns a copy of the protocol version range stored in
    // `m_protocol_version_range`.
    ProtocolVersionRange get_protocol_version_range() const noexcept
804
    {
1,912✔
805
        return m_protocol_version_range;
1,912✔
806
    }
1,912✔
807

808
    // Returns a mutable reference to the server's ServerProtocol instance
    // (`m_server_protocol`).
    ServerProtocol& get_server_protocol() noexcept
809
    {
131,244✔
810
        return m_server_protocol;
131,244✔
811
    }
131,244✔
812

813
    // Returns a mutable reference to `m_compress_memory_arena`.
    compression::CompressMemoryArena& get_compress_memory_arena() noexcept
814
    {
4,226✔
815
        return m_compress_memory_arena;
4,226✔
816
    }
4,226✔
817

818
    // Returns a mutable reference to `m_misc_buffers`.
    MiscBuffers& get_misc_buffers() noexcept
819
    {
46,352✔
820
        return m_misc_buffers;
46,352✔
821
    }
46,352✔
822

823
    // Returns the value of `m_current_server_session_ident`.
    int_fast64_t get_current_server_session_ident() const noexcept
824
    {
×
825
        return m_current_server_session_ident;
×
826
    }
×
827

828
    // Returns a mutable reference to `m_scratch_memory`.
    util::ScratchMemory& get_scratch_memory() noexcept
829
    {
×
830
        return m_scratch_memory;
×
831
    }
×
832

833
    // Returns a mutable reference to the server's Worker (`m_worker`).
    Worker& get_worker() noexcept
834
    {
40,466✔
835
        return m_worker;
40,466✔
836
    }
40,466✔
837

838
    // Reports the work-unit timers through the two output parameters:
    // `parallel_section` receives the current value of `m_par_time` and
    // `sequential_section` the current value of `m_seq_time`.
    //
    // Both members are std::atomic and are loaded independently, so the pair
    // is not read as one atomic snapshot. The function only reads state and is
    // therefore const-qualified, which makes it usable through a const
    // ServerImpl reference.
    void get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section) const
    {
        parallel_section = m_par_time.load();
        sequential_section = m_seq_time.load();
    }
×
843

844
    ServerImpl(const std::string& root_dir, util::Optional<sync::PKey>, Server::Config);
845
    ~ServerImpl() noexcept;
846

847
    void start();
848

849
    void start(std::string listen_address, std::string listen_port, bool reuse_address)
850
    {
672✔
851
        m_config.listen_address = listen_address;
672✔
852
        m_config.listen_port = listen_port;
672✔
853
        m_config.reuse_address = reuse_address;
672✔
854

338✔
855
        start(); // Throws
672✔
856
    }
672✔
857

858
    // Returns the local endpoint reported by the acceptor
    // (`m_acceptor.local_endpoint()`).
    network::Endpoint listen_endpoint() const
859
    {
7,974✔
860
        return m_acceptor.local_endpoint();
7,974✔
861
    }
7,974✔
862

863
    void run();
864
    void stop() noexcept;
865

866
    void remove_http_connection(std::int_fast64_t conn_id) noexcept;
867

868
    void add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn);
869
    void remove_sync_connection(int_fast64_t connection_id);
870

871
    size_t get_number_of_http_connections()
872
    {
×
873
        return m_http_connections.size();
×
874
    }
×
875

876
    size_t get_number_of_sync_connections()
877
    {
×
878
        return m_sync_connections.size();
×
879
    }
×
880

881
    bool is_sync_stopped()
882
    {
40,294✔
883
        return m_sync_stopped;
40,294✔
884
    }
40,294✔
885

886
    // Returns a read-only reference to the set of known virtual paths
    // (`m_realm_names`).
    const std::set<std::string>& get_realm_names() const noexcept
887
    {
×
888
        return m_realm_names;
×
889
    }
×
890

891
    // virt_path must be valid when get_or_create_file() is called.
892
    util::bind_ptr<ServerFile> get_or_create_file(const std::string& virt_path)
893
    {
5,614✔
894
        util::bind_ptr<ServerFile> file = get_file(virt_path);
5,614✔
895
        if (REALM_LIKELY(file))
5,614✔
896
            return file;
4,964✔
897

438✔
898
        _impl::VirtualPathComponents virt_path_components =
1,088✔
899
            _impl::parse_virtual_path(m_root_dir, virt_path); // Throws
1,088✔
900
        REALM_ASSERT(virt_path_components.is_valid);
1,088✔
901

438✔
902
        _impl::make_dirs(m_root_dir, virt_path); // Throws
1,088✔
903
        m_realm_names.insert(virt_path);         // Throws
1,088✔
904
        {
1,088✔
905
            bool disable_sync_to_disk = m_config.disable_sync_to_disk;
1,088✔
906
            file.reset(new ServerFile(*this, m_file_access_cache, virt_path, virt_path_components.real_realm_path,
1,088✔
907
                                      disable_sync_to_disk)); // Throws
1,088✔
908
        }
1,088✔
909

438✔
910
        file->initialize();
1,088✔
911
        m_files[virt_path] = file; // Throws
1,088✔
912
        file->activate();          // Throws
1,088✔
913
        return file;
1,088✔
914
    }
1,088✔
915

916
    // Creates and returns a new ServerHistory constructed from `*this`.
    std::unique_ptr<ServerHistory> make_history_for_path()
917
    {
×
918
        return std::make_unique<ServerHistory>(*this);
×
919
    }
×
920

921
    // Looks up `virt_path` in `m_files`. Returns the registered ServerFile,
    // or an empty bind_ptr when no file is registered under that path.
    util::bind_ptr<ServerFile> get_file(const std::string& virt_path) noexcept
    {
        if (auto pos = m_files.find(virt_path); REALM_LIKELY(pos != m_files.end()))
            return pos->second;
        return {};
    }
1,088✔
928

929
    // Returns the current time as a std::chrono::system_clock::time_point
    // (a point in time, not a count of seconds).
930
    // If `m_config.token_expiration_clock` is set, its now() is used;
    // otherwise std::chrono::system_clock::now() is used.
931
    std::chrono::system_clock::time_point token_expiration_clock_now() const noexcept
932
    {
×
933
        if (REALM_UNLIKELY(m_config.token_expiration_clock))
×
934
            return m_config.token_expiration_clock->now();
×
935
        return std::chrono::system_clock::now();
×
936
    }
×
937

938
    void set_connection_reaper_timeout(milliseconds_type);
939

940
    void close_connections();
941
    bool map_virtual_to_real_path(const std::string& virt_path, std::string& real_path);
942

943
    void recognize_external_change(const std::string& virt_path);
944

945
    void stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
946
                                                  milliseconds_type timeout);
947

948
    // Server-global output buffers that can be reused.
949
    // The server is single threaded, so there are no
950
    // synchronization issues.
951
    // output_buffers_count is equal to the
952
    // maximum number of buffers needed at any point.
953
    static constexpr int output_buffers_count = 1;
954
    OutputBuffer output_buffers[output_buffers_count];
955

956
    // Returns the current value of `m_allow_load_balancing`.
    bool is_load_balancing_allowed() const
957
    {
×
958
        return m_allow_load_balancing;
×
959
    }
×
960

961
    // inc_byte_size_for_pending_downstream_changesets() must be called by
962
    // ServerFile objects when changesets from downstream clients have been
963
    // received.
964
    //
965
    // dec_byte_size_for_pending_downstream_changesets() must be called by
966
    // ServerFile objects when changesets from downstream clients have been
967
    // processed or discarded.
968
    //
969
    // ServerImpl uses this information to keep a running tally (metric
970
    // `upload.pending.bytes`) of the total byte size of pending changesets from
971
    // downstream clients.
972
    //
973
    // These functions must be called on the network thread.
974
    void inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
975
    void dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size);
976

977
    // Overriding member functions in _impl::ServerHistory::Context
978
    std::mt19937_64& server_history_get_random() noexcept override final;
979
    Transformer& get_transformer() noexcept override final;
980
    util::Buffer<char>& get_transform_buffer() noexcept override final;
981

982
private:
983
    Server::Config m_config;
984
    network::Service m_service;
985
    std::mt19937_64 m_random;
986
    const std::size_t m_max_upload_backlog;
987
    const std::string m_root_dir;
988
    const AccessControl m_access_control;
989
    const ProtocolVersionRange m_protocol_version_range;
990

991
    // The reserved files will be closed in situations where the server
992
    // runs out of file descriptors.
993
    std::unique_ptr<File> m_reserved_files[5];
994

995
    // The set of all Realm files known to this server, represented by their
996
    // virtual path.
997
    //
998
    // INVARIANT: If a Realm file is in the server's directory (i.e., it would be
999
    // reported by an invocation of _impl::get_realm_names()), then the
1000
    // corresponding virtual path is in `m_realm_names`, assuming no external
1001
    // file-system level intervention.
1002
    std::set<std::string> m_realm_names;
1003

1004
    std::unique_ptr<network::ssl::Context> m_ssl_context;
1005
    ServerFileAccessCache m_file_access_cache;
1006
    Worker m_worker;
1007
    std::map<std::string, util::bind_ptr<ServerFile>> m_files; // Key is virtual path
1008
    network::Acceptor m_acceptor;
1009
    std::int_fast64_t m_next_conn_id = 0;
1010
    std::unique_ptr<HTTPConnection> m_next_http_conn;
1011
    network::Endpoint m_next_http_conn_endpoint;
1012
    std::map<std::int_fast64_t, std::unique_ptr<HTTPConnection>> m_http_connections;
1013
    std::map<std::int_fast64_t, std::unique_ptr<SyncConnection>> m_sync_connections;
1014
    ServerProtocol m_server_protocol;
1015
    compression::CompressMemoryArena m_compress_memory_arena;
1016
    MiscBuffers m_misc_buffers;
1017
    std::unique_ptr<Transformer> m_transformer;
1018
    util::Buffer<char> m_transform_buffer;
1019
    int_fast64_t m_current_server_session_ident;
1020
    Optional<network::DeadlineTimer> m_connection_reaper_timer;
1021
    bool m_allow_load_balancing = false;
1022

1023
    util::Mutex m_mutex;
1024

1025
    bool m_stopped = false; // Protected by `m_mutex`
1026

1027
    // m_sync_stopped is used by stop_sync_and_wait_for_backup_completion().
1028
    // When m_sync_stopped is true, the server does not perform any sync.
1029
    bool m_sync_stopped = false;
1030

1031
    std::atomic<bool> m_running{false}; // Debugging facility
1032

1033
    std::size_t m_pending_changesets_from_downstream_byte_size = 0;
1034

1035
    util::CondVar m_wait_or_service_stopped_cond; // Protected by `m_mutex`
1036

1037
    util::ScratchMemory m_scratch_memory;
1038

1039
    void listen();
1040
    void initiate_accept();
1041
    void handle_accept(std::error_code);
1042

1043
    void reap_connections();
1044
    void initiate_connection_reaper_timer(milliseconds_type timeout);
1045
    void do_close_connections();
1046

1047
    static std::size_t determine_max_upload_backlog(Server::Config& config) noexcept
1048
    {
7,968✔
1049
        if (config.max_upload_backlog == 0)
7,968✔
1050
            return 4294967295; // 4GiB - 1 (largest allowable number on a 32-bit platform)
7,968✔
1051
        return config.max_upload_backlog;
×
1052
    }
×
1053

1054
    // Computes the range of protocol versions this server will accept.
    //
    // The lower bound is always the oldest protocol version supported by
    // ServerImplBase. The upper bound is the current protocol version, unless
    // `config.max_protocol_version` is non-zero and smaller, in which case the
    // configured value is used instead.
    //
    // Throws Server::NoSupportedProtocolVersions if the configured maximum is
    // below the oldest supported version. The configuration is only read, so
    // it is taken by reference to const.
    static ProtocolVersionRange determine_protocol_version_range(const Server::Config& config)
    {
        const int actual_min = ServerImplBase::get_oldest_supported_protocol_version();
        const int actual_max = get_current_protocol_version();
        static_assert(actual_min <= actual_max, "");

        int max = actual_max;
        // A configured maximum of zero means "no restriction".
        if (config.max_protocol_version != 0 && config.max_protocol_version < max) {
            if (config.max_protocol_version < actual_min)
                throw Server::NoSupportedProtocolVersions();
            max = config.max_protocol_version;
        }
        return {actual_min, max};
    }
7,968✔
1068

1069
    void do_recognize_external_change(const std::string& virt_path);
1070

1071
    void do_stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_complete)> completion_handler,
1072
                                                     milliseconds_type timeout);
1073
};
1074

1075
// ============================ SyncConnection ============================
1076

1077
class SyncConnection : public websocket::Config {
1078
public:
1079
    const std::shared_ptr<util::Logger> logger_ptr;
1080
    util::Logger& logger;
1081

1082
    // Clients with sync protocol version 8 or greater support pbs->flx migration
1083
    static constexpr int PBS_FLX_MIGRATION_PROTOCOL_VERSION = 8;
1084
    // Clients with sync protocol version less than 10 do not support log messages
1085
    static constexpr int SERVER_LOG_PROTOCOL_VERSION = 10;
1086

1087
    SyncConnection(ServerImpl& serv, std::int_fast64_t id, std::unique_ptr<network::Socket>&& socket,
1088
                   std::unique_ptr<network::ssl::Stream>&& ssl_stream,
1089
                   std::unique_ptr<network::ReadAheadBuffer>&& read_ahead_buffer, int client_protocol_version,
1090
                   std::string client_user_agent, std::string remote_endpoint, std::string appservices_request_id)
1091
        : logger_ptr{std::make_shared<util::PrefixLogger>(make_logger_prefix(id), serv.logger_ptr)} // Throws
1092
        , logger{*logger_ptr}
1093
        , m_server{serv}
1094
        , m_id{id}
1095
        , m_socket{std::move(socket)}
1096
        , m_ssl_stream{std::move(ssl_stream)}
1097
        , m_read_ahead_buffer{std::move(read_ahead_buffer)}
1098
        , m_websocket{*this}
1099
        , m_client_protocol_version{client_protocol_version}
1100
        , m_client_user_agent{std::move(client_user_agent)}
1101
        , m_remote_endpoint{std::move(remote_endpoint)}
1102
        , m_appservices_request_id{std::move(appservices_request_id)}
1103
    {
1,912✔
1104
        // Make the output buffer stream throw std::bad_alloc if it fails to
942✔
1105
        // expand the buffer
942✔
1106
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
1,912✔
1107

942✔
1108
        network::Service& service = m_server.get_service();
1,912✔
1109
        auto handler = [this](Status status) {
95,532✔
1110
            if (!status.is_ok())
95,532✔
1111
                return;
×
1112
            if (!m_is_sending)
95,532✔
1113
                send_next_message(); // Throws
41,696✔
1114
        };
95,532✔
1115
        m_send_trigger = std::make_unique<Trigger<network::Service>>(&service, std::move(handler)); // Throws
1,912✔
1116
    }
1,912✔
1117

1118
    ~SyncConnection() noexcept;
1119

1120
    // Returns the server this connection belongs to (`m_server`).
    ServerImpl& get_server() noexcept
1121
    {
112,602✔
1122
        return m_server;
112,602✔
1123
    }
112,602✔
1124

1125
    // Forwards to the owning server's get_server_protocol().
    ServerProtocol& get_server_protocol() noexcept
1126
    {
131,246✔
1127
        return m_server.get_server_protocol();
131,246✔
1128
    }
131,246✔
1129

1130
    int get_client_protocol_version()
1131
    {
101,128✔
1132
        return m_client_protocol_version;
101,128✔
1133
    }
101,128✔
1134

1135
    // Returns the user agent description passed by the client
    // (`m_client_user_agent`).
    const std::string& get_client_user_agent() const noexcept
1136
    {
5,616✔
1137
        return m_client_user_agent;
5,616✔
1138
    }
5,616✔
1139

1140
    // Returns the remote endpoint string supplied at construction
    // (`m_remote_endpoint`).
    const std::string& get_remote_endpoint() const noexcept
1141
    {
5,616✔
1142
        return m_remote_endpoint;
5,616✔
1143
    }
5,616✔
1144

1145
    // websocket::Config hook: returns this connection's shared logger pointer
    // (`logger_ptr`).
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept final
1146
    {
1,912✔
1147
        return logger_ptr;
1,912✔
1148
    }
1,912✔
1149

1150
    // websocket::Config hook: returns the owning server's random number
    // engine (forwards to ServerImpl::get_random()).
    std::mt19937_64& websocket_get_random() noexcept final override
1151
    {
62,378✔
1152
        return m_server.get_random();
62,378✔
1153
    }
62,378✔
1154

1155
    bool websocket_binary_message_received(const char* data, size_t size) final override
1156
    {
69,300✔
1157
        using sf = _impl::SimulatedFailure;
69,300✔
1158
        if (sf::check_trigger(sf::sync_server__read_head)) {
69,300✔
1159
            // Suicide
190✔
1160
            read_error(sf::sync_server__read_head);
412✔
1161
            return false;
412✔
1162
        }
412✔
1163
        // After a connection level error has occurred, all incoming messages
34,590✔
1164
        // will be ignored. By continuing to read until end of input, the server
34,590✔
1165
        // is able to know when the client closes the connection, which in
34,590✔
1166
        // general means that is has received the ERROR message.
34,590✔
1167
        if (REALM_LIKELY(!m_is_closing)) {
68,888✔
1168
            m_last_activity_at = steady_clock_now();
68,866✔
1169
            handle_message_received(data, size);
68,866✔
1170
        }
68,866✔
1171
        return true;
68,888✔
1172
    }
68,888✔
1173

1174
    bool websocket_ping_message_received(const char* data, size_t size) final override
1175
    {
×
1176
        if (REALM_LIKELY(!m_is_closing)) {
×
1177
            m_last_activity_at = steady_clock_now();
×
1178
            handle_ping_received(data, size);
×
1179
        }
×
1180
        return true;
×
1181
    }
×
1182

1183
    void async_write(const char* data, size_t size, websocket::WriteCompletionHandler handler) final override
1184
    {
62,374✔
1185
        if (m_ssl_stream) {
62,374✔
1186
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
70✔
1187
        }
70✔
1188
        else {
62,304✔
1189
            m_socket->async_write(data, size, std::move(handler)); // Throws
62,304✔
1190
        }
62,304✔
1191
    }
62,374✔
1192

1193
    void async_read(char* buffer, size_t size, websocket::ReadCompletionHandler handler) final override
1194
    {
209,376✔
1195
        if (m_ssl_stream) {
209,376✔
1196
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
170✔
1197
        }
170✔
1198
        else {
209,206✔
1199
            m_socket->async_read(buffer, size, *m_read_ahead_buffer, std::move(handler)); // Throws
209,206✔
1200
        }
209,206✔
1201
    }
209,376✔
1202

1203
    void async_read_until(char* buffer, size_t size, char delim,
1204
                          websocket::ReadCompletionHandler handler) final override
1205
    {
×
1206
        if (m_ssl_stream) {
×
1207
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1208
                                           std::move(handler)); // Throws
×
1209
        }
×
1210
        else {
×
1211
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
×
1212
                                       std::move(handler)); // Throws
×
1213
        }
×
1214
    }
×
1215

1216
    // websocket::Config hook: forwards the error code to read_error().
    void websocket_read_error_handler(std::error_code ec) final override
1217
    {
694✔
1218
        read_error(ec);
694✔
1219
    }
694✔
1220

1221
    // websocket::Config hook: forwards the error code to write_error().
    void websocket_write_error_handler(std::error_code ec) final override
1222
    {
×
1223
        write_error(ec);
×
1224
    }
×
1225

1226
    // websocket::Config hook for handshake errors: closes the connection via
    // close_due_to_error(). The headers and body arguments are unused.
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*,
1227
                                           const std::string_view*) final override
1228
    {
×
1229
        // WebSocket class has already logged a message for this error
1230
        close_due_to_error(ec); // Throws
×
1231
    }
×
1232

1233
    // websocket::Config hook for WebSocket protocol errors: logs the error
    // code and its message, then closes the connection via
    // close_due_to_error().
    void websocket_protocol_error_handler(std::error_code ec) final override
1234
    {
×
1235
        logger.error("WebSocket protocol error (%1): %2", ec, ec.message()); // Throws
×
1236
        close_due_to_error(ec);                                              // Throws
×
1237
    }
×
1238

1239
    // websocket::Config hook that must never run for this class: reaching it
    // terminates the process through REALM_TERMINATE.
    void websocket_handshake_completion_handler(const HTTPHeaders&) final override
1240
    {
×
1241
        // This is not called since we handle HTTP request in handle_request_for_sync()
1242
        REALM_TERMINATE("websocket_handshake_completion_handler should not have been called");
×
1243
    }
×
1244

1245
    // Returns the connection id given at construction (`m_id`).
    int_fast64_t get_id() const noexcept
1246
    {
×
1247
        return m_id;
×
1248
    }
×
1249

1250
    // Returns the underlying network socket by dereferencing `m_socket`
    // (no null check is performed here).
    network::Socket& get_socket() noexcept
1251
    {
×
1252
        return *m_socket;
×
1253
    }
×
1254

1255
    void initiate();
1256

1257
    // Commits suicide
1258
    template <class... Params>
1259
    void terminate(Logger::Level, const char* log_message, Params... log_params);
1260

1261
    // Commits suicide
1262
    void terminate_if_dead(SteadyTimePoint now);
1263

1264
    void enlist_to_send(Session*) noexcept;
1265

1266
    // Sessions should get the output_buffer and insert a message, after which
1267
    // they call initiate_write_output_buffer().
1268
    // Resets the connection's output buffer and returns a reference to it, so
    // every call hands out the same buffer object after a reset().
    OutputBuffer& get_output_buffer()
1269
    {
62,382✔
1270
        m_output_buffer.reset();
62,382✔
1271
        return m_output_buffer;
62,382✔
1272
    }
62,382✔
1273

1274
    // More advanced memory strategies can be implemented if needed.
1275
    // Currently a no-op (empty body).
    void release_output_buffer() {}
62,174✔
1276

1277
    // When this function is called, the connection will initiate a write with
1278
    // its output_buffer. Sessions use this method.
1279
    void initiate_write_output_buffer();
1280

1281
    void initiate_pong_output_buffer();
1282

1283
    void handle_protocol_error(Status status);
1284

1285
    void receive_bind_message(session_ident_type, std::string path, std::string signed_user_token,
1286
                              bool need_client_file_ident, bool is_subserver);
1287

1288
    void receive_ident_message(session_ident_type, file_ident_type client_file_ident,
1289
                               salt_type client_file_ident_salt, version_type scan_server_version,
1290
                               version_type scan_client_version, version_type latest_server_version,
1291
                               salt_type latest_server_version_salt);
1292

1293
    void receive_upload_message(session_ident_type, version_type progress_client_version,
1294
                                version_type progress_server_version, version_type locked_server_version,
1295
                                const UploadChangesets&);
1296

1297
    void receive_mark_message(session_ident_type, request_ident_type);
1298

1299
    void receive_unbind_message(session_ident_type);
1300

1301
    void receive_ping(milliseconds_type timestamp, milliseconds_type rtt);
1302

1303
    void receive_error_message(session_ident_type, int error_code, std::string_view error_body);
1304

1305
    void protocol_error(ProtocolError, Session* = nullptr);
1306

1307
    void initiate_soft_close();
1308

1309
    void discard_session(session_ident_type) noexcept;
1310

1311
    void send_log_message(util::Logger::Level level, const std::string&& message, session_ident_type sess_ident = 0,
1312
                          std::optional<std::string> co_id = std::nullopt);
1313

1314
private:
1315
    ServerImpl& m_server;
1316
    const int_fast64_t m_id;
1317
    std::unique_ptr<network::Socket> m_socket;
1318
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1319
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1320

1321
    websocket::Socket m_websocket;
1322
    std::unique_ptr<char[]> m_input_body_buffer;
1323
    OutputBuffer m_output_buffer;
1324
    std::map<session_ident_type, std::unique_ptr<Session>> m_sessions;
1325

1326
    // The protocol version in use by the connected client.
1327
    const int m_client_protocol_version;
1328

1329
    // The user agent description passed by the client.
1330
    const std::string m_client_user_agent;
1331

1332
    const std::string m_remote_endpoint;
1333

1334
    const std::string m_appservices_request_id;
1335

1336
    // A queue of sessions that have enlisted for an opportunity to send a
1337
    // message. Sessions will be served in the order that they enlist. A session
1338
    // can only occur once in this queue (linked list). If the queue is not
1339
    // empty, and no message is currently being written to the socket, the first
1340
    // session is taken out of the queue, and then granted an opportunity to
1341
    // send a message.
1342
    //
1343
    // Sessions will never be destroyed while in this queue. This is ensured
1344
    // because the connection owns the sessions that are associated with it, and
1345
    // the connection only removes a session from m_sessions at points in time
1346
    // where that session is guaranteed to not be in m_sessions_enlisted_to_send
1347
    // (SyncConnection::send_next_message() and SyncConnection::~SyncConnection()).
1348
    SessionQueue m_sessions_enlisted_to_send;
1349

1350
    Session* m_receiving_session = nullptr;
1351

1352
    bool m_is_sending = false;
1353
    bool m_is_closing = false;
1354

1355
    bool m_send_pong = false;
1356
    bool m_sending_pong = false;
1357

1358
    std::unique_ptr<Trigger<network::Service>> m_send_trigger;
1359

1360
    milliseconds_type m_last_ping_timestamp = 0;
1361

1362
    // If `m_is_closing` is true, this is the time at which `m_is_closing` was
1363
    // set to true (initiation of soft close). Otherwise, if no messages have
1364
    // been received from the client, this is the time at which the connection
1365
    // object was initiated (completion of WebSocket handshake). Otherwise this
1366
    // is the time at which the last message was received from the client.
1367
    SteadyTimePoint m_last_activity_at;
1368

1369
    // These are initialized by do_initiate_soft_close().
1370
    //
1371
    // With recent versions of the protocol (when the version is greater than,
1372
    // or equal to 23), `m_error_session_ident` is always zero.
1373
    ProtocolError m_error_code = {};
1374
    session_ident_type m_error_session_ident = 0;
1375

1376
    struct LogMessage {
1377
        session_ident_type sess_ident;
1378
        util::Logger::Level level;
1379
        std::string message;
1380
        std::optional<std::string> co_id;
1381
    };
1382

1383
    std::mutex m_log_mutex;
1384
    std::queue<LogMessage> m_log_messages;
1385

1386
    // Builds the log prefix for the connection with the given id, in the form
    // "Sync Connection[<id>]: ". The id is rendered as a plain decimal number
    // without any locale-dependent digit grouping.
    static std::string make_logger_prefix(int_fast64_t id)
    {
        return "Sync Connection[" + std::to_string(id) + "]: "; // Throws
    }
1,912✔
1393

1394
    // NOTE(review): handle_message_received() is declared `void`, so the
1395
    // contract described here earlier (a return value telling the caller
1396
    // whether to continue, false if the connection object was destroyed
1397
    // during the call) no longer matches the signature. Confirm and update.
1398
    void handle_message_received(const char* data, size_t size);
1399

1400
    void handle_ping_received(const char* data, size_t size);
1401

1402
    void send_next_message();
1403
    void send_pong(milliseconds_type timestamp);
1404
    void send_log_message(const LogMessage& log_msg);
1405

1406
    void handle_write_output_buffer();
1407
    void handle_pong_output_buffer();
1408

1409
    void initiate_write_error(ProtocolError, session_ident_type);
1410
    void handle_write_error();
1411

1412
    void do_initiate_soft_close(ProtocolError, session_ident_type);
1413
    void read_error(std::error_code);
1414
    void write_error(std::error_code);
1415

1416
    void close_due_to_close_by_client(std::error_code);
1417
    void close_due_to_error(std::error_code);
1418

1419
    void terminate_sessions();
1420

1421
    void bad_session_ident(const char* message_type, session_ident_type);
1422
    void message_after_unbind(const char* message_type, session_ident_type);
1423
    void message_before_ident(const char* message_type, session_ident_type);
1424
};
1425

1426

1427
inline void SyncConnection::read_error(std::error_code ec)
1428
{
1,106✔
1429
    REALM_ASSERT(ec != util::error::operation_aborted);
1,106✔
1430
    if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
1,106✔
1431
        // Suicide
400✔
1432
        close_due_to_close_by_client(ec); // Throws
694✔
1433
        return;
694✔
1434
    }
694✔
1435
    if (ec == util::MiscExtErrors::delim_not_found) {
412✔
1436
        logger.error("Input message head delimited not found"); // Throws
×
1437
        protocol_error(ProtocolError::limits_exceeded);         // Throws
×
1438
        return;
×
1439
    }
×
1440

190✔
1441
    logger.error("Reading failed: %1", ec.message()); // Throws
412✔
1442

190✔
1443
    // Suicide
190✔
1444
    close_due_to_error(ec); // Throws
412✔
1445
}
412✔
1446

1447
inline void SyncConnection::write_error(std::error_code ec)
1448
{
×
1449
    REALM_ASSERT(ec != util::error::operation_aborted);
×
1450
    if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1451
        // Suicide
1452
        close_due_to_close_by_client(ec); // Throws
×
1453
        return;
×
1454
    }
×
1455
    logger.error("Writing failed: %1", ec.message()); // Throws
×
1456

1457
    // Suicide
1458
    close_due_to_error(ec); // Throws
×
1459
}
×
1460

1461

1462
// ============================ HTTPConnection ============================
1463

1464
// Namespace-scope string holding the text "User-Agent".
// NOTE(review): presumably the HTTP header name looked up by the connection
// code — confirm at the use sites. The object is mutable and not declared
// `const` or `static`; check whether that is intended.
std::string g_user_agent = "User-Agent";
1465

1466
class HTTPConnection {
1467
public:
1468
    const std::shared_ptr<Logger> logger_ptr;
1469
    util::Logger& logger;
1470

1471
    HTTPConnection(ServerImpl& serv, int_fast64_t id, bool is_ssl)
1472
        : logger_ptr{std::make_shared<PrefixLogger>(make_logger_prefix(id), serv.logger_ptr)} // Throws
1473
        , logger{*logger_ptr}
1474
        , m_server{serv}
1475
        , m_id{id}
1476
        , m_socket{new network::Socket{serv.get_service()}} // Throws
1477
        , m_read_ahead_buffer{new network::ReadAheadBuffer} // Throws
1478
        , m_http_server{*this, logger_ptr}
1479
    {
9,910✔
1480
        // Make the output buffer stream throw std::bad_alloc if it fails to
4,890✔
1481
        // expand the buffer
4,890✔
1482
        m_output_buffer.exceptions(std::ios_base::badbit | std::ios_base::failbit);
9,910✔
1483

4,890✔
1484
        if (is_ssl) {
9,910✔
1485
            using namespace network::ssl;
48✔
1486
            Context& ssl_context = serv.get_ssl_context();
48✔
1487
            m_ssl_stream = std::make_unique<Stream>(*m_socket, ssl_context,
48✔
1488
                                                    Stream::server); // Throws
48✔
1489
        }
48✔
1490
    }
9,910✔
1491

1492
    ServerImpl& get_server() noexcept
1493
    {
×
1494
        return m_server;
×
1495
    }
×
1496

1497
    int_fast64_t get_id() const noexcept
1498
    {
1,942✔
1499
        return m_id;
1,942✔
1500
    }
1,942✔
1501

1502
    network::Socket& get_socket() noexcept
1503
    {
11,530✔
1504
        return *m_socket;
11,530✔
1505
    }
11,530✔
1506

1507
    template <class H>
1508
    void async_write(const char* data, size_t size, H handler)
1509
    {
1,932✔
1510
        if (m_ssl_stream) {
1,932✔
1511
            m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
14✔
1512
        }
14✔
1513
        else {
1,918✔
1514
            m_socket->async_write(data, size, std::move(handler)); // Throws
1,918✔
1515
        }
1,918✔
1516
    }
1,932✔
1517

1518
    template <class H>
1519
    void async_read(char* buffer, size_t size, H handler)
1520
    {
8✔
1521
        if (m_ssl_stream) {
8✔
1522
            m_ssl_stream->async_read(buffer, size, *m_read_ahead_buffer,
×
1523
                                     std::move(handler)); // Throws
×
1524
        }
×
1525
        else {
8✔
1526
            m_socket->async_read(buffer, size, *m_read_ahead_buffer,
8✔
1527
                                 std::move(handler)); // Throws
8✔
1528
        }
8✔
1529
    }
8✔
1530

1531
    template <class H>
1532
    void async_read_until(char* buffer, size_t size, char delim, H handler)
1533
    {
17,268✔
1534
        if (m_ssl_stream) {
17,268✔
1535
            m_ssl_stream->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
126✔
1536
                                           std::move(handler)); // Throws
126✔
1537
        }
126✔
1538
        else {
17,142✔
1539
            m_socket->async_read_until(buffer, size, delim, *m_read_ahead_buffer,
17,142✔
1540
                                       std::move(handler)); // Throws
17,142✔
1541
        }
17,142✔
1542
    }
17,268✔
1543

1544
    void initiate(std::string remote_endpoint)
1545
    {
1,942✔
1546
        m_last_activity_at = steady_clock_now();
1,942✔
1547
        m_remote_endpoint = std::move(remote_endpoint);
1,942✔
1548

958✔
1549
        logger.detail("Connection from %1", m_remote_endpoint); // Throws
1,942✔
1550

958✔
1551
        if (m_ssl_stream) {
1,942✔
1552
            initiate_ssl_handshake(); // Throws
24✔
1553
        }
24✔
1554
        else {
1,918✔
1555
            initiate_http(); // Throws
1,918✔
1556
        }
1,918✔
1557
    }
1,942✔
1558

1559
    void respond_200_ok()
1560
    {
×
1561
        handle_text_response(HTTPStatus::Ok, "OK"); // Throws
×
1562
    }
×
1563

1564
    void respond_404_not_found()
1565
    {
×
1566
        handle_text_response(HTTPStatus::NotFound, "Not found"); // Throws
×
1567
    }
×
1568

1569
    void respond_503_service_unavailable()
1570
    {
×
1571
        handle_text_response(HTTPStatus::ServiceUnavailable, "Service unavailable"); // Throws
×
1572
    }
×
1573

1574
    // Commits suicide
1575
    template <class... Params>
1576
    void terminate(Logger::Level log_level, const char* log_message, Params... log_params)
1577
    {
28✔
1578
        logger.log(log_level, log_message, log_params...); // Throws
28✔
1579
        m_ssl_stream.reset();
28✔
1580
        m_socket.reset();
28✔
1581
        m_server.remove_http_connection(m_id); // Suicide
28✔
1582
    }
28✔
1583

1584
    // Commits suicide
1585
    void terminate_if_dead(SteadyTimePoint now)
1586
    {
×
1587
        milliseconds_type time = steady_duration(m_last_activity_at, now);
×
1588
        const Server::Config& config = m_server.get_config();
×
1589
        if (m_is_sending) {
×
1590
            if (time >= config.http_response_timeout) {
×
1591
                // Suicide
1592
                terminate(Logger::Level::detail,
×
1593
                          "HTTP connection closed (request timeout)"); // Throws
×
1594
            }
×
1595
        }
×
1596
        else {
×
1597
            if (time >= config.http_request_timeout) {
×
1598
                // Suicide
1599
                terminate(Logger::Level::detail,
×
1600
                          "HTTP connection closed (response timeout)"); // Throws
×
1601
            }
×
1602
        }
×
1603
    }
×
1604

1605
    std::string get_appservices_request_id() const
1606
    {
1,932✔
1607
        return m_appservices_request_id.to_string();
1,932✔
1608
    }
1,932✔
1609

1610
private:
1611
    ServerImpl& m_server;
1612
    const int_fast64_t m_id;
1613
    const ObjectId m_appservices_request_id = ObjectId::gen();
1614
    std::unique_ptr<network::Socket> m_socket;
1615
    std::unique_ptr<network::ssl::Stream> m_ssl_stream;
1616
    std::unique_ptr<network::ReadAheadBuffer> m_read_ahead_buffer;
1617
    HTTPServer<HTTPConnection> m_http_server;
1618
    OutputBuffer m_output_buffer;
1619
    bool m_is_sending = false;
1620
    SteadyTimePoint m_last_activity_at;
1621
    std::string m_remote_endpoint;
1622
    int m_negotiated_protocol_version = 0;
1623

1624
    void initiate_ssl_handshake()
1625
    {
24✔
1626
        auto handler = [this](std::error_code ec) {
22✔
1627
            if (ec != util::error::operation_aborted)
22✔
1628
                handle_ssl_handshake(ec); // Throws
22✔
1629
        };
22✔
1630
        m_ssl_stream->async_handshake(std::move(handler)); // Throws
24✔
1631
    }
24✔
1632

1633
    void handle_ssl_handshake(std::error_code ec)
1634
    {
22✔
1635
        if (ec) {
22✔
1636
            logger.error("SSL handshake error (%1): %2", ec, ec.message()); // Throws
8✔
1637
            close_due_to_error(ec);                                         // Throws
8✔
1638
            return;
8✔
1639
        }
8✔
1640
        initiate_http(); // Throws
14✔
1641
    }
14✔
1642

1643
    void initiate_http()
1644
    {
1,932✔
1645
        logger.debug("Connection initiates HTTP receipt");
1,932✔
1646

952✔
1647
        auto handler = [this](HTTPRequest request, std::error_code ec) {
1,932✔
1648
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
1,932✔
1649
                return;
952✔
1650
            if (REALM_UNLIKELY(ec == HTTPParserError::MalformedRequest)) {
1,932✔
1651
                logger.error("Malformed HTTP request");
×
1652
                close_due_to_error(ec); // Throws
×
1653
                return;
×
1654
            }
×
1655
            if (REALM_UNLIKELY(ec == HTTPParserError::BadRequest)) {
1,932✔
1656
                logger.error("Bad HTTP request");
8✔
1657
                const char* body = "The HTTP request was corrupted";
8✔
1658
                handle_400_bad_request(body); // Throws
8✔
1659
                return;
8✔
1660
            }
8✔
1661
            if (REALM_UNLIKELY(ec)) {
1,924✔
1662
                read_error(ec); // Throws
×
1663
                return;
×
1664
            }
×
1665
            handle_http_request(std::move(request)); // Throws
1,924✔
1666
        };
1,924✔
1667
        m_http_server.async_receive_request(std::move(handler)); // Throws
1,932✔
1668
    }
1,932✔
1669

1670
    void handle_http_request(const HTTPRequest& request)
1671
    {
1,924✔
1672
        StringData path = request.path;
1,924✔
1673

948✔
1674
        logger.debug("HTTP request received, request = %1", request);
1,924✔
1675

948✔
1676
        m_is_sending = true;
1,924✔
1677
        m_last_activity_at = steady_clock_now();
1,924✔
1678

948✔
1679
        // FIXME: When thinking of this function as a switching device, it seem
948✔
1680
        // wrong that it requires a `%2F` after `/realm-sync/`. If `%2F` is
948✔
1681
        // supposed to be mandatory, then that check ought to be delegated to
948✔
1682
        // handle_request_for_sync(), as that will yield a sharper separation of
948✔
1683
        // concerns.
948✔
1684
        if (path == "/realm-sync" || path.begins_with("/realm-sync?") || path.begins_with("/realm-sync/%2F")) {
1,924✔
1685
            handle_request_for_sync(request); // Throws
1,912✔
1686
        }
1,912✔
1687
        else {
12✔
1688
            handle_404_not_found(request); // Throws
12✔
1689
        }
12✔
1690
    }
1,924✔
1691

1692
    void handle_request_for_sync(const HTTPRequest& request)
1693
    {
1,912✔
1694
        if (m_server.is_sync_stopped()) {
1,912✔
1695
            logger.debug("Attempt to create a sync connection to a server that has been "
×
1696
                         "stopped"); // Throws
×
1697
            handle_503_service_unavailable(request, "The server does not accept sync "
×
1698
                                                    "connections"); // Throws
×
1699
            return;
×
1700
        }
×
1701

942✔
1702
        util::Optional<std::string> sec_websocket_protocol = websocket::read_sec_websocket_protocol(request);
1,912✔
1703

942✔
1704
        // Figure out whether there are any protocol versions supported by both
942✔
1705
        // the client and the server, and if so, choose the newest one of them.
942✔
1706
        MiscBuffers& misc_buffers = m_server.get_misc_buffers();
1,912✔
1707
        using ProtocolVersionRanges = MiscBuffers::ProtocolVersionRanges;
1,912✔
1708
        ProtocolVersionRanges& protocol_version_ranges = misc_buffers.protocol_version_ranges;
1,912✔
1709
        {
1,912✔
1710
            protocol_version_ranges.clear();
1,912✔
1711
            util::MemoryInputStream in;
1,912✔
1712
            in.imbue(std::locale::classic());
1,912✔
1713
            in.unsetf(std::ios_base::skipws);
1,912✔
1714
            std::string_view value;
1,912✔
1715
            if (sec_websocket_protocol)
1,912✔
1716
                value = *sec_websocket_protocol;
1,912✔
1717
            HttpListHeaderValueParser parser{value};
1,912✔
1718
            std::string_view elem;
1,912✔
1719
            while (parser.next(elem)) {
19,120✔
1720
                // FIXME: Use std::string_view::begins_with() in C++20.
8,478✔
1721
                const StringData protocol{elem};
17,208✔
1722
                std::string_view prefix;
17,208✔
1723
                if (protocol.begins_with(get_pbs_websocket_protocol_prefix()))
17,208✔
1724
                    prefix = get_pbs_websocket_protocol_prefix();
17,208✔
1725
                else if (protocol.begins_with(get_old_pbs_websocket_protocol_prefix()))
×
1726
                    prefix = get_old_pbs_websocket_protocol_prefix();
×
1727
                if (!prefix.empty()) {
17,208✔
1728
                    auto parse_version = [&](std::string_view str) {
17,208✔
1729
                        in.set_buffer(str.data(), str.data() + str.size());
17,208✔
1730
                        int version = 0;
17,208✔
1731
                        in >> version;
17,208✔
1732
                        if (REALM_LIKELY(in && in.eof() && version >= 0))
17,208✔
1733
                            return version;
17,208✔
1734
                        return -1;
×
1735
                    };
×
1736
                    int min, max;
17,208✔
1737
                    std::string_view range = elem.substr(prefix.size());
17,208✔
1738
                    auto i = range.find('-');
17,208✔
1739
                    if (i != std::string_view::npos) {
17,208✔
1740
                        min = parse_version(range.substr(0, i));
×
1741
                        max = parse_version(range.substr(i + 1));
×
1742
                    }
×
1743
                    else {
17,208✔
1744
                        min = parse_version(range);
17,208✔
1745
                        max = min;
17,208✔
1746
                    }
17,208✔
1747
                    if (REALM_LIKELY(min >= 0 && max >= 0 && min <= max)) {
17,208✔
1748
                        protocol_version_ranges.emplace_back(min, max); // Throws
17,208✔
1749
                        continue;
17,208✔
1750
                    }
17,208✔
1751
                    logger.error("Protocol version negotiation failed: Client sent malformed "
×
1752
                                 "specification of supported protocol versions: '%1'",
×
1753
                                 elem); // Throws
×
1754
                    handle_400_bad_request("Protocol version negotiation failed: Malformed "
×
1755
                                           "specification of supported protocol "
×
1756
                                           "versions\n"); // Throws
×
1757
                    return;
×
1758
                }
×
1759
                logger.warn("Unrecognized protocol token in HTTP response header "
×
1760
                            "Sec-WebSocket-Protocol: '%1'",
×
1761
                            elem); // Throws
×
1762
            }
×
1763
            if (protocol_version_ranges.empty()) {
1,912✔
1764
                logger.error("Protocol version negotiation failed: Client did not send a "
×
1765
                             "specification of supported protocol versions"); // Throws
×
1766
                handle_400_bad_request("Protocol version negotiation failed: Missing specification "
×
1767
                                       "of supported protocol versions\n"); // Throws
×
1768
                return;
×
1769
            }
×
1770
        }
1,912✔
1771
        {
1,912✔
1772
            ProtocolVersionRange server_range = m_server.get_protocol_version_range();
1,912✔
1773
            int server_min = server_range.first;
1,912✔
1774
            int server_max = server_range.second;
1,912✔
1775
            int best_match = 0;
1,912✔
1776
            int overall_client_min = std::numeric_limits<int>::max();
1,912✔
1777
            int overall_client_max = std::numeric_limits<int>::min();
1,912✔
1778
            for (const auto& range : protocol_version_ranges) {
17,208✔
1779
                int client_min = range.first;
17,208✔
1780
                int client_max = range.second;
17,208✔
1781
                if (client_max >= server_min && client_min <= server_max) {
17,208✔
1782
                    // Overlap
8,478✔
1783
                    int version = std::min(client_max, server_max);
17,208✔
1784
                    if (version > best_match) {
17,208✔
1785
                        best_match = version;
1,912✔
1786
                    }
1,912✔
1787
                }
17,208✔
1788
                if (client_min < overall_client_min)
17,208✔
1789
                    overall_client_min = client_min;
17,208✔
1790
                if (client_max > overall_client_max)
17,208✔
1791
                    overall_client_max = client_max;
1,912✔
1792
            }
17,208✔
1793
            Formatter& formatter = misc_buffers.formatter;
1,912✔
1794
            if (REALM_UNLIKELY(best_match == 0)) {
1,912✔
1795
                const char* elaboration = "No version supported by both client and server";
×
1796
                const char* identifier_hint = nullptr;
×
1797
                if (overall_client_max < server_min) {
×
1798
                    // Client is too old
1799
                    elaboration = "Client is too old for server";
×
1800
                    identifier_hint = "CLIENT_TOO_OLD";
×
1801
                }
×
1802
                else if (overall_client_min > server_max) {
×
1803
                    // Client is too new
1804
                    elaboration = "Client is too new for server";
×
1805
                    identifier_hint = "CLIENT_TOO_NEW";
×
1806
                }
×
1807
                auto format_ranges = [&](const auto& list) {
×
1808
                    bool nonfirst = false;
×
1809
                    for (auto range : list) {
×
1810
                        if (nonfirst)
×
1811
                            formatter << ", "; // Throws
×
1812
                        int min = range.first, max = range.second;
×
1813
                        REALM_ASSERT(min <= max);
×
1814
                        formatter << min;
×
1815
                        if (max != min)
×
1816
                            formatter << "-" << max;
×
1817
                        nonfirst = true;
×
1818
                    }
×
1819
                };
×
1820
                using Range = ProtocolVersionRange;
×
1821
                formatter.reset();
×
1822
                format_ranges(protocol_version_ranges); // Throws
×
1823
                logger.error("Protocol version negotiation failed: %1 "
×
1824
                             "(client supports: %2)",
×
1825
                             elaboration, std::string_view(formatter.data(), formatter.size())); // Throws
×
1826
                formatter.reset();
×
1827
                formatter << "Protocol version negotiation failed: "
×
1828
                             ""
×
1829
                          << elaboration << ".\n\n";                                   // Throws
×
1830
                formatter << "Server supports: ";                                      // Throws
×
1831
                format_ranges(std::initializer_list<Range>{{server_min, server_max}}); // Throws
×
1832
                formatter << "\n";                                                     // Throws
×
1833
                formatter << "Client supports: ";                                      // Throws
×
1834
                format_ranges(protocol_version_ranges);                                // Throws
×
1835
                formatter << "\n\n";                                                   // Throws
×
1836
                formatter << "REALM_SYNC_PROTOCOL_MISMATCH";                           // Throws
×
1837
                if (identifier_hint)
×
1838
                    formatter << ":" << identifier_hint;                      // Throws
×
1839
                formatter << "\n";                                            // Throws
×
1840
                handle_400_bad_request({formatter.data(), formatter.size()}); // Throws
×
1841
                return;
×
1842
            }
×
1843
            m_negotiated_protocol_version = best_match;
1,912✔
1844
            logger.debug("Received: Sync HTTP request (negotiated_protocol_version=%1)",
1,912✔
1845
                         m_negotiated_protocol_version); // Throws
1,912✔
1846
            formatter.reset();
1,912✔
1847
        }
1,912✔
1848

942✔
1849
        std::string sec_websocket_protocol_2;
1,912✔
1850
        {
1,912✔
1851
            std::string_view prefix =
1,912✔
1852
                m_negotiated_protocol_version < SyncConnection::PBS_FLX_MIGRATION_PROTOCOL_VERSION
1,912✔
1853
                    ? get_old_pbs_websocket_protocol_prefix()
942✔
1854
                    : get_pbs_websocket_protocol_prefix();
1,912✔
1855
            std::ostringstream out;
1,912✔
1856
            out.imbue(std::locale::classic());
1,912✔
1857
            out << prefix << m_negotiated_protocol_version; // Throws
1,912✔
1858
            sec_websocket_protocol_2 = std::move(out).str();
1,912✔
1859
        }
1,912✔
1860

942✔
1861
        std::error_code ec;
1,912✔
1862
        util::Optional<HTTPResponse> response =
1,912✔
1863
            websocket::make_http_response(request, sec_websocket_protocol_2, ec); // Throws
1,912✔
1864

942✔
1865
        if (ec) {
1,912✔
1866
            if (ec == websocket::HttpError::bad_request_header_upgrade) {
×
1867
                logger.error("There must be a header of the form 'Upgrade: websocket'");
×
1868
            }
×
1869
            else if (ec == websocket::HttpError::bad_request_header_connection) {
×
1870
                logger.error("There must be a header of the form 'Connection: Upgrade'");
×
1871
            }
×
1872
            else if (ec == websocket::HttpError::bad_request_header_websocket_version) {
×
1873
                logger.error("There must be a header of the form 'Sec-WebSocket-Version: 13'");
×
1874
            }
×
1875
            else if (ec == websocket::HttpError::bad_request_header_websocket_key) {
×
1876
                logger.error("The header Sec-WebSocket-Key is missing");
×
1877
            }
×
1878

1879
            logger.error("The HTTP request with the error is:\n%1", request);
×
1880
            logger.error("Check the proxy configuration and make sure that the "
×
1881
                         "HTTP request is a valid Websocket request.");
×
1882
            close_due_to_error(ec);
×
1883
            return;
×
1884
        }
×
1885
        REALM_ASSERT(response);
1,912✔
1886
        add_common_http_response_headers(*response);
1,912✔
1887

942✔
1888
        std::string user_agent;
1,912✔
1889
        {
1,912✔
1890
            auto i = request.headers.find(g_user_agent);
1,912✔
1891
            if (i != request.headers.end())
1,912✔
1892
                user_agent = i->second; // Throws (copy)
1,912✔
1893
        }
1,912✔
1894

942✔
1895
        auto handler = [protocol_version = m_negotiated_protocol_version, user_agent = std::move(user_agent),
1,912✔
1896
                        this](std::error_code ec) {
1,912✔
1897
            // If the operation is aborted, the socket object may have been destroyed.
942✔
1898
            if (ec != util::error::operation_aborted) {
1,912✔
1899
                if (ec) {
1,912✔
1900
                    write_error(ec);
×
1901
                    return;
×
1902
                }
×
1903

942✔
1904
                std::unique_ptr<SyncConnection> sync_conn = std::make_unique<SyncConnection>(
1,912✔
1905
                    m_server, m_id, std::move(m_socket), std::move(m_ssl_stream), std::move(m_read_ahead_buffer),
1,912✔
1906
                    protocol_version, std::move(user_agent), std::move(m_remote_endpoint),
1,912✔
1907
                    get_appservices_request_id()); // Throws
1,912✔
1908
                SyncConnection& sync_conn_ref = *sync_conn;
1,912✔
1909
                m_server.add_sync_connection(m_id, std::move(sync_conn));
1,912✔
1910
                m_server.remove_http_connection(m_id);
1,912✔
1911
                sync_conn_ref.initiate();
1,912✔
1912
            }
1,912✔
1913
        };
1,912✔
1914
        m_http_server.async_send_response(*response, std::move(handler));
1,912✔
1915
    }
1,912✔
1916

1917
    void handle_text_response(HTTPStatus http_status, std::string_view body)
1918
    {
20✔
1919
        std::string body_2 = std::string(body); // Throws
20✔
1920

10✔
1921
        HTTPResponse response;
20✔
1922
        response.status = http_status;
20✔
1923
        add_common_http_response_headers(response);
20✔
1924
        response.headers["Connection"] = "close";
20✔
1925

10✔
1926
        if (!body_2.empty()) {
20✔
1927
            response.headers["Content-Length"] = util::to_string(body_2.size());
20✔
1928
            response.body = std::move(body_2);
20✔
1929
        }
20✔
1930

10✔
1931
        auto handler = [this](std::error_code ec) {
20✔
1932
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
20✔
1933
                return;
10✔
1934
            if (REALM_UNLIKELY(ec)) {
20✔
1935
                write_error(ec);
×
1936
                return;
×
1937
            }
×
1938
            terminate(Logger::Level::detail, "HTTP connection closed"); // Throws
20✔
1939
        };
20✔
1940
        m_http_server.async_send_response(response, std::move(handler));
20✔
1941
    }
20✔
1942

1943
    void handle_400_bad_request(std::string_view body)
1944
    {
8✔
1945
        logger.detail("400 Bad Request");
8✔
1946
        handle_text_response(HTTPStatus::BadRequest, body); // Throws
8✔
1947
    }
8✔
1948

1949
    void handle_404_not_found(const HTTPRequest&)
1950
    {
12✔
1951
        logger.detail("404 Not Found"); // Throws
12✔
1952
        handle_text_response(HTTPStatus::NotFound,
12✔
1953
                             "Realm sync server\n\nPage not found\n"); // Throws
12✔
1954
    }
12✔
1955

1956
    void handle_503_service_unavailable(const HTTPRequest&, std::string_view message)
1957
    {
×
1958
        logger.debug("503 Service Unavailable");                       // Throws
×
1959
        handle_text_response(HTTPStatus::ServiceUnavailable, message); // Throws
×
1960
    }
×
1961

1962
    void add_common_http_response_headers(HTTPResponse& response)
1963
    {
1,932✔
1964
        response.headers["Server"] = "RealmSync/" REALM_VERSION_STRING; // Throws
1,932✔
1965
        if (m_negotiated_protocol_version < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
1,932✔
1966
            // This isn't a real X-Appservices-Request-Id, but it should be enough to test with
10✔
1967
            response.headers["X-Appservices-Request-Id"] = get_appservices_request_id();
20✔
1968
        }
20✔
1969
    }
1,932✔
1970

1971
    void read_error(std::error_code ec)
1972
    {
×
1973
        REALM_ASSERT(ec != util::error::operation_aborted);
×
1974
        if (ec == util::MiscExtErrors::end_of_input || ec == util::error::connection_reset) {
×
1975
            // Suicide
1976
            close_due_to_close_by_client(ec); // Throws
×
1977
            return;
×
1978
        }
×
1979
        if (ec == util::MiscExtErrors::delim_not_found) {
×
1980
            logger.error("Input message head delimited not found"); // Throws
×
1981
            close_due_to_error(ec);                                 // Throws
×
1982
            return;
×
1983
        }
×
1984

1985
        logger.error("Reading failed: %1", ec.message()); // Throws
×
1986

1987
        // Suicide
1988
        close_due_to_error(ec); // Throws
×
1989
    }
×
1990

1991
    void write_error(std::error_code ec)
1992
    {
×
1993
        REALM_ASSERT(ec != util::error::operation_aborted);
×
1994
        if (ec == util::error::broken_pipe || ec == util::error::connection_reset) {
×
1995
            // Suicide
1996
            close_due_to_close_by_client(ec); // Throws
×
1997
            return;
×
1998
        }
×
1999
        logger.error("Writing failed: %1", ec.message()); // Throws
×
2000

2001
        // Suicide
2002
        close_due_to_error(ec); // Throws
×
2003
    }
×
2004

2005
    void close_due_to_close_by_client(std::error_code ec)
2006
    {
×
2007
        auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
×
2008
        // Suicide
2009
        terminate(log_level, "HTTP connection closed by client: %1", ec.message()); // Throws
×
2010
    }
×
2011

2012
    void close_due_to_error(std::error_code ec)
2013
    {
8✔
2014
        // Suicide
6✔
2015
        terminate(Logger::Level::error, "HTTP connection closed due to error: %1",
8✔
2016
                  ec.message()); // Throws
8✔
2017
    }
8✔
2018

2019
    static std::string make_logger_prefix(int_fast64_t id)
2020
    {
9,910✔
2021
        std::ostringstream out;
9,910✔
2022
        out.imbue(std::locale::classic());
9,910✔
2023
        out << "HTTP Connection[" << id << "]: "; // Throws
9,910✔
2024
        return out.str();                         // Throws
9,910✔
2025
    }
9,910✔
2026
};
2027

2028

2029
class DownloadHistoryEntryHandler : public ServerHistory::HistoryEntryHandler {
2030
public:
2031
    std::size_t num_changesets = 0;
2032
    std::size_t accum_original_size = 0;
2033
    std::size_t accum_compacted_size = 0;
2034

2035
    DownloadHistoryEntryHandler(ServerProtocol& protocol, OutputBuffer& buffer, util::Logger& logger) noexcept
2036
        : m_protocol{protocol}
2037
        , m_buffer{buffer}
2038
        , m_logger{logger}
2039
    {
40,214✔
2040
    }
40,214✔
2041

2042
    void handle(version_type server_version, const HistoryEntry& entry, size_t original_size) override
2043
    {
41,482✔
2044
        version_type client_version = entry.remote_version;
41,482✔
2045
        ServerProtocol::ChangesetInfo info{server_version, client_version, entry, original_size};
41,482✔
2046
        m_protocol.insert_single_changeset_download_message(m_buffer, info, m_logger); // Throws
41,482✔
2047
        ++num_changesets;
41,482✔
2048
        accum_original_size += original_size;
41,482✔
2049
        accum_compacted_size += entry.changeset.size();
41,482✔
2050
    }
41,482✔
2051

2052
private:
2053
    ServerProtocol& m_protocol;
2054
    OutputBuffer& m_buffer;
2055
    util::Logger& m_logger;
2056
};
2057

2058

2059
// ============================ Session ============================
2060

2061
//                        Need cli-   Send     IDENT     UNBIND              ERROR
2062
//   Protocol             ent file    IDENT    message   message   Error     message
2063
//   state                identifier  message  received  received  occurred  sent
2064
// ---------------------------------------------------------------------------------
2065
//   AllocatingIdent      yes         yes      no        no        no        no
2066
//   SendIdent            no          yes      no        no        no        no
2067
//   WaitForIdent         no          no       no        no        no        no
2068
//   WaitForUnbind        maybe       no       yes       no        no        no
2069
//   SendError            maybe       maybe    maybe     no        yes       no
2070
//   WaitForUnbindErr     maybe       maybe    maybe     no        yes       yes
2071
//   SendUnbound          maybe       maybe    maybe     yes       maybe     no
2072
//
2073
//
2074
//   Condition                      Expression
2075
// ----------------------------------------------------------
2076
//   Need client file identifier    need_client_file_ident()
2077
//   Send IDENT message             must_send_ident_message()
2078
//   IDENT message received         ident_message_received()
2079
//   UNBIND message received        unbind_message_received()
2080
//   Error occurred                 error_occurred()
2081
//   ERROR message sent             m_error_message_sent
2082
//
2083
//
2084
//   Protocol
2085
//   state                Will send              Can receive
2086
// -----------------------------------------------------------------------
2087
//   AllocatingIdent      none                   UNBIND
2088
//   SendIdent            IDENT                  UNBIND
2089
//   WaitForIdent         none                   IDENT, UNBIND
2090
//   WaitForUnbind        DOWNLOAD, TRANSACT,    UPLOAD, TRANSACT, MARK,
2091
//                        MARK, ALLOC            ALLOC, UNBIND
2092
//   SendError            ERROR                  any
2093
//   WaitForUnbindErr     none                   any
2094
//   SendUnbound          UNBOUND                none
2095
//
2096
class Session final : private FileIdentReceiver {
2097
public:
2098
    util::PrefixLogger logger;
2099

2100
    Session(SyncConnection& conn, session_ident_type session_ident)
2101
        : logger{make_logger_prefix(session_ident), conn.logger_ptr} // Throws
2102
        , m_connection{conn}
2103
        , m_session_ident{session_ident}
2104
    {
5,644✔
2105
    }
5,644✔
2106

2107
    // A session must not be enlisted to send at the time it is destroyed. The
    // destructor detaches the session from its server file.
    ~Session() noexcept
    {
        REALM_ASSERT(!is_enlisted_to_send());
        detach_from_server_file();
    }
5,644✔
2112

2113
    // Returns the connection that this session was constructed with.
    SyncConnection& get_connection() noexcept
    {
        return m_connection;
    }
40,232✔
2117

2118
    // Returns the (optional) encryption key held by the configuration of the
    // server that the associated connection belongs to.
    const Optional<std::array<char, 64>>& get_encryption_key()
    {
        ServerImpl& server = m_connection.get_server();
        return server.get_config().encryption_key;
    }
×
2122

2123
    // Returns the session identifier that was passed to the constructor.
    session_ident_type get_session_ident() const noexcept
    {
        return m_session_ident;
    }
156✔
2127

2128
    // Returns the protocol object exposed by the associated connection.
    ServerProtocol& get_server_protocol() noexcept
    {
        return m_connection.get_server_protocol();
    }
56,304✔
2132

2133
    bool need_client_file_ident() const noexcept
2134
    {
6,682✔
2135
        return (m_file_ident_request != 0);
6,682✔
2136
    }
6,682✔
2137

2138
    bool must_send_ident_message() const noexcept
2139
    {
4,094✔
2140
        return m_send_ident_message;
4,094✔
2141
    }
4,094✔
2142

2143
    bool ident_message_received() const noexcept
2144
    {
335,384✔
2145
        return m_client_file_ident != 0;
335,384✔
2146
    }
335,384✔
2147

2148
    bool unbind_message_received() const noexcept
2149
    {
338,318✔
2150
        return m_unbind_message_received;
338,318✔
2151
    }
338,318✔
2152

2153
    bool error_occurred() const noexcept
2154
    {
330,070✔
2155
        return int(m_error_code) != 0;
330,070✔
2156
    }
330,070✔
2157

2158
    bool relayed_alloc_request_in_progress() const noexcept
2159
    {
×
2160
        return (need_client_file_ident() || m_allocated_file_ident.ident != 0);
×
2161
    }
×
2162

2163
    // Returns the file identifier (always a nonzero value) of the client side
2164
    // file if ident_message_received() returns true. Otherwise it returns zero.
2165
    file_ident_type get_client_file_ident() const noexcept
    {
        // Remains zero until an IDENT message has been accepted.
        return m_client_file_ident;
    }
×
2169

2170
    void initiate()
2171
    {
5,644✔
2172
        logger.detail("Session initiated", m_session_ident); // Throws
5,644✔
2173
    }
5,644✔
2174

2175
    void terminate()
2176
    {
4,434✔
2177
        logger.detail("Session terminated", m_session_ident); // Throws
4,434✔
2178
    }
4,434✔
2179

2180
    // Initiate the deactivation process, if it has not been initiated already
2181
    // by the client.
2182
    //
2183
    // IMPORTANT: This function must not be called with protocol versions
2184
    // earlier than 23.
2185
    //
2186
    // The deactivation process will eventually lead to termination of the
2187
    // session.
2188
    //
2189
    // The session will detach itself from the server file when the deactivation
2190
    // process is initiated, regardless of whether it is initiated by the
2191
    // client, or by calling this function.
2192
    void initiate_deactivation(ProtocolError error_code)
    {
        REALM_ASSERT(is_session_level_error(error_code));
        REALM_ASSERT(!error_occurred()); // Must only be called once

        // If the UNBIND message has been received, then the client has
        // initiated the deactivation process already. In that case the
        // protocol state was SendUnbound, and it remains unchanged.
        if (REALM_UNLIKELY(unbind_message_received()))
            return;

        detach_from_server_file();
        m_error_code = error_code;
        // Protocol state is now SendError
        ensure_enlisted_to_send();
    }
78✔
2208

2209
    bool is_enlisted_to_send() const noexcept
2210
    {
268,602✔
2211
        return m_next != nullptr;
268,602✔
2212
    }
268,602✔
2213

2214
    void ensure_enlisted_to_send() noexcept
2215
    {
52,652✔
2216
        if (!is_enlisted_to_send())
52,652✔
2217
            enlist_to_send();
51,078✔
2218
    }
52,652✔
2219

2220
    void enlist_to_send() noexcept
2221
    {
107,426✔
2222
        m_connection.enlist_to_send(this);
107,426✔
2223
    }
107,426✔
2224

2225
    // Overriding member function in FileIdentReceiver
2226
    void receive_file_ident(SaltedFileIdent file_ident) override final
    {
        // Protocol state must be AllocatingIdent or WaitForUnbind
        if (ident_message_received()) {
            // WaitForUnbind: no IDENT message may be pending.
            REALM_ASSERT(!m_send_ident_message);
        }
        else {
            // AllocatingIdent: a request must be outstanding, and the IDENT
            // message must still be owed to the client.
            REALM_ASSERT(need_client_file_ident());
            REALM_ASSERT(m_send_ident_message);
        }
        REALM_ASSERT(!unbind_message_received());
        REALM_ASSERT(!error_occurred());
        REALM_ASSERT(!m_error_message_sent);

        // The request has been served; remember the allocated identifier
        // until it has been sent.
        m_file_ident_request = 0;
        m_allocated_file_ident = file_ident;

        // If the protocol state was AllocatingIdent, it is now SendIdent,
        // otherwise it continues to be WaitForUnbind.

        logger.debug("Acquired outbound salted file identifier (%1, %2)", file_ident.ident,
                     file_ident.salt); // Throws

        ensure_enlisted_to_send();
    }
1,294✔
2251

2252
    // Called by the associated connection object when this session is granted
2253
    // an opportunity to initiate the sending of a message.
2254
    //
2255
    // This function may lead to the destruction of the session object
2256
    // (suicide).
2257
    void send_message()
2258
    {
106,916✔
2259
        if (REALM_LIKELY(!unbind_message_received())) {
106,916✔
2260
            if (REALM_LIKELY(!error_occurred())) {
104,272✔
2261
                if (REALM_LIKELY(ident_message_received())) {
104,194✔
2262
                    // State is WaitForUnbind.
54,920✔
2263
                    bool relayed_alloc = (m_allocated_file_ident.ident != 0);
102,900✔
2264
                    if (REALM_LIKELY(!relayed_alloc)) {
102,902✔
2265
                        // Send DOWNLOAD or MARK.
54,922✔
2266
                        continue_history_scan(); // Throws
102,902✔
2267
                        // Session object may have been
54,922✔
2268
                        // destroyed at this point (suicide)
54,922✔
2269
                        return;
102,902✔
2270
                    }
102,902✔
2271
                    send_alloc_message(); // Throws
2,147,483,647✔
2272
                    return;
2,147,483,647✔
2273
                }
2,147,483,647✔
2274
                // State is SendIdent
552✔
2275
                send_ident_message(); // Throws
1,294✔
2276
                return;
1,294✔
2277
            }
1,294✔
2278
            // State is SendError
42✔
2279
            send_error_message(); // Throws
78✔
2280
            return;
78✔
2281
        }
78✔
2282
        // State is SendUnbound
1,110✔
2283
        send_unbound_message(); // Throws
2,644✔
2284
        terminate();            // Throws
2,644✔
2285
        m_connection.discard_session(m_session_ident);
2,644✔
2286
        // This session is now destroyed!
1,110✔
2287
    }
2,644✔
2288

2289
    bool receive_bind_message(std::string path, std::string signed_user_token, bool need_client_file_ident,
2290
                              bool is_subserver, ProtocolError& error)
2291
    {
5,644✔
2292
        if (logger.would_log(util::Logger::Level::info)) {
5,644✔
2293
            logger.detail("Received: BIND(server_path=%1, signed_user_token='%2', "
392✔
2294
                          "need_client_file_ident=%3, is_subserver=%4)",
392✔
2295
                          path, short_token_fmt(signed_user_token), int(need_client_file_ident),
392✔
2296
                          int(is_subserver)); // Throws
392✔
2297
        }
392✔
2298

2,814✔
2299
        ServerImpl& server = m_connection.get_server();
5,644✔
2300
        _impl::VirtualPathComponents virt_path_components =
5,644✔
2301
            _impl::parse_virtual_path(server.get_root_dir(), path); // Throws
5,644✔
2302

2,814✔
2303
        if (!virt_path_components.is_valid) {
5,644✔
2304
            logger.error("Bad virtual path (message_type='bind', path='%1', "
28✔
2305
                         "signed_user_token='%2')",
28✔
2306
                         path,
28✔
2307
                         short_token_fmt(signed_user_token)); // Throws
28✔
2308
            error = ProtocolError::illegal_realm_path;
28✔
2309
            return false;
28✔
2310
        }
28✔
2311

2,800✔
2312
        // The user has proper permissions at this stage.
2,800✔
2313

2,800✔
2314
        m_server_file = server.get_or_create_file(path); // Throws
5,616✔
2315

2,800✔
2316
        m_server_file->add_unidentified_session(this); // Throws
5,616✔
2317

2,800✔
2318
        logger.info("Client info: (path='%1', from=%2, protocol=%3) %4", path, m_connection.get_remote_endpoint(),
5,616✔
2319
                    m_connection.get_client_protocol_version(),
5,616✔
2320
                    m_connection.get_client_user_agent()); // Throws
5,616✔
2321

2,800✔
2322
        m_is_subserver = is_subserver;
5,616✔
2323
        if (REALM_LIKELY(!need_client_file_ident)) {
5,616✔
2324
            // Protocol state is now WaitForUnbind
1,580✔
2325
            return true;
2,976✔
2326
        }
2,976✔
2327

1,220✔
2328
        // FIXME: We must make a choice about client file ident for read only
1,220✔
2329
        // sessions. They should have a special read-only client file ident.
1,220✔
2330
        file_ident_type proxy_file = 0; // No proxy
2,640✔
2331
        ClientType client_type = (is_subserver ? ClientType::subserver : ClientType::regular);
2,640✔
2332
        m_file_ident_request = m_server_file->request_file_ident(*this, proxy_file, client_type); // Throws
2,640✔
2333
        m_send_ident_message = true;
2,640✔
2334
        // Protocol state is now AllocatingIdent
1,220✔
2335

1,220✔
2336
        return true;
2,640✔
2337
    }
2,640✔
2338

2339
    bool receive_ident_message(file_ident_type client_file_ident, salt_type client_file_ident_salt,
2340
                               version_type scan_server_version, version_type scan_client_version,
2341
                               version_type latest_server_version, salt_type latest_server_version_salt,
2342
                               ProtocolError& error)
2343
    {
4,094✔
2344
        // Protocol state must be WaitForIdent
2,084✔
2345
        REALM_ASSERT(!need_client_file_ident());
4,094✔
2346
        REALM_ASSERT(!m_send_ident_message);
4,094✔
2347
        REALM_ASSERT(!ident_message_received());
4,094✔
2348
        REALM_ASSERT(!unbind_message_received());
4,094✔
2349
        REALM_ASSERT(!error_occurred());
4,094✔
2350
        REALM_ASSERT(!m_error_message_sent);
4,094✔
2351

2,084✔
2352
        logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
4,094✔
2353
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
4,094✔
2354
                     "latest_server_version_salt=%6)",
4,094✔
2355
                     client_file_ident, client_file_ident_salt, scan_server_version, scan_client_version,
4,094✔
2356
                     latest_server_version, latest_server_version_salt); // Throws
4,094✔
2357

2,084✔
2358
        SaltedFileIdent client_file_ident_2 = {client_file_ident, client_file_ident_salt};
4,094✔
2359
        DownloadCursor download_progress = {scan_server_version, scan_client_version};
4,094✔
2360
        SaltedVersion server_version_2 = {latest_server_version, latest_server_version_salt};
4,094✔
2361
        ClientType client_type = (m_is_subserver ? ClientType::subserver : ClientType::regular);
4,094✔
2362
        UploadCursor upload_threshold = {0, 0};
4,094✔
2363
        version_type locked_server_version = 0;
4,094✔
2364
        BootstrapError error_2 =
4,094✔
2365
            m_server_file->bootstrap_client_session(client_file_ident_2, download_progress, server_version_2,
4,094✔
2366
                                                    client_type, upload_threshold, locked_server_version,
4,094✔
2367
                                                    logger); // Throws
4,094✔
2368
        switch (error_2) {
4,094✔
2369
            case BootstrapError::no_error:
4,062✔
2370
                break;
4,062✔
2371
            case BootstrapError::client_file_expired:
✔
2372
                logger.warn("Client (%1) expired", client_file_ident); // Throws
×
2373
                error = ProtocolError::client_file_expired;
×
2374
                return false;
×
2375
            case BootstrapError::bad_client_file_ident:
✔
2376
                logger.error("Bad client file ident (%1) in IDENT message",
×
2377
                             client_file_ident); // Throws
×
2378
                error = ProtocolError::bad_client_file_ident;
×
2379
                return false;
×
2380
            case BootstrapError::bad_client_file_ident_salt:
4✔
2381
                logger.error("Bad client file identifier salt (%1) in IDENT message",
4✔
2382
                             client_file_ident_salt); // Throws
4✔
2383
                error = ProtocolError::diverging_histories;
4✔
2384
                return false;
4✔
2385
            case BootstrapError::bad_download_server_version:
✔
2386
                logger.error("Bad download progress server version in IDENT message"); // Throws
×
2387
                error = ProtocolError::bad_server_version;
×
2388
                return false;
×
2389
            case BootstrapError::bad_download_client_version:
4✔
2390
                logger.error("Bad download progress client version in IDENT message"); // Throws
4✔
2391
                error = ProtocolError::bad_client_version;
4✔
2392
                return false;
4✔
2393
            case BootstrapError::bad_server_version:
20✔
2394
                logger.error("Bad server version (message_type='ident')"); // Throws
20✔
2395
                error = ProtocolError::bad_server_version;
20✔
2396
                return false;
20✔
2397
            case BootstrapError::bad_server_version_salt:
4✔
2398
                logger.error("Bad server version salt in IDENT message"); // Throws
4✔
2399
                error = ProtocolError::diverging_histories;
4✔
2400
                return false;
4✔
2401
            case BootstrapError::bad_client_type:
✔
2402
                logger.error("Bad client type (%1) in IDENT message", int(client_type)); // Throws
×
2403
                error = ProtocolError::bad_client_file_ident; // FIXME: Introduce new protocol-level error
×
2404
                                                              // `bad_client_type`.
2405
                return false;
×
2406
        }
4,062✔
2407

2,068✔
2408
        // Make sure there is no other session currently associcated with the
2,068✔
2409
        // same client-side file
2,068✔
2410
        if (Session* other_sess = m_server_file->get_identified_session(client_file_ident)) {
4,062✔
2411
            SyncConnection& other_conn = other_sess->get_connection();
×
2412
            // It is a protocol violation if the other session is associated
2413
            // with the same connection
2414
            if (&other_conn == &m_connection) {
×
2415
                logger.error("Client file already bound in other session associated with "
×
2416
                             "the same connection"); // Throws
×
2417
                error = ProtocolError::bound_in_other_session;
×
2418
                return false;
×
2419
            }
×
2420
            // When the other session is associated with a different connection
2421
            // (`other_conn`), the clash may be due to the server not yet having
2422
            // realized that the other connection has been closed by the
2423
            // client. If so, the other connention is a "zombie". In the
2424
            // interest of getting rid of zombie connections as fast as
2425
            // possible, we shall assume that a clash with a session in another
2426
            // connection is always due to that other connection being a
2427
            // zombie. And when such a situation is detected, we want to close
2428
            // the zombie connection immediately.
2429
            auto log_level = Logger::Level::detail;
×
2430
            other_conn.terminate(log_level,
×
2431
                                 "Sync connection closed (superseded session)"); // Throws
×
2432
        }
×
2433

2,068✔
2434
        logger.info("Bound to client file (client_file_ident=%1)", client_file_ident); // Throws
4,062✔
2435

2,068✔
2436
        send_log_message(util::Logger::Level::debug, util::format("Session %1 bound to client file ident %2",
4,062✔
2437
                                                                  m_session_ident, client_file_ident));
4,062✔
2438

2,068✔
2439
        m_server_file->identify_session(this, client_file_ident); // Throws
4,062✔
2440

2,068✔
2441
        m_client_file_ident = client_file_ident;
4,062✔
2442
        m_download_progress = download_progress;
4,062✔
2443
        m_upload_threshold = upload_threshold;
4,062✔
2444
        m_locked_server_version = locked_server_version;
4,062✔
2445

2,068✔
2446
        ServerImpl& server = m_connection.get_server();
4,062✔
2447
        const Server::Config& config = server.get_config();
4,062✔
2448
        m_disable_download = (config.disable_download_for.count(client_file_ident) != 0);
4,062✔
2449

2,068✔
2450
        if (REALM_UNLIKELY(config.session_bootstrap_callback)) {
4,062✔
2451
            config.session_bootstrap_callback(m_server_file->get_virt_path(),
×
2452
                                              client_file_ident); // Throws
×
2453
        }
×
2454

2,068✔
2455
        // Protocol  state is now WaitForUnbind
2,068✔
2456
        enlist_to_send();
4,062✔
2457
        return true;
4,062✔
2458
    }
4,062✔
2459

2460
    bool receive_upload_message(version_type progress_client_version, version_type progress_server_version,
2461
                                version_type locked_server_version, const UploadChangesets& upload_changesets,
2462
                                ProtocolError& error)
2463
    {
43,872✔
2464
        // Protocol state must be WaitForUnbind
22,216✔
2465
        REALM_ASSERT(!m_send_ident_message);
43,872✔
2466
        REALM_ASSERT(ident_message_received());
43,872✔
2467
        REALM_ASSERT(!unbind_message_received());
43,872✔
2468
        REALM_ASSERT(!error_occurred());
43,872✔
2469
        REALM_ASSERT(!m_error_message_sent);
43,872✔
2470

22,216✔
2471
        logger.detail("Received: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
43,872✔
2472
                      "locked_server_version=%3, num_changesets=%4)",
43,872✔
2473
                      progress_client_version, progress_server_version, locked_server_version,
43,872✔
2474
                      upload_changesets.size()); // Throws
43,872✔
2475

22,216✔
2476
        // We are unable to reproduce the cursor object for the upload progress
22,216✔
2477
        // when the protocol version is less than 29, because the client does
22,216✔
2478
        // not provide the required information. When the protocol version is
22,216✔
2479
        // less than 25, we can always get a consistent cursor by taking it from
22,216✔
2480
        // the changeset that was uploaded last, but in protocol versions 25,
22,216✔
2481
        // 26, 27, and 28, things are more complicated. Here, we receive new
22,216✔
2482
        // values for `last_integrated_server_version` which we cannot afford to
22,216✔
2483
        // ignore, but we do not know what client versions they correspond
22,216✔
2484
        // to. Fortunately, we can produce a cursor that works, and is mutually
22,216✔
2485
        // consistent with previous cursors, by simply bumping
22,216✔
2486
        // `upload_progress.client_version` when
22,216✔
2487
        // `upload_progress.last_integrated_server_version` grows.
22,216✔
2488
        //
22,216✔
2489
        // To see that this scheme works, consider the last changeset, A, that
22,216✔
2490
        // will have already been uploaded and integrated at the beginning of
22,216✔
2491
        // the next session, and the first changeset, B, that follows A in the
22,216✔
2492
        // client side history, and is not upload skippable (of local origin and
22,216✔
2493
        // nonempty). We then need to show that A will be skipped, if uploaded
22,216✔
2494
        // in the next session, but B will not.
22,216✔
2495
        //
22,216✔
2496
        // Let V be the client version produced by A, and let T be the value of
22,216✔
2497
        // `upload_progress.client_version` as determined in this session, which
22,216✔
2498
        // is used as threshold in the next session. Then we know that A is
22,216✔
2499
        // skipped during the next session if V is less than, or equal to T. If
22,216✔
2500
        // the protocol version is at least 29, the protocol requires that T is
22,216✔
2501
        // greater than, or equal to V. If the protocol version is less than 25,
22,216✔
2502
        // T will be equal to V. Finally, if the protocol version is 25, 26, 27,
22,216✔
2503
        // or 28, we construct T such that it is always greater than, or equal
22,216✔
2504
        // to V, so in all cases, A will be skipped during the next session.
22,216✔
2505
        //
22,216✔
2506
        // Let W be the client version on which B is based. We then know that B
22,216✔
2507
        // will be retained if, and only if W is greater than, or equal to T. If
22,216✔
2508
        // the protocol version is at least 29, we know that T is less than, or
22,216✔
2509
        // equal to W, since B is not integrated until the next session. If the
22,216✔
2510
        // protocol version is less than 25, we know that T is V. Since V must
22,216✔
2511
        // be less than, or equal to W, we again know that T is less than, or
22,216✔
2512
        // equal to W. Finally, if the protocol version is 25, 26, 27, or 28, we
22,216✔
2513
        // construct T such that it is equal to V + N, where N is the number of
22,216✔
2514
        // observed increments in `last_integrated_server_version` since the
22,216✔
2515
        // client version produced by A. For each of these observed increments,
22,216✔
2516
        // there must have been a distinct new client version, but all these
22,216✔
2517
        // client versions must be less than, or equal to W, since B is not
22,216✔
2518
        // integrated until the next session. Therefore, we know that T = V + N
22,216✔
2519
        // is less than, or equal to W. So, in all cases, B will not be skipped
22,216✔
2520
        // during the next session.
22,216✔
2521
        int protocol_version = m_connection.get_client_protocol_version();
43,872✔
2522
        static_cast<void>(protocol_version); // No protocol diversion (yet)
43,872✔
2523

22,216✔
2524
        UploadCursor upload_progress;
43,872✔
2525
        upload_progress = {progress_client_version, progress_server_version};
43,872✔
2526

22,216✔
2527
        // `upload_progress.client_version` must be nondecreasing across the
22,216✔
2528
        // session.
22,216✔
2529
        bool good_1 = (upload_progress.client_version >= m_upload_progress.client_version);
43,872✔
2530
        if (REALM_UNLIKELY(!good_1)) {
43,872✔
2531
            logger.error("Decreasing client version in upload progress (%1 < %2)", upload_progress.client_version,
×
2532
                         m_upload_progress.client_version); // Throws
×
2533
            error = ProtocolError::bad_client_version;
×
2534
            return false;
×
2535
        }
×
2536
        // `upload_progress.last_integrated_server_version` must be a version
22,216✔
2537
        // that the client can have heard about.
22,216✔
2538
        bool good_2 = (upload_progress.last_integrated_server_version <= m_download_progress.server_version);
43,872✔
2539
        if (REALM_UNLIKELY(!good_2)) {
43,872✔
2540
            logger.error("Bad last integrated server version in upload progress (%1 > %2)",
×
2541
                         upload_progress.last_integrated_server_version,
×
2542
                         m_download_progress.server_version); // Throws
×
2543
            error = ProtocolError::bad_server_version;
×
2544
            return false;
×
2545
        }
×
2546

22,216✔
2547
        // `upload_progress` must be consistent.
22,216✔
2548
        if (REALM_UNLIKELY(!is_consistent(upload_progress))) {
43,872✔
2549
            logger.error("Upload progress is inconsistent (%1, %2)", upload_progress.client_version,
×
2550
                         upload_progress.last_integrated_server_version); // Throws
×
2551
            error = ProtocolError::bad_server_version;
×
2552
            return false;
×
2553
        }
×
2554
        // `upload_progress` and `m_upload_threshold` must be mutually
22,216✔
2555
        // consistent.
22,216✔
2556
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_threshold))) {
43,872✔
2557
            logger.error("Upload progress (%1, %2) is mutually inconsistent with "
×
2558
                         "threshold (%3, %4)",
×
2559
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2560
                         m_upload_threshold.client_version,
×
2561
                         m_upload_threshold.last_integrated_server_version); // Throws
×
2562
            error = ProtocolError::bad_server_version;
×
2563
            return false;
×
2564
        }
×
2565
        // `upload_progress` and `m_upload_progress` must be mutually
22,216✔
2566
        // consistent.
22,216✔
2567
        if (REALM_UNLIKELY(!are_mutually_consistent(upload_progress, m_upload_progress))) {
43,872✔
2568
            logger.error("Upload progress (%1, %2) is mutually inconsistent with previous "
×
2569
                         "upload progress (%3, %4)",
×
2570
                         upload_progress.client_version, upload_progress.last_integrated_server_version,
×
2571
                         m_upload_progress.client_version,
×
2572
                         m_upload_progress.last_integrated_server_version); // Throws
×
2573
            error = ProtocolError::bad_server_version;
×
2574
            return false;
×
2575
        }
×
2576

22,216✔
2577
        version_type locked_server_version_2 = locked_server_version;
43,872✔
2578

22,216✔
2579
        // `locked_server_version_2` must be nondecreasing over the lifetime of
22,216✔
2580
        // the client-side file.
22,216✔
2581
        if (REALM_UNLIKELY(locked_server_version_2 < m_locked_server_version)) {
43,872✔
2582
            logger.error("Decreasing locked server version (%1 < %2)", locked_server_version_2,
×
2583
                         m_locked_server_version); // Throws
×
2584
            error = ProtocolError::bad_server_version;
×
2585
            return false;
×
2586
        }
×
2587
        // `locked_server_version_2` must be a version that the client can have
22,216✔
2588
        // heard about.
22,216✔
2589
        if (REALM_UNLIKELY(locked_server_version_2 > m_download_progress.server_version)) {
43,872✔
2590
            logger.error("Bad locked server version (%1 > %2)", locked_server_version_2,
×
2591
                         m_download_progress.server_version); // Throws
×
2592
            error = ProtocolError::bad_server_version;
×
2593
            return false;
×
2594
        }
×
2595

22,216✔
2596
        std::size_t num_previously_integrated_changesets = 0;
43,872✔
2597
        if (!upload_changesets.empty()) {
43,872✔
2598
            UploadCursor up = m_upload_progress;
22,876✔
2599
            for (const ServerProtocol::UploadChangeset& uc : upload_changesets) {
36,748✔
2600
                // `uc.upload_cursor.client_version` must be increasing across
17,844✔
2601
                // all the changesets in this UPLOAD message, and all must be
17,844✔
2602
                // greater than upload_progress.client_version of previous
17,844✔
2603
                // UPLOAD message.
17,844✔
2604
                if (REALM_UNLIKELY(uc.upload_cursor.client_version <= up.client_version)) {
36,748✔
2605
                    logger.error("Nonincreasing client version in upload cursor of uploaded "
×
2606
                                 "changeset (%1 <= %2)",
×
2607
                                 uc.upload_cursor.client_version,
×
2608
                                 up.client_version); // Throws
×
2609
                    error = ProtocolError::bad_client_version;
×
2610
                    return false;
×
2611
                }
×
2612
                // `uc.upload_progress` must be consistent.
17,844✔
2613
                if (REALM_UNLIKELY(!is_consistent(uc.upload_cursor))) {
36,748✔
2614
                    logger.error("Upload cursor of uploaded changeset is inconsistent (%1, %2)",
×
2615
                                 uc.upload_cursor.client_version,
×
2616
                                 uc.upload_cursor.last_integrated_server_version); // Throws
×
2617
                    error = ProtocolError::bad_server_version;
×
2618
                    return false;
×
2619
                }
×
2620
                // `uc.upload_progress` must be mutually consistent with
17,844✔
2621
                // previous upload cursor.
17,844✔
2622
                if (REALM_UNLIKELY(!are_mutually_consistent(uc.upload_cursor, up))) {
36,748✔
2623
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2624
                                 "inconsistent with previous upload cursor (%3, %4)",
×
2625
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2626
                                 up.client_version, up.last_integrated_server_version); // Throws
×
2627
                    error = ProtocolError::bad_server_version;
×
2628
                    return false;
×
2629
                }
×
2630
                // `uc.upload_progress` must be mutually consistent with
17,844✔
2631
                // threshold, that is, for changesets that have not previously
17,844✔
2632
                // been integrated, it is important that the specified value of
17,844✔
2633
                // `last_integrated_server_version` is greater than, or equal to
17,844✔
2634
                // the reciprocal history base version.
17,844✔
2635
                bool consistent_with_threshold = are_mutually_consistent(uc.upload_cursor, m_upload_threshold);
36,748✔
2636
                if (REALM_UNLIKELY(!consistent_with_threshold)) {
36,748✔
2637
                    logger.error("Upload cursor of uploaded changeset (%1, %2) is mutually "
×
2638
                                 "inconsistent with threshold (%3, %4)",
×
2639
                                 uc.upload_cursor.client_version, uc.upload_cursor.last_integrated_server_version,
×
2640
                                 m_upload_threshold.client_version,
×
2641
                                 m_upload_threshold.last_integrated_server_version); // Throws
×
2642
                    error = ProtocolError::bad_server_version;
×
2643
                    return false;
×
2644
                }
×
2645
                bool previously_integrated = (uc.upload_cursor.client_version <= m_upload_threshold.client_version);
36,748✔
2646
                if (previously_integrated)
36,748✔
2647
                    ++num_previously_integrated_changesets;
2,350✔
2648
                up = uc.upload_cursor;
36,748✔
2649
            }
36,748✔
2650
            // `upload_progress.client_version` must be greater than, or equal
11,844✔
2651
            // to client versions produced by each of the changesets in this
11,844✔
2652
            // UPLOAD message.
11,844✔
2653
            if (REALM_UNLIKELY(up.client_version > upload_progress.client_version)) {
22,876✔
2654
                logger.error("Upload progress less than client version produced by uploaded "
×
2655
                             "changeset (%1 > %2)",
×
2656
                             up.client_version,
×
2657
                             upload_progress.client_version); // Throws
×
2658
                error = ProtocolError::bad_client_version;
×
2659
                return false;
×
2660
            }
×
2661
            // The upload cursor of last uploaded changeset must be mutually
11,844✔
2662
            // consistent with the reported upload progress.
11,844✔
2663
            if (REALM_UNLIKELY(!are_mutually_consistent(up, upload_progress))) {
22,876✔
2664
                logger.error("Upload cursor (%1, %2) of last uploaded changeset is mutually "
×
2665
                             "inconsistent with upload progress (%3, %4)",
×
2666
                             up.client_version, up.last_integrated_server_version, upload_progress.client_version,
×
2667
                             upload_progress.last_integrated_server_version); // Throws
×
2668
                error = ProtocolError::bad_server_version;
×
2669
                return false;
×
2670
            }
×
2671
        }
43,872✔
2672

22,216✔
2673
        // FIXME: Part of a very poor man's substitute for a proper backpressure
22,216✔
2674
        // scheme.
22,216✔
2675
        if (REALM_UNLIKELY(!m_server_file->can_add_changesets_from_downstream())) {
43,872✔
2676
            logger.debug("Terminating uploading session because buffer is full"); // Throws
×
2677
            // Using this exact error code, because it causes `try_again` flag
2678
            // to be set to true, which causes the client to wait for about 5
2679
            // minutes before trying to connect again.
2680
            error = ProtocolError::connection_closed;
×
2681
            return false;
×
2682
        }
×
2683

22,216✔
2684
        m_upload_progress = upload_progress;
43,872✔
2685

22,216✔
2686
        bool have_real_upload_progress = (upload_progress.client_version > m_upload_threshold.client_version);
43,872✔
2687
        bool bump_locked_server_version = (locked_server_version_2 > m_locked_server_version);
43,872✔
2688

22,216✔
2689
        std::size_t num_changesets_to_integrate = upload_changesets.size() - num_previously_integrated_changesets;
43,872✔
2690
        REALM_ASSERT(have_real_upload_progress || num_changesets_to_integrate == 0);
43,872✔
2691

22,216✔
2692
        bool have_anything_to_do = (have_real_upload_progress || bump_locked_server_version);
43,872✔
2693
        if (!have_anything_to_do)
43,872✔
2694
            return true;
254✔
2695

22,058✔
2696
        if (!have_real_upload_progress)
43,618✔
2697
            upload_progress = m_upload_threshold;
×
2698

22,058✔
2699
        if (num_previously_integrated_changesets > 0) {
43,618✔
2700
            logger.detail("Ignoring %1 previously integrated changesets",
724✔
2701
                          num_previously_integrated_changesets); // Throws
724✔
2702
        }
724✔
2703
        if (num_changesets_to_integrate > 0) {
43,618✔
2704
            logger.detail("Initiate integration of %1 remote changesets",
22,524✔
2705
                          num_changesets_to_integrate); // Throws
22,524✔
2706
        }
22,524✔
2707

22,058✔
2708
        REALM_ASSERT(m_server_file);
43,618✔
2709
        ServerFile& file = *m_server_file;
43,618✔
2710
        std::size_t offset = num_previously_integrated_changesets;
43,618✔
2711
        file.add_changesets_from_downstream(m_client_file_ident, upload_progress, locked_server_version_2,
43,618✔
2712
                                            upload_changesets.data() + offset, num_changesets_to_integrate); // Throws
43,618✔
2713

22,058✔
2714
        m_locked_server_version = locked_server_version_2;
43,618✔
2715
        return true;
43,618✔
2716
    }
43,618✔
2717

2718
    /// Handle a MARK message received from the client.
    ///
    /// Records `request_ident` as the pending download completion request and
    /// makes sure the session is enlisted to send. The `ProtocolError`
    /// out-parameter is never written by this function.
    ///
    /// \return Always true.
    bool receive_mark_message(request_ident_type request_ident, ProtocolError&)
    {
        // Preconditions: protocol state must be WaitForUnbind.
        REALM_ASSERT(!m_send_ident_message);
        REALM_ASSERT(ident_message_received());
        REALM_ASSERT(!unbind_message_received());
        REALM_ASSERT(!error_occurred());
        REALM_ASSERT(!m_error_message_sent);

        logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

        // Remember the request; continue_history_scan() consumes it when it
        // sends the corresponding MARK message.
        m_download_completion_request = request_ident;
        ensure_enlisted_to_send();

        return true;
    }
12,084✔
2734

2735
    // Returns true if the deactivation process has been completed, at which
2736
    // point the caller (SyncConnection::receive_unbind_message()) should
2737
    // terminate the session.
2738
    //
2739
    // CAUTION: This function may commit suicide!
2740
    void receive_unbind_message()
2741
    {
2,966✔
2742
        // Protocol state may be anything but SendUnbound
1,418✔
2743
        REALM_ASSERT(!m_unbind_message_received);
2,966✔
2744

1,418✔
2745
        logger.detail("Received: UNBIND"); // Throws
2,966✔
2746

1,418✔
2747
        detach_from_server_file();
2,966✔
2748
        m_unbind_message_received = true;
2,966✔
2749

1,418✔
2750
        // Detect completion of the deactivation process
1,418✔
2751
        if (m_error_message_sent) {
2,966✔
2752
            // Deactivation process completed
14✔
2753
            terminate(); // Throws
28✔
2754
            m_connection.discard_session(m_session_ident);
28✔
2755
            // This session is now destroyed!
14✔
2756
            return;
28✔
2757
        }
28✔
2758

1,404✔
2759
        // Protocol state is now SendUnbound
1,404✔
2760
        ensure_enlisted_to_send();
2,938✔
2761
    }
2,938✔
2762

2763
    /// Handle an ERROR message received from the client.
    ///
    /// All three arguments are ignored; the message is only logged.
    void receive_error_message(session_ident_type, int, std::string_view)
    {
        // An ERROR message must not arrive after UNBIND.
        REALM_ASSERT(!m_unbind_message_received);
        logger.detail("Received: ERROR"); // Throws
    }
×
2769

2770
private:
2771
    SyncConnection& m_connection;
2772

2773
    const session_ident_type m_session_ident;
2774

2775
    // Not null if, and only if this session is in
2776
    // m_connection.m_sessions_enlisted_to_send.
2777
    Session* m_next = nullptr;
2778

2779
    // Becomes nonnull when the BIND message is received, if no error occurs. Is
2780
    // reset to null when the deactivation process is initiated, either when the
2781
    // UNBIND message is received, or when initiate_deactivation() is called.
2782
    util::bind_ptr<ServerFile> m_server_file;
2783

2784
    bool m_disable_download = false;
2785
    bool m_is_subserver = false;
2786

2787
    using file_ident_request_type = ServerFile::file_ident_request_type;
2788

2789
    // When nonzero, this session has an outstanding request for a client file
2790
    // identifier.
2791
    file_ident_request_type m_file_ident_request = 0;
2792

2793
    // Payload for next outgoing ALLOC message.
2794
    SaltedFileIdent m_allocated_file_ident = {0, 0};
2795

2796
    // Zero until the session receives an IDENT message from the client.
2797
    file_ident_type m_client_file_ident = 0;
2798

2799
    // Zero until initiate_deactivation() is called.
2800
    ProtocolError m_error_code = {};
2801

2802
    // The current point of progression of the download process. Set to (<server
2803
    // version>, <client version>) of the IDENT message when the IDENT message
2804
    // is received. At the time of return from continue_history_scan(), it
2805
    // points to the latest server version such that all preceding changesets in
2806
    // the server-side history have been downloaded, are currently being
2807
    // downloaded, or are *download excluded*.
2808
    DownloadCursor m_download_progress = {0, 0};
2809

2810
    request_ident_type m_download_completion_request = 0;
2811

2812
    // Records the progress of the upload process. Used to check that the client
2813
    // uploads changesets in order. Also, when m_upload_progress >
2814
    // m_upload_threshold, m_upload_progress works as a cache of the persisted
2815
    // version of the upload progress.
2816
    UploadCursor m_upload_progress = {0, 0};
2817

2818
    // Initialized on reception of the IDENT message. Specifies the actual
2819
    // upload progress (as recorded on the server-side) at the beginning of the
2820
    // session, and it remains fixed throughout the session.
2821
    //
2822
    // m_upload_threshold includes the progress resulting from the received
2823
    // changesets that have not yet been integrated (only relevant for
2824
    // synchronous backup).
2825
    UploadCursor m_upload_threshold = {0, 0};
2826

2827
    // Works partially as a cache of the persisted value, and partially as a way
2828
    // of checking that the client respects that it can never decrease.
2829
    version_type m_locked_server_version = 0;
2830

2831
    bool m_send_ident_message = false;
2832
    bool m_unbind_message_received = false;
2833
    bool m_error_message_sent = false;
2834

2835
    /// m_one_download_message_sent denotes whether at least one DOWNLOAD message
2836
    /// has been sent in the current session. The variable is used to ensure
2837
    /// that a DOWNLOAD message is always sent in a session. The received
2838
    /// DOWNLOAD message is needed by the client to ensure that its current
2839
    /// download progress is up to date.
2840
    bool m_one_download_message_sent = false;
2841

2842
    static std::string make_logger_prefix(session_ident_type session_ident)
2843
    {
5,644✔
2844
        std::ostringstream out;
5,644✔
2845
        out.imbue(std::locale::classic());
5,644✔
2846
        out << "Session[" << session_ident << "]: "; // Throws
5,644✔
2847
        return out.str();                            // Throws
5,644✔
2848
    }
5,644✔
2849

2850
    // Scan the history for changesets to be downloaded.
2851
    // If the history is longer than the end point of the previous scan,
2852
    // a DOWNLOAD message will be sent.
2853
    // A MARK message is sent if no DOWNLOAD message is sent, and the client has
2854
    // requested to be notified about download completion.
2855
    // In case neither a DOWNLOAD nor a MARK is sent, no message is sent.
2856
    //
2857
    // This function may lead to the destruction of the session object
2858
    // (suicide).
2859
    void continue_history_scan()
2860
    {
102,902✔
2861
        // Protocol state must be WaitForUnbind
54,922✔
2862
        REALM_ASSERT(!m_send_ident_message);
102,902✔
2863
        REALM_ASSERT(ident_message_received());
102,902✔
2864
        REALM_ASSERT(!unbind_message_received());
102,902✔
2865
        REALM_ASSERT(!error_occurred());
102,902✔
2866
        REALM_ASSERT(!m_error_message_sent);
102,902✔
2867
        REALM_ASSERT(!is_enlisted_to_send());
102,902✔
2868

54,922✔
2869
        SaltedVersion last_server_version = m_server_file->get_salted_sync_version();
102,902✔
2870
        REALM_ASSERT(last_server_version.version >= m_download_progress.server_version);
102,902✔
2871

54,922✔
2872
        ServerImpl& server = m_connection.get_server();
102,902✔
2873
        const Server::Config& config = server.get_config();
102,902✔
2874
        if (REALM_UNLIKELY(m_disable_download))
102,902✔
2875
            return;
54,922✔
2876

54,922✔
2877
        bool have_more_to_scan =
102,902✔
2878
            (last_server_version.version > m_download_progress.server_version || !m_one_download_message_sent);
102,902✔
2879
        if (have_more_to_scan) {
102,902✔
2880
            m_server_file->register_client_access(m_client_file_ident);     // Throws
40,212✔
2881
            const ServerHistory& history = m_server_file->access().history; // Throws
40,212✔
2882
            const char* body;
40,212✔
2883
            std::size_t uncompressed_body_size;
40,212✔
2884
            std::size_t compressed_body_size = 0;
40,212✔
2885
            bool body_is_compressed = false;
40,212✔
2886
            version_type end_version = last_server_version.version;
40,212✔
2887
            DownloadCursor download_progress;
40,212✔
2888
            UploadCursor upload_progress = {0, 0};
40,212✔
2889
            std::uint_fast64_t downloadable_bytes = 0;
40,212✔
2890
            std::size_t num_changesets;
40,212✔
2891
            std::size_t accum_original_size;
40,212✔
2892
            std::size_t accum_compacted_size;
40,212✔
2893
            ServerProtocol& protocol = get_server_protocol();
40,212✔
2894
            bool disable_download_compaction = config.disable_download_compaction;
40,212✔
2895
            bool enable_cache = (config.enable_download_bootstrap_cache && m_download_progress.server_version == 0 &&
40,212!
2896
                                 m_upload_progress.client_version == 0 && m_upload_threshold.client_version == 0);
21,826!
2897
            DownloadCache& cache = m_server_file->get_download_cache();
40,212✔
2898
            bool fetch_from_cache = (enable_cache && cache.body && end_version == cache.end_version);
40,212!
2899
            if (fetch_from_cache) {
40,212✔
2900
                body = cache.body.get();
×
2901
                uncompressed_body_size = cache.uncompressed_body_size;
×
2902
                compressed_body_size = cache.compressed_body_size;
×
2903
                body_is_compressed = cache.body_is_compressed;
×
2904
                download_progress = cache.download_progress;
×
2905
                downloadable_bytes = cache.downloadable_bytes;
×
2906
                num_changesets = cache.num_changesets;
×
2907
                accum_original_size = cache.accum_original_size;
×
2908
                accum_compacted_size = cache.accum_compacted_size;
×
2909
            }
×
2910
            else {
40,212✔
2911
                // Discard the old cached DOWNLOAD body before generating a new
21,826✔
2912
                // one to be cached. This can make a big difference because the
21,826✔
2913
                // size of that body can be very large (10GiB has been seen in a
21,826✔
2914
                // real-world case).
21,826✔
2915
                if (enable_cache)
40,212✔
2916
                    cache.body = {};
×
2917

21,826✔
2918
                OutputBuffer& out = server.get_misc_buffers().download_message;
40,212✔
2919
                out.reset();
40,212✔
2920
                download_progress = m_download_progress;
40,212✔
2921
                auto fetch_and_compress = [&](std::size_t max_download_size) {
40,214✔
2922
                    DownloadHistoryEntryHandler handler{protocol, out, logger};
40,214✔
2923
                    std::uint_fast64_t cumulative_byte_size_current;
40,214✔
2924
                    std::uint_fast64_t cumulative_byte_size_total;
40,214✔
2925
                    bool not_expired = history.fetch_download_info(
40,214✔
2926
                        m_client_file_ident, download_progress, end_version, upload_progress, handler,
40,214✔
2927
                        cumulative_byte_size_current, cumulative_byte_size_total, disable_download_compaction,
40,214✔
2928
                        max_download_size); // Throws
40,214✔
2929
                    REALM_ASSERT(upload_progress.client_version >= download_progress.last_integrated_client_version);
40,214✔
2930
                    SyncConnection& conn = get_connection();
40,214✔
2931
                    if (REALM_UNLIKELY(!not_expired)) {
40,214✔
2932
                        logger.debug("History scanning failed: Client file entry "
×
2933
                                     "expired during session"); // Throws
×
2934
                        conn.protocol_error(ProtocolError::client_file_expired, this);
×
2935
                        // Session object may have been destroyed at this point
2936
                        // (suicide).
2937
                        return false;
×
2938
                    }
×
2939

21,826✔
2940
                    downloadable_bytes = cumulative_byte_size_total - cumulative_byte_size_current;
40,214✔
2941
                    uncompressed_body_size = out.size();
40,214✔
2942
                    BinaryData uncompressed = {out.data(), uncompressed_body_size};
40,214✔
2943
                    body = uncompressed.data();
40,214✔
2944
                    std::size_t max_uncompressed = 1024;
40,214✔
2945
                    if (uncompressed.size() > max_uncompressed) {
40,214✔
2946
                        compression::CompressMemoryArena& arena = server.get_compress_memory_arena();
4,226✔
2947
                        std::vector<char>& buffer = server.get_misc_buffers().compress;
4,226✔
2948
                        compression::allocate_and_compress(arena, uncompressed, buffer); // Throws
4,226✔
2949
                        if (buffer.size() < uncompressed.size()) {
4,226✔
2950
                            body = buffer.data();
4,226✔
2951
                            compressed_body_size = buffer.size();
4,226✔
2952
                            body_is_compressed = true;
4,226✔
2953
                        }
4,226✔
2954
                    }
4,226✔
2955
                    num_changesets = handler.num_changesets;
40,214✔
2956
                    accum_original_size = handler.accum_original_size;
40,214✔
2957
                    accum_compacted_size = handler.accum_compacted_size;
40,214✔
2958
                    return true;
40,214✔
2959
                };
40,214✔
2960
                if (enable_cache) {
40,212✔
2961
                    std::size_t max_download_size = std::numeric_limits<size_t>::max();
×
2962
                    if (!fetch_and_compress(max_download_size)) { // Throws
×
2963
                        // Session object may have been destroyed at this point
2964
                        // (suicide).
2965
                        return;
×
2966
                    }
×
2967
                    REALM_ASSERT(upload_progress.client_version == 0);
×
2968
                    std::size_t body_size = (body_is_compressed ? compressed_body_size : uncompressed_body_size);
×
2969
                    cache.body = std::make_unique<char[]>(body_size); // Throws
×
2970
                    std::copy(body, body + body_size, cache.body.get());
×
2971
                    cache.uncompressed_body_size = uncompressed_body_size;
×
2972
                    cache.compressed_body_size = compressed_body_size;
×
2973
                    cache.body_is_compressed = body_is_compressed;
×
2974
                    cache.end_version = end_version;
×
2975
                    cache.download_progress = download_progress;
×
2976
                    cache.downloadable_bytes = downloadable_bytes;
×
2977
                    cache.num_changesets = num_changesets;
×
2978
                    cache.accum_original_size = accum_original_size;
×
2979
                    cache.accum_compacted_size = accum_compacted_size;
×
2980
                }
×
2981
                else {
40,212✔
2982
                    std::size_t max_download_size = config.max_download_size;
40,212✔
2983
                    if (!fetch_and_compress(max_download_size)) { // Throws
40,212✔
2984
                        // Session object may have been destroyed at this point
2985
                        // (suicide).
2986
                        return;
×
2987
                    }
×
2988
                }
40,212✔
2989
            }
40,212✔
2990

21,826✔
2991
            OutputBuffer& out = m_connection.get_output_buffer();
40,212✔
2992
            protocol.make_download_message(
40,212✔
2993
                m_connection.get_client_protocol_version(), out, m_session_ident, download_progress.server_version,
40,212✔
2994
                download_progress.last_integrated_client_version, last_server_version.version,
40,212✔
2995
                last_server_version.salt, upload_progress.client_version,
40,212✔
2996
                upload_progress.last_integrated_server_version, downloadable_bytes, num_changesets, body,
40,212✔
2997
                uncompressed_body_size, compressed_body_size, body_is_compressed, logger); // Throws
40,212✔
2998

21,826✔
2999
            if (!disable_download_compaction) {
40,214✔
3000
                std::size_t saved = accum_original_size - accum_compacted_size;
40,214✔
3001
                double saved_2 = (accum_original_size == 0 ? 0 : std::round(saved * 100.0 / accum_original_size));
31,512✔
3002
                logger.detail("Download compaction: Saved %1 bytes (%2%%)", saved, saved_2); // Throws
40,214✔
3003
            }
40,214✔
3004

21,826✔
3005
            m_download_progress = download_progress;
40,212✔
3006
            logger.debug("Setting of m_download_progress.server_version = %1",
40,212✔
3007
                         m_download_progress.server_version); // Throws
40,212✔
3008
            send_download_message();
40,212✔
3009
            m_one_download_message_sent = true;
40,212✔
3010

21,826✔
3011
            enlist_to_send();
40,212✔
3012
        }
40,212✔
3013
        else if (m_download_completion_request) {
62,690✔
3014
            // Send a MARK message
5,972✔
3015
            request_ident_type request_ident = m_download_completion_request;
12,076✔
3016
            send_mark_message(request_ident);  // Throws
12,076✔
3017
            m_download_completion_request = 0; // Request handled
12,076✔
3018
            enlist_to_send();
12,076✔
3019
        }
12,076✔
3020
    }
102,902✔
3021

3022
    void send_ident_message()
3023
    {
1,294✔
3024
        // Protocol state must be SendIdent
552✔
3025
        REALM_ASSERT(!need_client_file_ident());
1,294✔
3026
        REALM_ASSERT(m_send_ident_message);
1,294✔
3027
        REALM_ASSERT(!ident_message_received());
1,294✔
3028
        REALM_ASSERT(!unbind_message_received());
1,294✔
3029
        REALM_ASSERT(!error_occurred());
1,294✔
3030
        REALM_ASSERT(!m_error_message_sent);
1,294✔
3031

552✔
3032
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
1,294✔
3033

552✔
3034
        file_ident_type client_file_ident = m_allocated_file_ident.ident;
1,294✔
3035
        salt_type client_file_ident_salt = m_allocated_file_ident.salt;
1,294✔
3036

552✔
3037
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident,
1,294✔
3038
                     client_file_ident_salt); // Throws
1,294✔
3039

552✔
3040
        ServerProtocol& protocol = get_server_protocol();
1,294✔
3041
        OutputBuffer& out = m_connection.get_output_buffer();
1,294✔
3042
        int protocol_version = m_connection.get_client_protocol_version();
1,294✔
3043
        protocol.make_ident_message(protocol_version, out, m_session_ident, client_file_ident,
1,294✔
3044
                                    client_file_ident_salt); // Throws
1,294✔
3045
        m_connection.initiate_write_output_buffer();         // Throws
1,294✔
3046

552✔
3047
        m_allocated_file_ident.ident = 0; // Consumed
1,294✔
3048
        m_send_ident_message = false;
1,294✔
3049
        // Protocol state is now WaitForStateRequest or WaitForIdent
552✔
3050
    }
1,294✔
3051

3052
    void send_download_message()
3053
    {
40,212✔
3054
        m_connection.initiate_write_output_buffer(); // Throws
40,212✔
3055
    }
40,212✔
3056

3057
    /// Build a MARK message for `request_ident` in the connection's output
    /// buffer and initiate writing of that buffer.
    void send_mark_message(request_ident_type request_ident)
    {
        logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws

        ServerProtocol& codec = get_server_protocol();
        OutputBuffer& buffer = m_connection.get_output_buffer();
        codec.make_mark_message(buffer, m_session_ident, request_ident); // Throws
        m_connection.initiate_write_output_buffer();                     // Throws
    }
12,076✔
3066

3067
    void send_alloc_message()
3068
    {
×
3069
        // Protocol state must be WaitForUnbind
3070
        REALM_ASSERT(!m_send_ident_message);
×
3071
        REALM_ASSERT(ident_message_received());
×
3072
        REALM_ASSERT(!unbind_message_received());
×
3073
        REALM_ASSERT(!error_occurred());
×
3074
        REALM_ASSERT(!m_error_message_sent);
×
3075

3076
        REALM_ASSERT(m_allocated_file_ident.ident != 0);
×
3077

3078
        // Relayed allocations are only allowed from protocol version 23 (old protocol).
3079
        REALM_ASSERT(false);
×
3080

3081
        file_ident_type file_ident = m_allocated_file_ident.ident;
×
3082

3083
        logger.debug("Sending: ALLOC(file_ident=%1)", file_ident); // Throws
×
3084

3085
        ServerProtocol& protocol = get_server_protocol();
×
3086
        OutputBuffer& out = m_connection.get_output_buffer();
×
3087
        protocol.make_alloc_message(out, m_session_ident, file_ident); // Throws
×
3088
        m_connection.initiate_write_output_buffer();                   // Throws
×
3089

3090
        m_allocated_file_ident.ident = 0; // Consumed
×
3091

3092
        // Other messages may be waiting to be sent.
3093
        enlist_to_send();
×
3094
    }
×
3095

3096
    void send_unbound_message()
3097
    {
2,644✔
3098
        // Protocol state must be SendUnbound
1,110✔
3099
        REALM_ASSERT(unbind_message_received());
2,644✔
3100
        REALM_ASSERT(!m_error_message_sent);
2,644✔
3101

1,110✔
3102
        logger.debug("Sending: UNBOUND"); // Throws
2,644✔
3103

1,110✔
3104
        ServerProtocol& protocol = get_server_protocol();
2,644✔
3105
        OutputBuffer& out = m_connection.get_output_buffer();
2,644✔
3106
        protocol.make_unbound_message(out, m_session_ident); // Throws
2,644✔
3107
        m_connection.initiate_write_output_buffer();         // Throws
2,644✔
3108
    }
2,644✔
3109

3110
    void send_error_message()
3111
    {
76✔
3112
        // Protocol state must be SendError
40✔
3113
        REALM_ASSERT(!unbind_message_received());
76✔
3114
        REALM_ASSERT(error_occurred());
76✔
3115
        REALM_ASSERT(!m_error_message_sent);
76✔
3116

40✔
3117
        REALM_ASSERT(is_session_level_error(m_error_code));
76✔
3118

40✔
3119
        ProtocolError error_code = m_error_code;
76✔
3120
        const char* message = get_protocol_error_message(int(error_code));
76✔
3121
        std::size_t message_size = std::strlen(message);
76✔
3122
        bool try_again = determine_try_again(error_code);
76✔
3123

40✔
3124
        logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3)", int(error_code), message_size,
76✔
3125
                      try_again); // Throws
76✔
3126

40✔
3127
        ServerProtocol& protocol = get_server_protocol();
76✔
3128
        OutputBuffer& out = m_connection.get_output_buffer();
76✔
3129
        int protocol_version = m_connection.get_client_protocol_version();
76✔
3130
        protocol.make_error_message(protocol_version, out, error_code, message, message_size, try_again,
76✔
3131
                                    m_session_ident); // Throws
76✔
3132
        m_connection.initiate_write_output_buffer();  // Throws
76✔
3133

40✔
3134
        m_error_message_sent = true;
76✔
3135
        // Protocol state is now WaitForUnbindErr
40✔
3136
    }
76✔
3137

3138
    /// Deliver a log message for this session.
    ///
    /// If the client's protocol version is at least
    /// `SyncConnection::SERVER_LOG_PROTOCOL_VERSION`, the message is handed
    /// to the connection together with the session identifier. Otherwise it
    /// is written to this session's own logger.
    ///
    /// NOTE(review): `message` is a `const std::string&&`, so `std::move`
    /// yields a const rvalue that cannot actually be moved from -- consider
    /// revisiting the parameter type together with the callers.
    void send_log_message(util::Logger::Level level, const std::string&& message)
    {
        bool client_accepts_log_messages =
            !(m_connection.get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION);
        if (client_accepts_log_messages) {
            m_connection.send_log_message(level, std::move(message), m_session_ident);
            return;
        }

        logger.log(level, message.c_str());
    }
4,062✔
3146

3147
    // Idempotent
3148
    void detach_from_server_file() noexcept
3149
    {
8,688✔
3150
        if (!m_server_file)
8,688✔
3151
            return;
3,072✔
3152
        ServerFile& file = *m_server_file;
5,616✔
3153
        if (ident_message_received()) {
5,616✔
3154
            file.remove_identified_session(m_client_file_ident);
4,062✔
3155
        }
4,062✔
3156
        else {
1,554✔
3157
            file.remove_unidentified_session(this);
1,554✔
3158
        }
1,554✔
3159
        if (m_file_ident_request != 0)
5,616✔
3160
            file.cancel_file_ident_request(m_file_ident_request);
1,346✔
3161
        m_server_file.reset();
5,616✔
3162
    }
5,616✔
3163

3164
    friend class SessionQueue;
3165
};
3166

3167

3168
// ============================ SessionQueue implementation ============================
3169

3170
// Append `sess` to the back of the queue.
//
// The queue is a circular singly-linked list threaded through
// `Session::m_next`: `m_back` points at the last element and
// `m_back->m_next` at the first. `sess` must not currently be linked.
void SessionQueue::push_back(Session* sess) noexcept
{
    REALM_ASSERT(!sess->m_next);

    // The new back element always links to the front. In an empty queue the
    // new element is its own front.
    Session* front = (m_back ? m_back->m_next : sess);
    sess->m_next = front;
    if (m_back)
        m_back->m_next = sess;
    m_back = sess;
}
107,424✔
3182

3183

3184
// Remove and return the session at the front of the queue, or return null if
// the queue is empty. The returned session's `m_next` is reset to null.
Session* SessionQueue::pop_front() noexcept
{
    if (!m_back)
        return nullptr;

    Session* front = m_back->m_next;
    if (front == m_back) {
        // That was the only element; the queue is now empty.
        m_back = nullptr;
    }
    else {
        // Unlink the front by making the back point at its successor.
        m_back->m_next = front->m_next;
    }
    front->m_next = nullptr;
    return front;
}
154,484✔
3199

3200

3201
void SessionQueue::clear() noexcept
3202
{
3,062✔
3203
    if (m_back) {
3,062✔
3204
        Session* sess = m_back;
142✔
3205
        for (;;) {
508✔
3206
            Session* next = sess->m_next;
508✔
3207
            sess->m_next = nullptr;
508✔
3208
            if (next == m_back)
508✔
3209
                break;
142✔
3210
            sess = next;
366✔
3211
        }
366✔
3212
        m_back = nullptr;
142✔
3213
    }
142✔
3214
}
3,062✔
3215

3216

3217
// ============================ ServerFile implementation ============================
3218

3219
ServerFile::ServerFile(ServerImpl& server, ServerFileAccessCache& cache, const std::string& virt_path,
3220
                       std::string real_path, bool disable_sync_to_disk)
3221
    : logger{"ServerFile[" + virt_path + "]: ", server.logger_ptr}           // Throws
3222
    , wlogger{"ServerFile[" + virt_path + "]: ", server.get_worker().logger} // Throws
3223
    , m_server{server}
3224
    , m_file{cache, real_path, virt_path, false, disable_sync_to_disk} // Throws
3225
    , m_worker_file{server.get_worker().get_file_access_cache(), real_path, virt_path, true, disable_sync_to_disk}
3226
{
1,088✔
3227
}
1,088✔
3228

3229

3230
// At destruction no sessions may remain registered with the file, and
// `m_file_ident_request` must be zero.
ServerFile::~ServerFile() noexcept
{
    REALM_ASSERT(m_unidentified_sessions.empty());
    REALM_ASSERT(m_identified_sessions.empty());

    REALM_ASSERT(m_file_ident_request == 0);
}
1,088✔
3236

3237

3238
void ServerFile::initialize()
3239
{
1,088✔
3240
    const ServerHistory& history = access().history; // Throws
1,088✔
3241
    file_ident_type partial_file_ident = 0;
1,088✔
3242
    version_type partial_progress_reference_version = 0;
1,088✔
3243
    bool has_upstream_sync_status;
1,088✔
3244
    history.get_status(m_version_info, has_upstream_sync_status, partial_file_ident,
1,088✔
3245
                       partial_progress_reference_version); // Throws
1,088✔
3246
    REALM_ASSERT(!has_upstream_sync_status);
1,088✔
3247
    REALM_ASSERT(partial_file_ident == 0);
1,088✔
3248
}
1,088✔
3249

3250

3251
// Currently does nothing.
void ServerFile::activate()
{
}
1,088✔
3252

3253

3254
// This function must be called only after a completed invocation of
3255
// initialize(). Both functinos must only ever be called by the network event
3256
// loop thread.
3257
void ServerFile::register_client_access(file_ident_type) {}
87,888✔
3258

3259

3260
auto ServerFile::request_file_ident(FileIdentReceiver& receiver, file_ident_type proxy_file, ClientType client_type)
3261
    -> file_ident_request_type
3262
{
2,640✔
3263
    auto request = ++m_last_file_ident_request;
2,640✔
3264
    m_file_ident_requests[request] = {&receiver, proxy_file, client_type}; // Throws
2,640✔
3265

1,220✔
3266
    on_work_added(); // Throws
2,640✔
3267
    return request;
2,640✔
3268
}
2,640✔
3269

3270

3271
// Detaches the receiver from a previously registered file identifier request.
//
// The request entry itself stays in `m_file_ident_requests`; only its receiver
// pointer is cleared. The request must exist and must still have a receiver.
void ServerFile::cancel_file_ident_request(file_ident_request_type request) noexcept
{
    auto i = m_file_ident_requests.find(request);
    REALM_ASSERT(i != m_file_ident_requests.end());

    FileIdentRequestInfo& info = i->second;
    REALM_ASSERT(info.receiver);

    info.receiver = nullptr;
}
1,346✔
3279

3280

3281
// Adds `sess` to the set of unidentified sessions. The session must not
// already be a member of that set.
void ServerFile::add_unidentified_session(Session* sess)
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 0);

    m_unidentified_sessions.insert(sess); // Throws
}
5,614✔
3286

3287

3288
// Moves `sess` from the set of unidentified sessions to the map of identified
// sessions, keyed by `client_file_ident`.
//
// The session must currently be unidentified, and no session may already be
// registered under `client_file_ident`.
void ServerFile::identify_session(Session* sess, file_ident_type client_file_ident)
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 0);

    // Register under the identifier first; the session only leaves the
    // unidentified set once that insertion has succeeded.
    m_identified_sessions[client_file_ident] = sess; // Throws
    m_unidentified_sessions.erase(sess);
}
4,062✔
3296

3297

3298
// Removes `sess` from the set of unidentified sessions. The session must be a
// member of that set.
void ServerFile::remove_unidentified_session(Session* sess) noexcept
{
    REALM_ASSERT(m_unidentified_sessions.count(sess) == 1);

    m_unidentified_sessions.erase(sess);
}
1,554✔
3303

3304

3305
// Removes the session registered under `client_file_ident` from the map of
// identified sessions. Such a session must exist.
void ServerFile::remove_identified_session(file_ident_type client_file_ident) noexcept
{
    REALM_ASSERT(m_identified_sessions.count(client_file_ident) == 1);

    m_identified_sessions.erase(client_file_ident);
}
4,062✔
3310

3311

3312
// Looks up the session registered under `client_file_ident`.
//
// Returns the session pointer, or nullptr when no session is registered under
// that identifier.
Session* ServerFile::get_identified_session(file_ident_type client_file_ident) noexcept
{
    const auto pos = m_identified_sessions.find(client_file_ident);
    return (pos != m_identified_sessions.end()) ? pos->second : nullptr;
}
×
3319

3320
bool ServerFile::can_add_changesets_from_downstream() const noexcept
3321
{
43,870✔
3322
    return (m_blocked_changesets_from_downstream_byte_size < m_server.get_max_upload_backlog());
43,870✔
3323
}
43,870✔
3324

3325

3326
void ServerFile::add_changesets_from_downstream(file_ident_type client_file_ident, UploadCursor upload_progress,
3327
                                                version_type locked_server_version, const UploadChangeset* changesets,
3328
                                                std::size_t num_changesets)
3329
{
43,616✔
3330
    register_client_access(client_file_ident); // Throws
43,616✔
3331

22,056✔
3332
    bool dirty = false;
43,616✔
3333

22,056✔
3334
    IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident]; // Throws
43,616✔
3335
    std::size_t num_bytes = 0;
43,616✔
3336
    for (std::size_t i = 0; i < num_changesets; ++i) {
78,012✔
3337
        const UploadChangeset& uc = changesets[i];
34,396✔
3338
        auto& changesets = list.changesets;
34,396✔
3339
        changesets.emplace_back(client_file_ident, uc.origin_timestamp, uc.origin_file_ident, uc.upload_cursor,
34,396✔
3340
                                uc.changeset); // Throws
34,396✔
3341
        num_bytes += uc.changeset.size();
34,396✔
3342
        dirty = true;
34,396✔
3343
    }
34,396✔
3344

22,056✔
3345
    REALM_ASSERT(upload_progress.client_version >= list.upload_progress.client_version);
43,616✔
3346
    REALM_ASSERT(are_mutually_consistent(upload_progress, list.upload_progress));
43,616✔
3347
    if (upload_progress.client_version > list.upload_progress.client_version) {
43,616✔
3348
        list.upload_progress = upload_progress;
43,616✔
3349
        dirty = true;
43,616✔
3350
    }
43,616✔
3351

22,056✔
3352
    REALM_ASSERT(locked_server_version >= list.locked_server_version);
43,616✔
3353
    if (locked_server_version > list.locked_server_version) {
43,616✔
3354
        list.locked_server_version = locked_server_version;
38,520✔
3355
        dirty = true;
38,520✔
3356
    }
38,520✔
3357

22,056✔
3358
    if (REALM_LIKELY(dirty)) {
43,616✔
3359
        if (num_changesets > 0) {
43,616✔
3360
            on_changesets_from_downstream_added(num_changesets, num_bytes); // Throws
22,522✔
3361
        }
22,522✔
3362
        else {
21,094✔
3363
            on_work_added(); // Throws
21,094✔
3364
        }
21,094✔
3365
    }
43,616✔
3366
}
43,616✔
3367

3368

3369
// Bootstraps a session for the client file `client_file_ident`.
//
// Fails with BootstrapError::bad_server_version if the client claims a server
// version beyond the one in `m_version_info`. Otherwise the request is passed
// to ServerHistory::bootstrap_client_session(), which fills in
// `upload_progress` and `locked_server_version`. On success those two values
// are then replaced by the ones recorded in the changeset lists still buffered
// for the same client file, first from the current work unit and then from
// the list of blocked changesets.
//
// Returns the error reported by the history, or BootstrapError::no_error.
BootstrapError ServerFile::bootstrap_client_session(SaltedFileIdent client_file_ident,
                                                    DownloadCursor download_progress, SaltedVersion server_version,
                                                    ClientType client_type, UploadCursor& upload_progress,
                                                    version_type& locked_server_version, Logger& logger)
{
    // The Realm file may contain a later snapshot than the one reflected by
    // `m_sync_version`, but if so, the client cannot "legally" know about it.
    if (server_version.version > m_version_info.sync_version.version)
        return BootstrapError::bad_server_version;

    const ServerHistory& hist = access().history; // Throws
    const BootstrapError error = hist.bootstrap_client_session(client_file_ident, download_progress, server_version,
                                                               client_type, upload_progress, locked_server_version,
                                                               logger); // Throws

    // FIXME: Rather than taking previously buffered changesets from the same
    // client file into account when determining the upload progress, and then
    // allowing for an error during the integration of those changesets to be
    // reported to, and terminate the new session, consider to instead postpone
    // the bootstrapping of the new session until all previously buffered
    // changesets from same client file have been fully processed.

    if (error != BootstrapError::no_error)
        return error;

    register_client_access(client_file_ident.ident); // Throws

    // If upload, or releasing of server versions progressed further during
    // previous sessions than the persisted points, take that into account.
    auto adopt_buffered_progress = [&](const auto& buffered) {
        const auto pos = buffered.find(client_file_ident.ident);
        if (pos == buffered.end())
            return;
        const IntegratableChangesetList& list = pos->second;
        REALM_ASSERT(list.upload_progress.client_version >= upload_progress.client_version);
        upload_progress = list.upload_progress;
        REALM_ASSERT(list.locked_server_version >= locked_server_version);
        locked_server_version = list.locked_server_version;
    };
    adopt_buffered_progress(m_work.changesets_from_downstream);
    adopt_buffered_progress(m_changesets_from_downstream);

    return error;
}
4,074✔
3416

3417
// NOTE: This function is executed by the worker thread
3418
// Executes the current work unit (`m_work`) of this file.
//
// When the unit has primary work, file identifiers are allocated if any
// allocation slots are present, and buffered changesets from downstream are
// integrated if any are present. The elapsed time is added to the server's
// time counters, and finally group_postprocess_stage_1() is posted to the
// server's network service.
//
// NOTE: This function is executed by the worker thread
void ServerFile::worker_process_work_unit(WorkerState& state)
{
    SteadyTimePoint start_time = steady_clock_now();
    milliseconds_type parallel_time = 0;

    wlogger.debug("Work unit execution started"); // Throws

    if (m_work.has_primary_work) {
        const bool has_alloc_slots = !m_work.file_ident_alloc_slots.empty();
        if (REALM_UNLIKELY(has_alloc_slots))
            worker_allocate_file_identifiers(); // Throws

        const bool has_changesets = !m_work.changesets_from_downstream.empty();
        if (has_changesets)
            worker_integrate_changes_from_downstream(state); // Throws
    }

    wlogger.debug("Work unit execution completed"); // Throws

    // Account for the time spent on this work unit.
    milliseconds_type elapsed = steady_duration(start_time);
    milliseconds_type seq_time = elapsed - parallel_time;
    m_server.m_seq_time.fetch_add(seq_time, std::memory_order_relaxed);
    m_server.m_par_time.fetch_add(parallel_time, std::memory_order_relaxed);

    // Pass control back to the network event loop thread
    m_server.get_service().post([this](Status) {
        // FIXME: The safety of capturing `this` here, relies on the fact
        // that ServerFile objects currently are not destroyed until the
        // server object is destroyed.
        group_postprocess_stage_1(); // Throws
        // Suicide may have happened at this point
    }); // Throws
}
38,274✔
3451

3452

3453
void ServerFile::on_changesets_from_downstream_added(std::size_t num_changesets, std::size_t num_bytes)
3454
{
22,522✔
3455
    m_num_changesets_from_downstream += num_changesets;
22,522✔
3456

11,630✔
3457
    if (num_bytes > 0) {
22,522✔
3458
        m_blocked_changesets_from_downstream_byte_size += num_bytes;
22,522✔
3459
        get_server().inc_byte_size_for_pending_downstream_changesets(num_bytes); // Throws
22,522✔
3460
    }
22,522✔
3461

11,630✔
3462
    on_work_added(); // Throws
22,522✔
3463
}
22,522✔
3464

3465

3466
void ServerFile::on_work_added()
3467
{
46,254✔
3468
    if (m_has_blocked_work)
46,254✔
3469
        return;
7,838✔
3470
    m_has_blocked_work = true;
38,416✔
3471
    // Reference file
19,248✔
3472
    if (m_has_work_in_progress)
38,416✔
3473
        return;
11,074✔
3474
    group_unblock_work(); // Throws
27,342✔
3475
}
27,342✔
3476

3477

3478
void ServerFile::group_unblock_work()
3479
{
38,382✔
3480
    REALM_ASSERT(!m_has_work_in_progress);
38,382✔
3481
    if (REALM_LIKELY(!m_server.is_sync_stopped())) {
38,382✔
3482
        unblock_work(); // Throws
38,378✔
3483
        const Work& work = m_work;
38,378✔
3484
        if (REALM_LIKELY(work.has_primary_work)) {
38,378✔
3485
            logger.trace("Work unit unblocked"); // Throws
38,292✔
3486
            m_has_work_in_progress = true;
38,292✔
3487
            Worker& worker = m_server.get_worker();
38,292✔
3488
            worker.enqueue(this); // Throws
38,292✔
3489
        }
38,292✔
3490
    }
38,378✔
3491
}
38,382✔
3492

3493

3494
void ServerFile::unblock_work()
3495
{
38,380✔
3496
    REALM_ASSERT(m_has_blocked_work);
38,380✔
3497

19,216✔
3498
    m_work.reset();
38,380✔
3499

19,216✔
3500
    // Discard requests for file identifiers whose receiver is no longer
19,216✔
3501
    // waiting.
19,216✔
3502
    {
38,380✔
3503
        auto i = m_file_ident_requests.begin();
38,380✔
3504
        auto end = m_file_ident_requests.end();
38,380✔
3505
        while (i != end) {
40,720✔
3506
            auto j = i++;
2,340✔
3507
            const FileIdentRequestInfo& info = j->second;
2,340✔
3508
            if (!info.receiver)
2,340✔
3509
                m_file_ident_requests.erase(j);
496✔
3510
        }
2,340✔
3511
    }
38,380✔
3512
    std::size_t n = m_file_ident_requests.size();
38,380✔
3513
    if (n > 0) {
38,380✔
3514
        m_work.file_ident_alloc_slots.resize(n); // Throws
1,808✔
3515
        std::size_t i = 0;
1,808✔
3516
        for (const auto& pair : m_file_ident_requests) {
1,844✔
3517
            const FileIdentRequestInfo& info = pair.second;
1,844✔
3518
            FileIdentAllocSlot& slot = m_work.file_ident_alloc_slots[i];
1,844✔
3519
            slot.proxy_file = info.proxy_file;
1,844✔
3520
            slot.client_type = info.client_type;
1,844✔
3521
            ++i;
1,844✔
3522
        }
1,844✔
3523
        m_work.has_primary_work = true;
1,808✔
3524
    }
1,808✔
3525

19,216✔
3526
    // FIXME: `ServerFile::m_changesets_from_downstream` and
19,216✔
3527
    // `Work::changesets_from_downstream` should be renamed to something else,
19,216✔
3528
    // as it may contain kinds of data other than changesets.
19,216✔
3529

19,216✔
3530
    using std::swap;
38,380✔
3531
    swap(m_changesets_from_downstream, m_work.changesets_from_downstream);
38,380✔
3532
    m_work.have_changesets_from_downstream = (m_num_changesets_from_downstream > 0);
38,380✔
3533
    bool has_changesets = !m_work.changesets_from_downstream.empty();
38,380✔
3534
    if (has_changesets) {
38,380✔
3535
        m_work.has_primary_work = true;
36,484✔
3536
    }
36,484✔
3537

19,216✔
3538
    // Keep track of the size of pending changesets
19,216✔
3539
    REALM_ASSERT(m_unblocked_changesets_from_downstream_byte_size == 0);
38,380✔
3540
    m_unblocked_changesets_from_downstream_byte_size = m_blocked_changesets_from_downstream_byte_size;
38,380✔
3541
    m_blocked_changesets_from_downstream_byte_size = 0;
38,380✔
3542

19,216✔
3543
    m_num_changesets_from_downstream = 0;
38,380✔
3544
    m_has_blocked_work = false;
38,380✔
3545
}
38,380✔
3546

3547

3548
void ServerFile::resume_download() noexcept
3549
{
22,532✔
3550
    for (const auto& entry : m_identified_sessions) {
36,258✔
3551
        Session& sess = *entry.second;
36,258✔
3552
        sess.ensure_enlisted_to_send();
36,258✔
3553
    }
36,258✔
3554
}
22,532✔
3555

3556

3557
void ServerFile::recognize_external_change()
3558
{
4,800✔
3559
    VersionInfo prev_version_info = m_version_info;
4,800✔
3560
    const ServerHistory& history = access().history;       // Throws
4,800✔
3561
    bool has_upstream_status;                              // Dummy
4,800✔
3562
    sync::file_ident_type partial_file_ident;              // Dummy
4,800✔
3563
    sync::version_type partial_progress_reference_version; // Dummy
4,800✔
3564
    history.get_status(m_version_info, has_upstream_status, partial_file_ident,
4,800✔
3565
                       partial_progress_reference_version); // Throws
4,800✔
3566

2,400✔
3567
    REALM_ASSERT(m_version_info.realm_version >= prev_version_info.realm_version);
4,800✔
3568
    REALM_ASSERT(m_version_info.sync_version.version >= prev_version_info.sync_version.version);
4,800✔
3569
    if (m_version_info.sync_version.version > prev_version_info.sync_version.version) {
4,800✔
3570
        REALM_ASSERT(m_version_info.realm_version > prev_version_info.realm_version);
4,800✔
3571
        resume_download();
4,800✔
3572
    }
4,800✔
3573
}
4,800✔
3574

3575

3576
// NOTE: This function is executed by the worker thread
3577
void ServerFile::worker_allocate_file_identifiers()
3578
{
1,806✔
3579
    Work& work = m_work;
1,806✔
3580
    REALM_ASSERT(!work.file_ident_alloc_slots.empty());
1,806✔
3581
    ServerHistory& hist = worker_access().history;                                      // Throws
1,806✔
3582
    hist.allocate_file_identifiers(m_work.file_ident_alloc_slots, m_work.version_info); // Throws
1,806✔
3583
    m_work.produced_new_realm_version = true;
1,806✔
3584
}
1,806✔
3585

3586

3587
// Returns true when, and only when this function produces a new sync version
3588
// (adds a new entry to the sync history).
3589
//
3590
// NOTE: This function is executed by the worker thread
3591
// Integrates the changesets buffered in the current work unit into the
// history, and records in the work unit whether a new Realm version and a new
// sync version were produced.
//
// Returns true when, and only when this function produces a new sync version
// (adds a new entry to the sync history).
//
// NOTE: This function is executed by the worker thread
bool ServerFile::worker_integrate_changes_from_downstream(WorkerState& state)
{
    REALM_ASSERT(!m_work.changesets_from_downstream.empty());

    // These two keep a privately opened history alive for the duration of the
    // call when get_client_file_history() does not use the file cache.
    std::unique_ptr<ServerHistory> hist_ptr;
    DBRef sg_ptr;
    ServerHistory& hist = get_client_file_history(state, hist_ptr, sg_ptr);

    bool backup_whole_realm = false;
    const bool produced_new_realm_version =
        hist.integrate_client_changesets(m_work.changesets_from_downstream, m_work.version_info, backup_whole_realm,
                                         m_work.integration_result, wlogger); // Throws
    const bool produced_new_sync_version = !m_work.integration_result.integrated_changesets.empty();
    REALM_ASSERT(!produced_new_sync_version || produced_new_realm_version);

    if (!produced_new_realm_version)
        return produced_new_sync_version;

    m_work.produced_new_realm_version = true;
    if (produced_new_sync_version)
        m_work.produced_new_sync_version = true;
    return produced_new_sync_version;
}
36,470✔
3612

3613
// Returns the history to be used by the worker for this file.
//
// With `state.use_file_cache` set, this is the history of the worker's cached
// file access, and `hist_ptr` / `sg_ptr` are left untouched. Otherwise a new
// history and DB are created for the worker file's path and stored in
// `hist_ptr` and `sg_ptr`, the DB claims the sync agent role, and the new
// history is returned; the caller owns both through the two out-parameters.
ServerHistory& ServerFile::get_client_file_history(WorkerState& state, std::unique_ptr<ServerHistory>& hist_ptr,
                                                   DBRef& sg_ptr)
{
    if (state.use_file_cache) {
        return worker_access().history; // Throws
    }

    hist_ptr = m_server.make_history_for_path();                       // Throws
    DBOptions options = m_worker_file.make_shared_group_options();     // Throws
    sg_ptr = DB::create(*hist_ptr, m_worker_file.realm_path, options); // Throws
    sg_ptr->claim_sync_agent();                                        // Throws
    return *hist_ptr;
}
×
3625

3626

3627
// When worker thread finishes work unit.
3628
void ServerFile::group_postprocess_stage_1()
3629
{
37,924✔
3630
    REALM_ASSERT(m_has_work_in_progress);
37,924✔
3631

18,972✔
3632
    group_finalize_work_stage_1(); // Throws
37,924✔
3633
    group_finalize_work_stage_2(); // Throws
37,924✔
3634
    group_postprocess_stage_2();   // Throws
37,924✔
3635
}
37,924✔
3636

3637

3638
void ServerFile::group_postprocess_stage_2()
3639
{
37,924✔
3640
    REALM_ASSERT(m_has_work_in_progress);
37,924✔
3641
    group_postprocess_stage_3(); // Throws
37,924✔
3642
    // Suicide may have happened at this point
18,972✔
3643
}
37,924✔
3644

3645

3646
// When all files, including the reference file, have been backed up.
3647
void ServerFile::group_postprocess_stage_3()
3648
{
37,924✔
3649
    REALM_ASSERT(m_has_work_in_progress);
37,924✔
3650
    m_has_work_in_progress = false;
37,924✔
3651

18,972✔
3652
    logger.trace("Work unit postprocessing complete"); // Throws
37,924✔
3653
    if (m_has_blocked_work)
37,924✔
3654
        group_unblock_work(); // Throws
11,036✔
3655
}
37,924✔
3656

3657

3658
void ServerFile::finalize_work_stage_1()
3659
{
37,926✔
3660
    if (m_unblocked_changesets_from_downstream_byte_size > 0) {
37,926✔
3661
        // Report the byte size of completed downstream changesets.
9,292✔
3662
        std::size_t byte_size = m_unblocked_changesets_from_downstream_byte_size;
17,750✔
3663
        get_server().dec_byte_size_for_pending_downstream_changesets(byte_size); // Throws
17,750✔
3664
        m_unblocked_changesets_from_downstream_byte_size = 0;
17,750✔
3665
    }
17,750✔
3666

18,974✔
3667
    // Deal with errors (bad changesets) pertaining to downstream clients
18,974✔
3668
    std::size_t num_changesets_removed = 0;
37,926✔
3669
    std::size_t num_bytes_removed = 0;
37,926✔
3670
    for (const auto& entry : m_work.integration_result.excluded_client_files) {
18,982✔
3671
        file_ident_type client_file_ident = entry.first;
18✔
3672
        ExtendedIntegrationError error = entry.second;
18✔
3673
        ProtocolError error_2 = ProtocolError::other_session_error;
18✔
3674
        switch (error) {
18✔
3675
            case ExtendedIntegrationError::client_file_expired:
✔
3676
                logger.debug("Changeset integration failed: Client file entry "
×
3677
                             "expired during session"); // Throws
×
3678
                error_2 = ProtocolError::client_file_expired;
×
3679
                break;
×
3680
            case ExtendedIntegrationError::bad_origin_file_ident:
✔
3681
                error_2 = ProtocolError::bad_origin_file_ident;
×
3682
                break;
×
3683
            case ExtendedIntegrationError::bad_changeset:
18✔
3684
                error_2 = ProtocolError::bad_changeset;
18✔
3685
                break;
18✔
3686
        }
18✔
3687
        auto i = m_identified_sessions.find(client_file_ident);
18✔
3688
        if (i != m_identified_sessions.end()) {
18✔
3689
            Session& sess = *i->second;
18✔
3690
            SyncConnection& conn = sess.get_connection();
18✔
3691
            conn.protocol_error(error_2, &sess); // Throws
18✔
3692
        }
18✔
3693
        const IntegratableChangesetList& list = m_changesets_from_downstream[client_file_ident];
18✔
3694
        std::size_t num_changesets = list.changesets.size();
18✔
3695
        std::size_t num_bytes = 0;
18✔
3696
        for (const IntegratableChangeset& ic : list.changesets)
18✔
3697
            num_bytes += ic.changeset.size();
×
3698
        logger.info("Excluded %1 changesets of combined byte size %2 for client file %3", num_changesets, num_bytes,
18✔
3699
                    client_file_ident); // Throws
18✔
3700
        num_changesets_removed += num_changesets;
18✔
3701
        num_bytes_removed += num_bytes;
18✔
3702
        m_changesets_from_downstream.erase(client_file_ident);
18✔
3703
    }
18✔
3704

18,974✔
3705
    REALM_ASSERT(num_changesets_removed <= m_num_changesets_from_downstream);
37,926✔
3706
    REALM_ASSERT(num_bytes_removed <= m_blocked_changesets_from_downstream_byte_size);
37,926✔
3707

18,974✔
3708
    if (num_changesets_removed == 0)
37,926✔
3709
        return;
37,924✔
3710

2✔
3711
    m_num_changesets_from_downstream -= num_changesets_removed;
2✔
3712

2✔
3713
    // The byte size of the blocked changesets must be decremented.
2✔
3714
    if (num_bytes_removed > 0) {
2!
3715
        m_blocked_changesets_from_downstream_byte_size -= num_bytes_removed;
×
3716
        get_server().dec_byte_size_for_pending_downstream_changesets(num_bytes_removed); // Throws
×
3717
    }
×
3718
}
2✔
3719

3720

3721
void ServerFile::finalize_work_stage_2()
3722
{
37,924✔
3723
    // Expose new snapshot to remote peers
18,972✔
3724
    REALM_ASSERT(m_work.produced_new_realm_version || m_work.version_info.realm_version == 0);
37,924✔
3725
    if (m_work.version_info.realm_version > m_version_info.realm_version) {
37,924✔
3726
        REALM_ASSERT(m_work.version_info.sync_version.version >= m_version_info.sync_version.version);
37,906✔
3727
        m_version_info = m_work.version_info;
37,906✔
3728
    }
37,906✔
3729

18,972✔
3730
    bool resume_download_and_upload = m_work.produced_new_sync_version;
37,924✔
3731

18,972✔
3732
    // Deliver allocated file identifiers to requesters
18,972✔
3733
    REALM_ASSERT(m_file_ident_requests.size() >= m_work.file_ident_alloc_slots.size());
37,924✔
3734
    auto begin = m_file_ident_requests.begin();
37,924✔
3735
    auto i = begin;
37,924✔
3736
    for (const FileIdentAllocSlot& slot : m_work.file_ident_alloc_slots) {
20,218✔
3737
        FileIdentRequestInfo& info = i->second;
1,820✔
3738
        REALM_ASSERT(info.proxy_file == slot.proxy_file);
1,820✔
3739
        REALM_ASSERT(info.client_type == slot.client_type);
1,820✔
3740
        if (FileIdentReceiver* receiver = info.receiver) {
1,820✔
3741
            info.receiver = nullptr;
1,294✔
3742
            receiver->receive_file_ident(slot.file_ident); // Throws
1,294✔
3743
        }
1,294✔
3744
        ++i;
1,820✔
3745
    }
1,820✔
3746
    m_file_ident_requests.erase(begin, i);
37,924✔
3747

18,972✔
3748
    // Resume download to downstream clients
18,972✔
3749
    if (resume_download_and_upload) {
37,924✔
3750
        resume_download();
17,732✔
3751
    }
17,732✔
3752
}
37,924✔
3753

3754
// ============================ Worker implementation ============================
3755

3756
Worker::Worker(ServerImpl& server)
3757
    : logger{"Worker: ", server.logger_ptr} // Throws
3758
    , m_server{server}
3759
    , m_transformer{make_transformer()} // Throws
3760
    , m_file_access_cache{server.get_config().max_open_files, logger, *this, server.get_config().encryption_key}
3761
{
7,968✔
3762
    util::seed_prng_nondeterministically(m_random); // Throws
7,968✔
3763
}
7,968✔
3764

3765

3766
// Appends `file` to the work queue while holding `m_mutex`, and wakes up all
// threads waiting on `m_cond`.
void Worker::enqueue(ServerFile* file)
{
    util::LockGuard lock{m_mutex};

    m_queue.push_back(file); // Throws
    m_cond.notify_all();
}
38,292✔
3772

3773

3774
std::mt19937_64& Worker::server_history_get_random() noexcept
3775
{
2,876✔
3776
    return m_random;
2,876✔
3777
}
2,876✔
3778

3779

3780
// Gives access to the transformer owned by the worker (`m_transformer`).
sync::Transformer& Worker::get_transformer()
{
    sync::Transformer& transformer = *m_transformer;
    return transformer;
}
19,378✔
3784

3785

3786
// Gives access to the worker's transform buffer (`m_transform_buffer`).
util::Buffer<char>& Worker::get_transform_buffer()
{
    util::Buffer<char>& buffer = m_transform_buffer;
    return buffer;
}
×
3790

3791
void Worker::run()
3792
{
7,916✔
3793
    for (;;) {
46,192✔
3794
        ServerFile* file = nullptr;
46,192✔
3795
        {
46,192✔
3796
            util::LockGuard lock{m_mutex};
46,192✔
3797
            for (;;) {
91,788✔
3798
                if (REALM_UNLIKELY(m_stop))
91,788✔
3799
                    return;
49,868✔
3800
                if (!m_queue.empty()) {
83,872✔
3801
                    file = m_queue.front();
38,274✔
3802
                    m_queue.pop_front();
38,274✔
3803
                    break;
38,274✔
3804
                }
38,274✔
3805
                m_cond.wait(lock);
45,598✔
3806
            }
45,598✔
3807
        }
46,192✔
3808
        file->worker_process_work_unit(m_state); // Throws
42,182✔
3809
    }
38,276✔
3810
}
7,916✔
3811

3812

3813
void Worker::stop() noexcept
3814
{
7,916✔
3815
    util::LockGuard lock{m_mutex};
7,916✔
3816
    m_stop = true;
7,916✔
3817
    m_cond.notify_all();
7,916✔
3818
}
7,916✔
3819

3820

3821
// ============================ ServerImpl implementation ============================
3822

3823

3824
ServerImpl::ServerImpl(const std::string& root_dir, util::Optional<sync::PKey> pkey, Server::Config config)
3825
    : logger_ptr{config.logger ? std::move(config.logger) : Logger::get_default_logger()}
3826
    , logger{*logger_ptr}
3827
    , m_config{std::move(config)}
3828
    , m_max_upload_backlog{determine_max_upload_backlog(config)}
3829
    , m_root_dir{root_dir} // Throws
3830
    , m_access_control{std::move(pkey)}
3831
    , m_protocol_version_range{determine_protocol_version_range(config)}                 // Throws
3832
    , m_file_access_cache{m_config.max_open_files, logger, *this, config.encryption_key} // Throws
3833
    , m_worker{*this}                                                                    // Throws
3834
    , m_acceptor{get_service()}
3835
    , m_server_protocol{}       // Throws
3836
    , m_compress_memory_arena{} // Throws
3837
{
7,966✔
3838
    if (m_config.ssl) {
7,966✔
3839
        m_ssl_context = std::make_unique<network::ssl::Context>();                // Throws
24✔
3840
        m_ssl_context->use_certificate_chain_file(m_config.ssl_certificate_path); // Throws
24✔
3841
        m_ssl_context->use_private_key_file(m_config.ssl_certificate_key_path);   // Throws
24✔
3842
    }
24✔
3843
}
7,966✔
3844

3845

3846
// Verifies (also in release builds) that the server is not destroyed while
// `m_running` is still set.
ServerImpl::~ServerImpl() noexcept
{
    const bool server_destroyed_while_still_running = m_running;

    REALM_ASSERT_RELEASE(!server_destroyed_while_still_running);
}
7,968✔
3851

3852

3853
void ServerImpl::start()
3854
{
7,968✔
3855
    logger.info("Realm sync server started (%1)", REALM_VER_CHUNK); // Throws
7,968✔
3856
    logger.info("Supported protocol versions: %1-%2 (%3-%4 configured)",
7,968✔
3857
                ServerImplBase::get_oldest_supported_protocol_version(), get_current_protocol_version(),
7,968✔
3858
                m_protocol_version_range.first,
7,968✔
3859
                m_protocol_version_range.second); // Throws
7,968✔
3860
    logger.info("Platform: %1", util::get_platform_info());
7,968✔
3861
    bool is_debug_build = false;
7,968✔
3862
#if REALM_DEBUG
7,968✔
3863
    is_debug_build = true;
7,968✔
3864
#endif
7,968✔
3865
    {
7,968✔
3866
        const char* lead_text = "Build mode";
7,968✔
3867
        if (is_debug_build) {
7,968✔
3868
            logger.info("%1: Debug", lead_text); // Throws
7,968✔
3869
        }
7,968✔
3870
        else {
×
3871
            logger.info("%1: Release", lead_text); // Throws
×
3872
        }
×
3873
    }
7,968✔
3874
    if (is_debug_build) {
7,968✔
3875
        logger.warn("Build mode is Debug! CAN SEVERELY IMPACT PERFORMANCE - "
7,968✔
3876
                    "NOT RECOMMENDED FOR PRODUCTION"); // Throws
7,968✔
3877
    }
7,968✔
3878
    logger.info("Directory holding persistent state: %1", m_root_dir);        // Throws
7,968✔
3879
    logger.info("Maximum number of open files: %1", m_config.max_open_files); // Throws
7,968✔
3880
    {
7,968✔
3881
        const char* lead_text = "Encryption";
7,968✔
3882
        if (m_config.encryption_key) {
7,968✔
3883
            logger.info("%1: Yes", lead_text); // Throws
4✔
3884
        }
4✔
3885
        else {
7,964✔
3886
            logger.info("%1: No", lead_text); // Throws
7,964✔
3887
        }
7,964✔
3888
    }
7,968✔
3889
    logger.info("Log level: %1", logger.get_level_threshold()); // Throws
7,968✔
3890
    {
7,968✔
3891
        const char* lead_text = "Disable sync to disk";
7,968✔
3892
        if (m_config.disable_sync_to_disk) {
7,968✔
3893
            logger.info("%1: All files", lead_text); // Throws
7,276✔
3894
        }
7,276✔
3895
        else {
692✔
3896
            logger.info("%1: No", lead_text); // Throws
692✔
3897
        }
692✔
3898
    }
7,968✔
3899
    if (m_config.disable_sync_to_disk) {
7,968✔
3900
        logger.warn("Testing/debugging feature 'disable sync to disk' enabled - "
7,276✔
3901
                    "never do this in production!"); // Throws
7,276✔
3902
    }
7,276✔
3903
    logger.info("Download compaction: %1",
7,968✔
3904
                (m_config.disable_download_compaction ? "No" : "Yes")); // Throws
7,968✔
3905
    logger.info("Download bootstrap caching: %1",
7,968✔
3906
                (m_config.enable_download_bootstrap_cache ? "Yes" : "No"));                // Throws
7,968✔
3907
    logger.info("Max download size: %1 bytes", m_config.max_download_size);                // Throws
7,968✔
3908
    logger.info("Max upload backlog: %1 bytes", m_max_upload_backlog);                     // Throws
7,968✔
3909
    logger.info("HTTP request timeout: %1 ms", m_config.http_request_timeout);             // Throws
7,968✔
3910
    logger.info("HTTP response timeout: %1 ms", m_config.http_response_timeout);           // Throws
7,968✔
3911
    logger.info("Connection reaper timeout: %1 ms", m_config.connection_reaper_timeout);   // Throws
7,968✔
3912
    logger.info("Connection reaper interval: %1 ms", m_config.connection_reaper_interval); // Throws
7,968✔
3913
    logger.info("Connection soft close timeout: %1 ms", m_config.soft_close_timeout);      // Throws
7,968✔
3914
    logger.debug("Authorization header name: %1", m_config.authorization_header_name);     // Throws
7,968✔
3915

3,932✔
3916
    m_transformer = make_transformer(); // Throws
7,968✔
3917

3,932✔
3918
    m_realm_names = _impl::find_realm_files(m_root_dir); // Throws
7,968✔
3919

3,932✔
3920
    initiate_connection_reaper_timer(m_config.connection_reaper_interval); // Throws
7,968✔
3921

3,932✔
3922
    listen(); // Throws
7,968✔
3923
}
7,968✔
3924

3925

3926
void ServerImpl::run()
3927
{
7,916✔
3928
    auto ta = util::make_temp_assign(m_running, true);
7,916✔
3929

3,906✔
3930
    {
7,916✔
3931
        auto worker_thread = util::make_thread_exec_guard(m_worker, *this); // Throws
7,916✔
3932
        std::string name;
7,916✔
3933
        if (util::Thread::get_name(name)) {
7,916✔
3934
            name += "-worker";
7,916✔
3935
            worker_thread.start_with_signals_blocked(name); // Throws
7,916✔
3936
        }
7,916✔
3937
        else {
×
3938
            worker_thread.start_with_signals_blocked(); // Throws
×
3939
        }
×
3940

3,906✔
3941
        m_service.run(); // Throws
7,916✔
3942

3,906✔
3943
        worker_thread.stop_and_rethrow(); // Throws
7,916✔
3944
    }
7,916✔
3945

3,906✔
3946
    logger.info("Realm sync server stopped");
7,916✔
3947
}
7,916✔
3948

3949

3950
void ServerImpl::stop() noexcept
3951
{
8,810✔
3952
    util::LockGuard lock{m_mutex};
8,810✔
3953
    if (m_stopped)
8,810✔
3954
        return;
842✔
3955
    m_stopped = true;
7,968✔
3956
    m_wait_or_service_stopped_cond.notify_all();
7,968✔
3957
    m_service.stop();
7,968✔
3958
}
7,968✔
3959

3960

3961
void ServerImpl::inc_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3962
{
22,524✔
3963
    m_pending_changesets_from_downstream_byte_size += byte_size;
22,524✔
3964
    logger.debug("Byte size for pending downstream changesets incremented by "
22,524✔
3965
                 "%1 to reach a total of %2",
22,524✔
3966
                 byte_size,
22,524✔
3967
                 m_pending_changesets_from_downstream_byte_size); // Throws
22,524✔
3968
}
22,524✔
3969

3970

3971
void ServerImpl::dec_byte_size_for_pending_downstream_changesets(std::size_t byte_size)
3972
{
17,750✔
3973
    REALM_ASSERT(byte_size <= m_pending_changesets_from_downstream_byte_size);
17,750✔
3974
    m_pending_changesets_from_downstream_byte_size -= byte_size;
17,750✔
3975
    logger.debug("Byte size for pending downstream changesets decremented by "
17,750✔
3976
                 "%1 to reach a total of %2",
17,750✔
3977
                 byte_size,
17,750✔
3978
                 m_pending_changesets_from_downstream_byte_size); // Throws
17,750✔
3979
}
17,750✔
3980

3981

3982
std::mt19937_64& ServerImpl::server_history_get_random() noexcept
3983
{
1,088✔
3984
    return get_random();
1,088✔
3985
}
1,088✔
3986

3987

3988
// Returns the transformer referred to by `m_transformer`.
Transformer& ServerImpl::get_transformer() noexcept
{
    Transformer& transformer = *m_transformer;
    return transformer;
}
×
3992

3993

3994
// Returns a reference to the member buffer `m_transform_buffer`.
util::Buffer<char>& ServerImpl::get_transform_buffer() noexcept
{
    util::Buffer<char>& buffer = m_transform_buffer;
    return buffer;
}
×
3998

3999

4000
void ServerImpl::listen()
4001
{
7,968✔
4002
    network::Resolver resolver{get_service()};
7,968✔
4003
    network::Resolver::Query query(m_config.listen_address, m_config.listen_port,
7,968✔
4004
                                   network::Resolver::Query::passive | network::Resolver::Query::address_configured);
7,968✔
4005
    network::Endpoint::List endpoints = resolver.resolve(query); // Throws
7,968✔
4006

3,932✔
4007
    auto i = endpoints.begin();
7,968✔
4008
    auto end = endpoints.end();
7,968✔
4009
    for (;;) {
7,968✔
4010
        std::error_code ec;
7,968✔
4011
        m_acceptor.open(i->protocol(), ec);
7,968✔
4012
        if (!ec) {
7,968✔
4013
            using SocketBase = network::SocketBase;
7,968✔
4014
            m_acceptor.set_option(SocketBase::reuse_address(m_config.reuse_address), ec);
7,968✔
4015
            if (!ec) {
7,968✔
4016
                m_acceptor.bind(*i, ec);
7,968✔
4017
                if (!ec)
7,968✔
4018
                    break;
7,968✔
4019
            }
×
4020
            m_acceptor.close();
×
4021
        }
×
4022
        if (i + 1 == end) {
3,932!
4023
            for (auto i2 = endpoints.begin(); i2 != i; ++i2) {
×
4024
                // FIXME: We don't have the error code for previous attempts, so
4025
                // can't print a nice message.
4026
                logger.error("Failed to bind to %1:%2", i2->address(),
×
4027
                             i2->port()); // Throws
×
4028
            }
×
4029
            logger.error("Failed to bind to %1:%2: %3", i->address(), i->port(),
×
4030
                         ec.message()); // Throws
×
4031
            throw std::runtime_error("Could not create a listening socket: All endpoints failed");
×
4032
        }
×
4033
    }
×
4034

3,932✔
4035
    m_acceptor.listen(m_config.listen_backlog);
7,968✔
4036

3,932✔
4037
    network::Endpoint local_endpoint = m_acceptor.local_endpoint();
7,968✔
4038
    const char* ssl_mode = (m_ssl_context ? "TLS" : "non-TLS");
7,958✔
4039
    logger.info("Listening on %1:%2 (max backlog is %3, %4)", local_endpoint.address(), local_endpoint.port(),
7,968✔
4040
                m_config.listen_backlog, ssl_mode); // Throws
7,968✔
4041

3,932✔
4042
    initiate_accept();
7,968✔
4043
}
7,968✔
4044

4045

4046
void ServerImpl::initiate_accept()
4047
{
9,910✔
4048
    auto handler = [this](std::error_code ec) {
5,874✔
4049
        if (ec != util::error::operation_aborted)
1,942✔
4050
            handle_accept(ec);
1,942✔
4051
    };
1,942✔
4052
    bool is_ssl = bool(m_ssl_context);
9,910✔
4053
    m_next_http_conn.reset(new HTTPConnection(*this, ++m_next_conn_id, is_ssl));                            // Throws
9,910✔
4054
    m_acceptor.async_accept(m_next_http_conn->get_socket(), m_next_http_conn_endpoint, std::move(handler)); // Throws
9,910✔
4055
}
9,910✔
4056

4057

4058
void ServerImpl::handle_accept(std::error_code ec)
4059
{
1,942✔
4060
    if (ec) {
1,942✔
4061
        if (ec != util::error::connection_aborted) {
×
4062
            REALM_ASSERT(ec != util::error::operation_aborted);
×
4063

4064
            // We close the reserved files to get a few extra file descriptors.
4065
            for (size_t i = 0; i < sizeof(m_reserved_files) / sizeof(m_reserved_files[0]); ++i) {
×
4066
                m_reserved_files[i].reset();
×
4067
            }
×
4068

4069
            // FIXME: There are probably errors that need to be treated
4070
            // specially, and not cause the server to "crash".
4071

4072
            if (ec == make_basic_system_error_code(EMFILE)) {
×
4073
                logger.error("Failed to accept a connection due to the file descriptor limit, "
×
4074
                             "consider increasing the limit in your system config"); // Throws
×
4075
                throw OutOfFilesError(ec);
×
4076
            }
×
4077
            else {
×
4078
                throw std::system_error(ec);
×
4079
            }
×
4080
        }
×
4081
        logger.debug("Skipping aborted connection"); // Throws
×
4082
    }
×
4083
    else {
1,942✔
4084
        HTTPConnection& conn = *m_next_http_conn;
1,942✔
4085
        if (m_config.tcp_no_delay)
1,942✔
4086
            conn.get_socket().set_option(network::SocketBase::no_delay(true));       // Throws
1,622✔
4087
        m_http_connections.emplace(conn.get_id(), std::move(m_next_http_conn));      // Throws
1,942✔
4088
        Formatter& formatter = m_misc_buffers.formatter;
1,942✔
4089
        formatter.reset();
1,942✔
4090
        formatter << "[" << m_next_http_conn_endpoint.address() << "]:" << m_next_http_conn_endpoint.port(); // Throws
1,942✔
4091
        std::string remote_endpoint = {formatter.data(), formatter.size()};                                  // Throws
1,942✔
4092
        conn.initiate(std::move(remote_endpoint));                                                           // Throws
1,942✔
4093
    }
1,942✔
4094
    initiate_accept(); // Throws
1,942✔
4095
}
1,942✔
4096

4097

4098
void ServerImpl::remove_http_connection(std::int_fast64_t conn_id) noexcept
4099
{
1,940✔
4100
    m_http_connections.erase(conn_id);
1,940✔
4101
}
1,940✔
4102

4103

4104
void ServerImpl::add_sync_connection(int_fast64_t connection_id, std::unique_ptr<SyncConnection>&& sync_conn)
4105
{
1,912✔
4106
    m_sync_connections.emplace(connection_id, std::move(sync_conn));
1,912✔
4107
}
1,912✔
4108

4109

4110
void ServerImpl::remove_sync_connection(int_fast64_t connection_id)
4111
{
1,110✔
4112
    m_sync_connections.erase(connection_id);
1,110✔
4113
}
1,110✔
4114

4115

4116
// Posts a task to the service that stores `timeout` in
// `m_config.connection_reaper_timeout`. The configuration is updated when the
// posted task runs, not by this call itself.
void ServerImpl::set_connection_reaper_timeout(milliseconds_type timeout)
{
    auto apply_timeout = [this, timeout](Status) {
        m_config.connection_reaper_timeout = timeout;
    };
    get_service().post(std::move(apply_timeout));
}
4✔
4122

4123

4124
void ServerImpl::close_connections()
4125
{
20✔
4126
    get_service().post([this](Status) {
20✔
4127
        do_close_connections(); // Throws
20✔
4128
    });
20✔
4129
}
20✔
4130

4131

4132
bool ServerImpl::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4133
{
72✔
4134
    return _impl::map_virt_to_real_realm_path(m_root_dir, virt_path, real_path); // Throws
72✔
4135
}
72✔
4136

4137

4138
void ServerImpl::recognize_external_change(const std::string& virt_path)
4139
{
4,800✔
4140
    std::string virt_path_2 = virt_path; // Throws (copy)
4,800✔
4141
    get_service().post([this, virt_path = std::move(virt_path_2)](Status) {
4,800✔
4142
        do_recognize_external_change(virt_path); // Throws
4,800✔
4143
    });                                          // Throws
4,800✔
4144
}
4,800✔
4145

4146

4147
void ServerImpl::stop_sync_and_wait_for_backup_completion(
4148
    util::UniqueFunction<void(bool did_backup)> completion_handler, milliseconds_type timeout)
4149
{
×
4150
    logger.info("stop_sync_and_wait_for_backup_completion() called with "
×
4151
                "timeout = %1",
×
4152
                timeout); // Throws
×
4153

4154
    get_service().post([this, completion_handler = std::move(completion_handler), timeout](Status) mutable {
×
4155
        do_stop_sync_and_wait_for_backup_completion(std::move(completion_handler),
×
4156
                                                    timeout); // Throws
×
4157
    });
×
4158
}
×
4159

4160

4161
// (Re)creates the connection reaper timer and arms it to fire after `timeout`
// milliseconds. When the wait completes without having been aborted,
// reap_connections() is called and the timer is armed again with the same
// timeout.
void ServerImpl::initiate_connection_reaper_timer(milliseconds_type timeout)
{
    m_connection_reaper_timer.emplace(get_service());

    auto on_expired = [this, timeout](Status status) {
        if (status != ErrorCodes::OperationAborted) {
            reap_connections();                        // Throws
            initiate_connection_reaper_timer(timeout); // Throws
        }
    };
    m_connection_reaper_timer->async_wait(std::chrono::milliseconds(timeout), std::move(on_expired)); // Throws
}
8,042✔
4171

4172

4173
void ServerImpl::reap_connections()
4174
{
74✔
4175
    logger.debug("Discarding dead connections"); // Throws
74✔
4176
    SteadyTimePoint now = steady_clock_now();
74✔
4177
    {
74✔
4178
        auto end = m_http_connections.end();
74✔
4179
        auto i = m_http_connections.begin();
74✔
4180
        while (i != end) {
74✔
4181
            HTTPConnection& conn = *i->second;
×
4182
            ++i;
×
4183
            // Suicide
4184
            conn.terminate_if_dead(now); // Throws
×
4185
        }
×
4186
    }
74✔
4187
    {
74✔
4188
        auto end = m_sync_connections.end();
74✔
4189
        auto i = m_sync_connections.begin();
74✔
4190
        while (i != end) {
144✔
4191
            SyncConnection& conn = *i->second;
70✔
4192
            ++i;
70✔
4193
            // Suicide
64✔
4194
            conn.terminate_if_dead(now); // Throws
70✔
4195
        }
70✔
4196
    }
74✔
4197
}
74✔
4198

4199

4200
void ServerImpl::do_close_connections()
4201
{
20✔
4202
    for (auto& entry : m_sync_connections) {
20✔
4203
        SyncConnection& conn = *entry.second;
20✔
4204
        conn.initiate_soft_close(); // Throws
20✔
4205
    }
20✔
4206
}
20✔
4207

4208

4209
void ServerImpl::do_recognize_external_change(const std::string& virt_path)
4210
{
4,800✔
4211
    auto i = m_files.find(virt_path);
4,800✔
4212
    if (i == m_files.end())
4,800✔
UNCOV
4213
        return;
×
4214
    ServerFile& file = *i->second;
4,800✔
4215
    file.recognize_external_change();
4,800✔
4216
}
4,800✔
4217

4218

4219
void ServerImpl::do_stop_sync_and_wait_for_backup_completion(
4220
    util::UniqueFunction<void(bool did_complete)> completion_handler, milliseconds_type timeout)
4221
{
×
4222
    static_cast<void>(timeout);
×
4223
    if (m_sync_stopped)
×
4224
        return;
×
4225
    do_close_connections(); // Throws
×
4226
    m_sync_stopped = true;
×
4227
    bool completion_reached = false;
×
4228
    completion_handler(completion_reached); // Throws
×
4229
}
×
4230

4231

4232
// ============================ SyncConnection implementation ============================
4233

4234
// Explicitly empties the send queue first and the session table second,
// before the remaining members are destroyed.
SyncConnection::~SyncConnection() noexcept
{
    // Order matters to the extent that it mirrors the original: the list of
    // enlisted sessions is cleared before the sessions themselves.
    m_sessions_enlisted_to_send.clear();
    m_sessions.clear();
}
1,912✔
4239

4240

4241
void SyncConnection::initiate()
4242
{
1,912✔
4243
    m_last_activity_at = steady_clock_now();
1,912✔
4244
    logger.debug("Sync Connection initiated");
1,912✔
4245
    m_websocket.initiate_server_websocket_after_handshake();
1,912✔
4246
    send_log_message(util::Logger::Level::info, "Client connection established with server", 0,
1,912✔
4247
                     m_appservices_request_id);
1,912✔
4248
}
1,912✔
4249

4250

4251
template <class... Params>
4252
void SyncConnection::terminate(Logger::Level log_level, const char* log_message, Params... log_params)
4253
{
1,110✔
4254
    terminate_sessions();                              // Throws
1,110✔
4255
    logger.log(log_level, log_message, log_params...); // Throws
1,110✔
4256
    m_websocket.stop();
1,110✔
4257
    m_ssl_stream.reset();
1,110✔
4258
    m_socket.reset();
1,110✔
4259
    // Suicide
592✔
4260
    m_server.remove_sync_connection(m_id);
1,110✔
4261
}
1,110✔
4262

4263

4264
// Terminates this connection if the time since the last recorded activity has
// reached the applicable timeout: the soft close timeout while the connection
// is closing, and the connection reaper timeout otherwise.
//
// May destroy this object ("suicide"); the caller must not use it afterwards.
void SyncConnection::terminate_if_dead(SteadyTimePoint now)
{
    const milliseconds_type idle_time = steady_duration(m_last_activity_at, now);
    const Server::Config& config = m_server.get_config();

    if (m_is_closing) {
        if (idle_time >= config.soft_close_timeout) {
            // Suicide
            terminate(Logger::Level::detail,
                      "Sync connection closed (timeout during soft close)"); // Throws
        }
        return;
    }

    if (idle_time >= config.connection_reaper_timeout) {
        // Suicide
        terminate(Logger::Level::detail,
                  "Sync connection closed (no heartbeat)"); // Throws
    }
}
70✔
4283

4284

4285
// Appends `sess` to the queue of sessions waiting to send and fires the send
// trigger.
//
// Preconditions (asserted): the send trigger exists, the connection is not
// closing, and `sess` is not already enlisted.
void SyncConnection::enlist_to_send(Session* sess) noexcept
{
    REALM_ASSERT(m_send_trigger);
    REALM_ASSERT(!m_is_closing);
    REALM_ASSERT(!sess->is_enlisted_to_send());

    m_sessions_enlisted_to_send.push_back(sess);
    m_send_trigger->trigger();
}
107,428✔
4293

4294

4295
// Logs `status` as an error and reports the matching protocol error:
// SyncProtocolInvariantFailed maps to bad_syntax, LimitExceeded maps to
// limits_exceeded, and every other code maps to other_error.
void SyncConnection::handle_protocol_error(Status status)
{
    logger.error("%1", status);

    ProtocolError mapped_error = ProtocolError::other_error;
    switch (status.code()) {
        case ErrorCodes::SyncProtocolInvariantFailed:
            mapped_error = ProtocolError::bad_syntax;
            break;
        case ErrorCodes::LimitExceeded:
            mapped_error = ProtocolError::limits_exceeded;
            break;
        default:
            break;
    }
    protocol_error(mapped_error); // Throws
}
×
4310

4311
void SyncConnection::receive_bind_message(session_ident_type session_ident, std::string path,
4312
                                          std::string signed_user_token, bool need_client_file_ident,
4313
                                          bool is_subserver)
4314
{
5,644✔
4315
    auto p = m_sessions.emplace(session_ident, nullptr); // Throws
5,644✔
4316
    bool was_inserted = p.second;
5,644✔
4317
    if (REALM_UNLIKELY(!was_inserted)) {
5,644✔
4318
        logger.error("Overlapping reuse of session identifier %1 in BIND message",
×
4319
                     session_ident);                           // Throws
×
4320
        protocol_error(ProtocolError::reuse_of_session_ident); // Throws
×
4321
        return;
×
4322
    }
×
4323
    try {
5,644✔
4324
        p.first->second.reset(new Session(*this, session_ident)); // Throws
5,644✔
4325
    }
5,644✔
4326
    catch (...) {
2,814✔
4327
        m_sessions.erase(p.first);
×
4328
        throw;
×
4329
    }
×
4330

2,814✔
4331
    Session& sess = *p.first->second;
5,644✔
4332
    sess.initiate(); // Throws
5,644✔
4333
    ProtocolError error;
5,644✔
4334
    bool success =
5,644✔
4335
        sess.receive_bind_message(std::move(path), std::move(signed_user_token), need_client_file_ident, is_subserver,
5,644✔
4336
                                  error); // Throws
5,644✔
4337
    if (REALM_UNLIKELY(!success))         // Throws
5,644✔
4338
        protocol_error(error, &sess);     // Throws
2,828✔
4339
}
5,644✔
4340

4341

4342
void SyncConnection::receive_ident_message(session_ident_type session_ident, file_ident_type client_file_ident,
4343
                                           salt_type client_file_ident_salt, version_type scan_server_version,
4344
                                           version_type scan_client_version, version_type latest_server_version,
4345
                                           salt_type latest_server_version_salt)
4346
{
4,110✔
4347
    auto i = m_sessions.find(session_ident);
4,110✔
4348
    if (REALM_UNLIKELY(i == m_sessions.end())) {
4,110✔
4349
        bad_session_ident("IDENT", session_ident); // Throws
×
4350
        return;
×
4351
    }
×
4352
    Session& sess = *i->second;
4,110✔
4353
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
4,110✔
4354
        message_after_unbind("IDENT", session_ident); // Throws
×
4355
        return;
×
4356
    }
×
4357
    if (REALM_UNLIKELY(sess.error_occurred())) {
4,110✔
4358
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
8✔
4359
        // messages, other than UNBIND, must be ignored.
8✔
4360
        return;
16✔
4361
    }
16✔
4362
    if (REALM_UNLIKELY(sess.must_send_ident_message())) {
4,094✔
4363
        logger.error("Received IDENT message before IDENT message was sent"); // Throws
×
4364
        protocol_error(ProtocolError::bad_message_order);                     // Throws
×
4365
        return;
×
4366
    }
×
4367
    if (REALM_UNLIKELY(sess.ident_message_received())) {
4,094✔
4368
        logger.error("Received second IDENT message for session"); // Throws
×
4369
        protocol_error(ProtocolError::bad_message_order);          // Throws
×
4370
        return;
×
4371
    }
×
4372

2,084✔
4373
    ProtocolError error = {};
4,094✔
4374
    bool success = sess.receive_ident_message(client_file_ident, client_file_ident_salt, scan_server_version,
4,094✔
4375
                                              scan_client_version, latest_server_version, latest_server_version_salt,
4,094✔
4376
                                              error); // Throws
4,094✔
4377
    if (REALM_UNLIKELY(!success))                     // Throws
4,094✔
4378
        protocol_error(error, &sess);                 // Throws
2,100✔
4379
}
4,094✔
4380

4381
void SyncConnection::receive_upload_message(session_ident_type session_ident, version_type progress_client_version,
4382
                                            version_type progress_server_version, version_type locked_server_version,
4383
                                            const UploadChangesets& upload_changesets)
4384
{
43,866✔
4385
    auto i = m_sessions.find(session_ident);
43,866✔
4386
    if (REALM_UNLIKELY(i == m_sessions.end())) {
43,866✔
4387
        bad_session_ident("UPLOAD", session_ident); // Throws
×
4388
        return;
×
4389
    }
×
4390
    Session& sess = *i->second;
43,866✔
4391
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
43,866✔
4392
        message_after_unbind("UPLOAD", session_ident); // Throws
×
4393
        return;
×
4394
    }
×
4395
    if (REALM_UNLIKELY(sess.error_occurred())) {
43,866✔
4396
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
4397
        // messages, other than UNBIND, must be ignored.
4398
        return;
×
4399
    }
×
4400
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
43,866✔
4401
        message_before_ident("UPLOAD", session_ident); // Throws
×
4402
        return;
×
4403
    }
×
4404

22,212✔
4405
    ProtocolError error = {};
43,866✔
4406
    bool success = sess.receive_upload_message(progress_client_version, progress_server_version,
43,866✔
4407
                                               locked_server_version, upload_changesets, error); // Throws
43,866✔
4408
    if (REALM_UNLIKELY(!success))                                                                // Throws
43,866✔
4409
        protocol_error(error, &sess);                                                            // Throws
22,212✔
4410
}
43,866✔
4411

4412

4413
// Handles a MARK message for the session identified by `session_ident`.
//
// The message is rejected with a protocol error if the session is unknown, if
// UNBIND has already been received, or if IDENT has not yet been received. It
// is silently ignored if the session is in an error state. Otherwise it is
// passed on to the session, and any error the session reports is raised as a
// protocol error.
void SyncConnection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    auto i = m_sessions.find(session_ident);
    if (REALM_UNLIKELY(i == m_sessions.end())) {
        bad_session_ident("MARK", session_ident); // Throws
        return;
    }
    Session& sess = *i->second;
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
        message_after_unbind("MARK", session_ident); // Throws
        return;
    }
    if (REALM_UNLIKELY(sess.error_occurred())) {
        // Protocol state is SendError or WaitForUnbindErr. In these states, all
        // messages, other than UNBIND, must be ignored.
        return;
    }
    if (REALM_UNLIKELY(!sess.ident_message_received())) {
        message_before_ident("MARK", session_ident); // Throws
        return;
    }

    // Value-initialized for consistency with the IDENT and UPLOAD handlers, so
    // that `error` never holds an indeterminate value.
    ProtocolError error = {};
    bool success = sess.receive_mark_message(request_ident, error); // Throws
    if (REALM_UNLIKELY(!success))                                   // Throws
        protocol_error(error, &sess);                               // Throws
}
12,086✔
4440

4441

4442
// Handles an UNBIND message for the session identified by `session_ident`.
//
// The message is rejected with a protocol error if the session is unknown or
// if UNBIND has already been received for it. Otherwise it is passed on to
// the session.
void SyncConnection::receive_unbind_message(session_ident_type session_ident)
{
    auto found = m_sessions.find(session_ident); // Throws
    if (REALM_UNLIKELY(found == m_sessions.end())) {
        bad_session_ident("UNBIND", session_ident); // Throws
        return;
    }

    Session& session = *found->second;
    if (REALM_UNLIKELY(session.unbind_message_received())) {
        message_after_unbind("UNBIND", session_ident); // Throws
        return;
    }

    session.receive_unbind_message(); // Throws
    // NOTE: The session might have gotten destroyed at this time!
}
2,966✔
4458

4459

4460
// Handles a PING message: remembers its timestamp, flags that a PONG must be
// sent, and kicks off sending right away unless a send is already in
// progress.
void SyncConnection::receive_ping(milliseconds_type timestamp, milliseconds_type rtt)
{
    logger.debug("Received: PING(timestamp=%1, rtt=%2)", timestamp, rtt); // Throws

    m_send_pong = true;
    m_last_ping_timestamp = timestamp;

    if (m_is_sending)
        return;
    send_next_message();
}
146✔
4468

4469

4470
void SyncConnection::receive_error_message(session_ident_type session_ident, int error_code,
4471
                                           std::string_view error_body)
4472
{
×
4473
    logger.debug("Received: ERROR(error_code=%1, message_size=%2, session_ident=%3)", error_code, error_body.size(),
×
4474
                 session_ident); // Throws
×
4475
    auto i = m_sessions.find(session_ident);
×
4476
    if (REALM_UNLIKELY(i == m_sessions.end())) {
×
4477
        bad_session_ident("ERROR", session_ident);
×
4478
        return;
×
4479
    }
×
4480
    Session& sess = *i->second;
×
4481
    if (REALM_UNLIKELY(sess.unbind_message_received())) {
×
4482
        message_after_unbind("ERROR", session_ident); // Throws
×
4483
        return;
×
4484
    }
×
4485

4486
    sess.receive_error_message(session_ident, error_code, error_body); // Throws
×
4487
}
×
4488

4489
void SyncConnection::send_log_message(util::Logger::Level level, const std::string&& message,
4490
                                      session_ident_type sess_ident, std::optional<std::string> co_id)
4491
{
5,974✔
4492
    if (get_client_protocol_version() < SyncConnection::SERVER_LOG_PROTOCOL_VERSION) {
5,974✔
4493
        return logger.log(level, message.c_str());
×
4494
    }
×
4495

3,010✔
4496
    LogMessage log_msg{sess_ident, level, std::move(message), std::move(co_id)};
5,974✔
4497
    {
5,974✔
4498
        std::lock_guard lock(m_log_mutex);
5,974✔
4499
        m_log_messages.push(std::move(log_msg));
5,974✔
4500
    }
5,974✔
4501
    m_send_trigger->trigger();
5,974✔
4502
}
5,974✔
4503

4504

4505
void SyncConnection::bad_session_ident(const char* message_type, session_ident_type session_ident)
4506
{
×
4507
    logger.error("Bad session identifier in %1 message, session_ident = %2", message_type,
×
4508
                 session_ident);                      // Throws
×
4509
    protocol_error(ProtocolError::bad_session_ident); // Throws
×
4510
}
×
4511

4512

4513
void SyncConnection::message_after_unbind(const char* message_type, session_ident_type session_ident)
4514
{
×
4515
    logger.error("Received %1 message after UNBIND message, session_ident = %2", message_type,
×
4516
                 session_ident);                      // Throws
×
4517
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4518
}
×
4519

4520

4521
void SyncConnection::message_before_ident(const char* message_type, session_ident_type session_ident)
4522
{
×
4523
    logger.error("Received %1 message before IDENT message, session_ident = %2", message_type,
×
4524
                 session_ident);                      // Throws
×
4525
    protocol_error(ProtocolError::bad_message_order); // Throws
×
4526
}
×
4527

4528

4529
void SyncConnection::handle_message_received(const char* data, size_t size)
4530
{
68,868✔
4531
    // parse_message_received() parses the message and calls the
34,578✔
4532
    // proper handler on the SyncConnection object (this).
34,578✔
4533
    get_server_protocol().parse_message_received<SyncConnection>(*this, std::string_view(data, size));
68,868✔
4534
    return;
68,868✔
4535
}
68,868✔
4536

4537

4538
void SyncConnection::handle_ping_received(const char* data, size_t size)
4539
{
×
4540
    // parse_message_received() parses the message and calls the
4541
    // proper handler on the SyncConnection object (this).
4542
    get_server_protocol().parse_ping_received<SyncConnection>(*this, std::string_view(data, size));
×
4543
    return;
×
4544
}
×
4545

4546

4547
void SyncConnection::send_next_message()
4548
{
104,010✔
4549
    REALM_ASSERT(!m_is_sending);
104,010✔
4550
    REALM_ASSERT(!m_sending_pong);
104,010✔
4551
    if (m_send_pong) {
104,010✔
4552
        send_pong(m_last_ping_timestamp);
146✔
4553
        if (m_sending_pong)
146✔
4554
            return;
146✔
4555
    }
103,864✔
4556
    for (;;) {
154,482✔
4557
        Session* sess = m_sessions_enlisted_to_send.pop_front();
154,482✔
4558
        if (!sess) {
154,482✔
4559
            // No sessions were enlisted to send
24,484✔
4560
            if (REALM_LIKELY(!m_is_closing))
47,568✔
4561
                break; // Check to see if there are any log messages to go out
47,558✔
4562
            // Send a connection level ERROR
10✔
4563
            REALM_ASSERT(!is_session_level_error(m_error_code));
20✔
4564
            initiate_write_error(m_error_code, m_error_session_ident); // Throws
20✔
4565
            return;
20✔
4566
        }
20✔
4567
        sess->send_message(); // Throws
106,914✔
4568
        // NOTE: The session might have gotten destroyed at this time!
56,622✔
4569

56,622✔
4570
        // At this point, `m_is_sending` is true if, and only if the session
56,622✔
4571
        // chose to send a message. If it chose to not send a message, we must
56,622✔
4572
        // loop back and give the next session in `m_sessions_enlisted_to_send`
56,622✔
4573
        // a chance.
56,622✔
4574
        if (m_is_sending)
106,914✔
4575
            return;
56,300✔
4576
    }
106,914✔
4577
    {
77,052✔
4578
        std::lock_guard lock(m_log_mutex);
47,544✔
4579
        if (!m_log_messages.empty()) {
47,544✔
4580
            send_log_message(m_log_messages.front());
5,914✔
4581
            m_log_messages.pop();
5,914✔
4582
        }
5,914✔
4583
    }
47,544✔
4584
    // Otherwise, nothing to do
24,474✔
4585
}
47,544✔
4586

4587

4588
void SyncConnection::initiate_write_output_buffer()
4589
{
62,214✔
4590
    auto handler = [this]() {
62,100✔
4591
        handle_write_output_buffer();
62,022✔
4592
    };
62,022✔
4593

32,482✔
4594
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
62,214✔
4595
                                   std::move(handler)); // Throws
62,214✔
4596
    m_is_sending = true;
62,214✔
4597
}
62,214✔
4598

4599

4600
void SyncConnection::initiate_pong_output_buffer()
4601
{
146✔
4602
    auto handler = [this]() {
146✔
4603
        handle_pong_output_buffer();
146✔
4604
    };
146✔
4605

42✔
4606
    REALM_ASSERT(!m_is_sending);
146✔
4607
    REALM_ASSERT(!m_sending_pong);
146✔
4608
    m_websocket.async_write_binary(m_output_buffer.data(), m_output_buffer.size(),
146✔
4609
                                   std::move(handler)); // Throws
146✔
4610

42✔
4611
    m_is_sending = true;
146✔
4612
    m_sending_pong = true;
146✔
4613
}
146✔
4614

4615

4616
// Clears the pending-PONG flag, serializes a PONG carrying `timestamp` into
// the output buffer, and initiates the write.
//
// Preconditions (asserted): a PONG is pending and none is currently being
// sent.
void SyncConnection::send_pong(milliseconds_type timestamp)
{
    REALM_ASSERT(m_send_pong);
    REALM_ASSERT(!m_sending_pong);

    m_send_pong = false;
    logger.debug("Sending: PONG(timestamp=%1)", timestamp); // Throws

    OutputBuffer& buffer = get_output_buffer();
    get_server_protocol().make_pong(buffer, timestamp); // Throws

    initiate_pong_output_buffer(); // Throws
}
146✔
4628

4629
void SyncConnection::send_log_message(const LogMessage& log_msg)
4630
{
5,914✔
4631
    OutputBuffer& out = get_output_buffer();
5,914✔
4632
    get_server_protocol().make_log_message(out, log_msg.level, log_msg.message, log_msg.sess_ident,
5,914✔
4633
                                           log_msg.co_id); // Throws
5,914✔
4634

2,984✔
4635
    initiate_write_output_buffer(); // Throws
5,914✔
4636
}
5,914✔
4637

4638

4639
void SyncConnection::handle_write_output_buffer()
4640
{
62,022✔
4641
    release_output_buffer();
62,022✔
4642
    m_is_sending = false;
62,022✔
4643
    send_next_message(); // Throws
62,022✔
4644
}
62,022✔
4645

4646

4647
void SyncConnection::handle_pong_output_buffer()
4648
{
146✔
4649
    release_output_buffer();
146✔
4650
    REALM_ASSERT(m_is_sending);
146✔
4651
    REALM_ASSERT(m_sending_pong);
146✔
4652
    m_is_sending = false;
146✔
4653
    m_sending_pong = false;
146✔
4654
    send_next_message(); // Throws
146✔
4655
}
146✔
4656

4657

4658
// Serializes an ERROR message for `error_code` / `session_ident` into the
// output buffer and submits an asynchronous write of it. The completion
// handler passed to the websocket calls handle_write_error().
//
// `error_code` must be one for which get_protocol_error_message() returns a
// message. This is asserted, because the message length is computed with
// std::strlen(), which has undefined behavior for a null pointer.
void SyncConnection::initiate_write_error(ProtocolError error_code, session_ident_type session_ident)
{
    const char* message = get_protocol_error_message(int(error_code));
    // Same check as in do_initiate_soft_close(); guards the strlen() below.
    REALM_ASSERT(message);
    std::size_t message_size = std::strlen(message);
    bool try_again = determine_try_again(error_code);

    logger.detail("Sending: ERROR(error_code=%1, message_size=%2, try_again=%3, session_ident=%4)", int(error_code),
                  message_size, try_again, session_ident); // Throws

    OutputBuffer& out = get_output_buffer();
    int protocol_version = get_client_protocol_version();
    get_server_protocol().make_error_message(protocol_version, out, error_code, message, message_size, try_again,
                                             session_ident); // Throws

    auto handler = [this]() {
        handle_write_error(); // Throws
    };
    m_websocket.async_write_binary(out.data(), out.size(), std::move(handler)); // Throws
    m_is_sending = true;
}
20✔
4678

4679

4680
void SyncConnection::handle_write_error()
4681
{
20✔
4682
    m_is_sending = false;
20✔
4683
    REALM_ASSERT(m_is_closing);
20✔
4684
    if (!m_ssl_stream) {
20✔
4685
        std::error_code ec;
20✔
4686
        m_socket->shutdown(network::Socket::shutdown_send, ec);
20✔
4687
        if (ec && ec != make_basic_system_error_code(ENOTCONN))
20!
4688
            throw std::system_error(ec);
×
4689
    }
20✔
4690
}
20✔
4691

4692

4693
// For connection level errors, `sess` is ignored. For session level errors, a
4694
// session must be specified.
4695
//
4696
// If a session is specified, that session object will have been detached from
4697
// the ServerFile object (and possibly destroyed) upon return from
4698
// protocol_error().
4699
//
4700
// If a session is specified for a protocol level error, that session object
4701
// will have been destroyed upon return from protocol_error(). For session level
4702
// errors, the specified session will have been destroyed upon return from
4703
// protocol_error() if, and only if the negotiated protocol version is less than
4704
// 23.
4705
void SyncConnection::protocol_error(ProtocolError error_code, Session* sess)
4706
{
78✔
4707
    REALM_ASSERT(!m_is_closing);
78✔
4708
    bool session_level = is_session_level_error(error_code);
78✔
4709
    REALM_ASSERT(!session_level || sess);
78✔
4710
    REALM_ASSERT(!sess || m_sessions.count(sess->get_session_ident()) == 1);
78✔
4711
    if (logger.would_log(util::Logger::Level::debug)) {
78✔
4712
        const char* message = get_protocol_error_message(int(error_code));
×
4713
        Logger& logger_2 = (session_level ? sess->logger : logger);
×
4714
        logger_2.debug("Protocol error: %1 (error_code=%2)", message, int(error_code)); // Throws
×
4715
    }
×
4716
    session_ident_type session_ident = (session_level ? sess->get_session_ident() : 0);
78✔
4717
    if (session_level) {
78✔
4718
        sess->initiate_deactivation(error_code); // Throws
78✔
4719
        return;
78✔
4720
    }
78✔
4721
    do_initiate_soft_close(error_code, session_ident); // Throws
×
4722
}
×
4723

4724

4725
// Puts the connection into the closing state because of `error_code`: records
// the error code and session identifier, drops pending outgoing work,
// terminates all sessions, and finally fires the send trigger.
//
// The assertions require an error code that has a protocol error message, a
// connection level error, an existing send trigger, and a connection that is
// not already closing.
void SyncConnection::do_initiate_soft_close(ProtocolError error_code, session_ident_type session_ident)
{
    REALM_ASSERT(get_protocol_error_message(int(error_code)));

    // A session identifier must be given exactly when the error is session
    // specific. Session specific errors are in turn rejected by the second
    // assertion, so `session_ident` is effectively required to be zero here.
    REALM_ASSERT(is_session_level_error(error_code) == (session_ident != 0));
    REALM_ASSERT(!is_session_level_error(error_code));

    REALM_ASSERT(m_send_trigger);
    REALM_ASSERT(!m_is_closing);

    m_is_closing = true;
    m_error_code = error_code;
    m_error_session_ident = session_ident;

    // No point in sending anything else from now on: drop a pending PONG
    // request and forget which sessions were waiting to send.
    m_send_pong = false;
    m_sessions_enlisted_to_send.clear();

    m_receiving_session = nullptr;

    terminate_sessions(); // Throws

    m_send_trigger->trigger();
}
20✔
4754

4755

4756
void SyncConnection::close_due_to_close_by_client(std::error_code ec)
4757
{
694✔
4758
    auto log_level = (ec == util::MiscExtErrors::end_of_input ? Logger::Level::detail : Logger::Level::info);
626✔
4759
    // Suicide
400✔
4760
    terminate(log_level, "Sync connection closed by client: %1", ec.message()); // Throws
694✔
4761
}
694✔
4762

4763

4764
void SyncConnection::close_due_to_error(std::error_code ec)
4765
{
412✔
4766
    // Suicide
190✔
4767
    terminate(Logger::Level::error, "Sync connection closed due to error: %1",
412✔
4768
              ec.message()); // Throws
412✔
4769
}
412✔
4770

4771

4772
void SyncConnection::terminate_sessions()
4773
{
1,130✔
4774
    for (auto& entry : m_sessions) {
1,762✔
4775
        Session& sess = *entry.second;
1,762✔
4776
        sess.terminate(); // Throws
1,762✔
4777
    }
1,762✔
4778
    m_sessions_enlisted_to_send.clear();
1,130✔
4779
    m_sessions.clear();
1,130✔
4780
}
1,130✔
4781

4782

4783
void SyncConnection::initiate_soft_close()
4784
{
20✔
4785
    if (!m_is_closing) {
20✔
4786
        session_ident_type session_ident = 0;                                    // Not session specific
20✔
4787
        do_initiate_soft_close(ProtocolError::connection_closed, session_ident); // Throws
20✔
4788
    }
20✔
4789
}
20✔
4790

4791

4792
// Removes the entry for `session_ident` from the session registry
// (`m_sessions`).
void SyncConnection::discard_session(session_ident_type session_ident) noexcept
{
    m_sessions.erase(session_ident);
}
2,672✔
4796

4797
} // anonymous namespace
4798

4799

4800
// ============================ sync::Server implementation ============================
4801

4802
// Concrete implementation type behind sync::Server: a ServerImpl whose
// constructor simply forwards its arguments to the ServerImpl constructor.
class Server::Implementation : public ServerImpl {
public:
    Implementation(const std::string& root_dir, util::Optional<PKey> pkey, Server::Config config)
        : ServerImpl{root_dir, std::move(pkey), std::move(config)} // Throws
    {
    }

    virtual ~Implementation() = default;
};
4810

4811

4812
Server::Server(const std::string& root_dir, util::Optional<sync::PKey> pkey, Config config)
4813
    : m_impl{new Implementation{root_dir, std::move(pkey), std::move(config)}} // Throws
4814
{
7,966✔
4815
}
7,966✔
4816

4817

4818
Server::Server(Server&& serv) noexcept
4819
    : m_impl{std::move(serv.m_impl)}
4820
{
×
4821
}
×
4822

4823

4824
// Defined in this file, where Server::Implementation is a complete type.
Server::~Server() noexcept = default;
7,968✔
4825

4826

4827
void Server::start()
4828
{
7,296✔
4829
    m_impl->start(); // Throws
7,296✔
4830
}
7,296✔
4831

4832

4833
void Server::start(const std::string& listen_address, const std::string& listen_port, bool reuse_address)
4834
{
672✔
4835
    m_impl->start(listen_address, listen_port, reuse_address); // Throws
672✔
4836
}
672✔
4837

4838

4839
// Returns the endpoint reported by listen_endpoint() of the implementation
// object.
network::Endpoint Server::listen_endpoint() const
{
    return m_impl->listen_endpoint(); // Throws
}
7,974✔
4843

4844

4845
void Server::run()
4846
{
7,916✔
4847
    m_impl->run(); // Throws
7,916✔
4848
}
7,916✔
4849

4850

4851
void Server::stop() noexcept
4852
{
8,810✔
4853
    m_impl->stop();
8,810✔
4854
}
8,810✔
4855

4856

4857
uint_fast64_t Server::errors_seen() const noexcept
4858
{
672✔
4859
    return m_impl->errors_seen;
672✔
4860
}
672✔
4861

4862

4863
void Server::stop_sync_and_wait_for_backup_completion(util::UniqueFunction<void(bool did_backup)> completion_handler,
4864
                                                      milliseconds_type timeout)
4865
{
×
4866
    m_impl->stop_sync_and_wait_for_backup_completion(std::move(completion_handler), timeout); // Throws
×
4867
}
×
4868

4869

4870
// Forwards `timeout` to set_connection_reaper_timeout() of the implementation
// object.
void Server::set_connection_reaper_timeout(milliseconds_type timeout)
{
    m_impl->set_connection_reaper_timeout(timeout);
}
4✔
4874

4875

4876
void Server::close_connections()
4877
{
20✔
4878
    m_impl->close_connections();
20✔
4879
}
20✔
4880

4881

4882
bool Server::map_virtual_to_real_path(const std::string& virt_path, std::string& real_path)
4883
{
72✔
4884
    return m_impl->map_virtual_to_real_path(virt_path, real_path); // Throws
72✔
4885
}
72✔
4886

4887

4888
void Server::recognize_external_change(const std::string& virt_path)
4889
{
4,800✔
4890
    m_impl->recognize_external_change(virt_path); // Throws
4,800✔
4891
}
4,800✔
4892

4893

4894
// Forwards to get_workunit_timers() of the implementation object; both
// arguments are passed through by reference.
void Server::get_workunit_timers(milliseconds_type& parallel_section, milliseconds_type& sequential_section)
{
    m_impl->get_workunit_timers(parallel_section, sequential_section);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc