• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2031

12 Feb 2024 07:38PM UTC coverage: 91.844% (+0.006%) from 91.838%
2031

push

Evergreen

web-flow
Make `SyncSession::get_file_ident()` public (#7203)

93014 of 171494 branches covered (54.24%)

23 of 28 new or added lines in 4 files covered. (82.14%)

51 existing lines in 12 files now uncovered.

235322 of 256218 relevant lines covered (91.84%)

6255148.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.1
/src/realm/sync/client.cpp
1

2
#include <memory>
3
#include <tuple>
4
#include <atomic>
5

6
#include "realm/sync/client_base.hpp"
7
#include "realm/sync/protocol.hpp"
8
#include "realm/util/optional.hpp"
9
#include <realm/sync/client.hpp>
10
#include <realm/sync/config.hpp>
11
#include <realm/sync/noinst/client_reset.hpp>
12
#include <realm/sync/noinst/client_history_impl.hpp>
13
#include <realm/sync/noinst/client_impl_base.hpp>
14
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
15
#include <realm/sync/subscriptions.hpp>
16
#include <realm/util/bind_ptr.hpp>
17
#include <realm/util/circular_buffer.hpp>
18
#include <realm/util/platform_info.hpp>
19
#include <realm/util/thread.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/util/value_reset_guard.hpp>
22
#include <realm/version.hpp>
23

24
namespace realm {
25
namespace sync {
26

27
namespace {
28
using namespace realm::util;
29

30

31
// clang-format off
32
using SessionImpl                     = ClientImpl::Session;
33
using SyncTransactCallback            = Session::SyncTransactCallback;
34
using ProgressHandler                 = Session::ProgressHandler;
35
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
36
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
37
using port_type                       = Session::port_type;
38
using connection_ident_type           = std::int_fast64_t;
39
using ProxyConfig                     = SyncConfig::ProxyConfig;
40
// clang-format on
41

42
} // unnamed namespace
43

44

45
// Life cycle states of a session wrapper:
46
//
47
//  - Uninitiated
48
//  - Unactualized
49
//  - Actualized
50
//  - Finalized
51
//
52
// The session wrapper moves from the Uninitiated to the Unactualized state when
53
// it is initiated, i.e., when initiate() is called. This may happen on any
54
// thread.
55
//
56
// The session wrapper moves from the Unactualized to the Actualized state when
57
// it is associated with a session object, i.e., when `m_sess` is made to refer
58
// to an object of type SessionImpl. This always happens on the event loop
59
// thread.
60
//
61
// The session wrapper moves from the Actualized to the Finalized state when it
62
// is dissociated from the session object. This happens in response to the
63
// session wrapper having been abandoned by the application. This always happens
64
// on the event loop thread.
65
//
66
// The session wrapper will exist in the Finalized state only while referenced
67
// from a post handler waiting to be executed.
68
//
69
// If the session wrapper is abandoned by the application while in the
70
// Uninitiated state, it will be destroyed immediately, since no post handlers
71
// can have been scheduled prior to initiation.
72
//
73
// If the session wrapper is abandoned while in the Unactualized state, it will
74
// move immediately to the Finalized state. This may happen on any thread.
75
//
76
// The moving of a session wrapper to, or from the Actualized state always
77
// happens on the event loop thread. All other state transitions may happen on
78
// any thread.
79
//
80
// NOTE: Activation of the session happens no later than during actualization,
81
// and initiation of deactivation happens no earlier than during
82
// finalization. See also activate_session() and initiate_session_deactivation()
83
// in ClientImpl::Connection.
84
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
85
public:
86
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
87
                   Session::Config);
88
    ~SessionWrapper() noexcept;
89

90
    ClientReplication& get_replication() noexcept;
91
    ClientImpl& get_client() noexcept;
92

93
    bool has_flx_subscription_store() const;
94
    SubscriptionStore* get_flx_subscription_store();
95
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
96

97
    MigrationStore* get_migration_store();
98

99
    void set_progress_handler(util::UniqueFunction<ProgressHandler>);
100
    void set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener>);
101

102
    void initiate();
103

104
    void force_close();
105

106
    void on_commit(version_type new_version) override;
107
    void cancel_reconnect_delay();
108

109
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
110
    bool wait_for_upload_complete_or_client_stopped();
111
    bool wait_for_download_complete_or_client_stopped();
112

113
    void refresh(std::string signed_access_token);
114

115
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
116

117
    // These are called from ClientImpl
118
    void actualize(ServerEndpoint);
119
    void finalize();
120
    void finalize_before_actualization() noexcept;
121

122
    util::Future<std::string> send_test_command(std::string body);
123

124
    void handle_pending_client_reset_acknowledgement();
125

126
    void update_subscription_version_info();
127

128
    std::string get_appservices_connection_id();
129

130
private:
131
    ClientImpl& m_client;
132
    DBRef m_db;
133
    Replication* m_replication;
134

135
    const ProtocolEnvelope m_protocol_envelope;
136
    const std::string m_server_address;
137
    const port_type m_server_port;
138
    const std::string m_user_id;
139
    const SyncServerMode m_sync_mode;
140
    const std::string m_authorization_header_name;
141
    const std::map<std::string, std::string> m_custom_http_headers;
142
    const bool m_verify_servers_ssl_certificate;
143
    const bool m_simulate_integration_error;
144
    const Optional<std::string> m_ssl_trust_certificate_path;
145
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
146
    const size_t m_flx_bootstrap_batch_size_bytes;
147

148
    // This one is different from null when, and only when the session wrapper
149
    // is in ClientImpl::m_abandoned_session_wrappers.
150
    SessionWrapper* m_next = nullptr;
151

152
    // After initiation, these may only be accessed by the event loop thread.
153
    std::string m_http_request_path_prefix;
154
    std::string m_virt_path;
155
    std::string m_signed_access_token;
156

157
    util::Optional<ClientReset> m_client_reset_config;
158

159
    util::Optional<ProxyConfig> m_proxy_config;
160

161
    uint_fast64_t m_last_reported_uploadable_bytes = 0;
162
    util::UniqueFunction<ProgressHandler> m_progress_handler;
163
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
164

165
    std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook;
166
    bool m_in_debug_hook = false;
167

168
    SessionReason m_session_reason;
169

170
    const uint64_t m_schema_version;
171

172
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
173
    int64_t m_flx_active_version = 0;
174
    int64_t m_flx_last_seen_version = 0;
175
    int64_t m_flx_pending_mark_version = 0;
176
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
177

178
    std::shared_ptr<MigrationStore> m_migration_store;
179

180
    bool m_initiated = false;
181

182
    // Set to true when this session wrapper is actualized (or when it is
183
    // finalized before proper actualization). It is then never modified again.
184
    //
185
    // A session specific post handler submitted after the initiation of the
186
    // session wrapper (initiate()) will always find that `m_actualized` is
187
    // true. This is the case, because the scheduling of such a post handler
188
    // will have been preceded by the triggering of
189
    // `ClientImpl::m_actualize_and_finalize` (in
190
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
191
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
192
    // before the post handler. If the session wrapper is no longer in
193
    // `ClientImpl::m_unactualized_session_wrappers` when
194
    // ClientImpl::actualize_and_finalize_session_wrappers() executes, it must
195
    // have been abandoned already, but in that case,
196
    // finalize_before_actualization() has already been called.
197
    bool m_actualized = false;
198

199
    bool m_force_closed = false;
200

201
    bool m_suspended = false;
202

203
    // Has the SessionWrapper been finalized?
204
    bool m_finalized = false;
205

206
    // Set to true when the first DOWNLOAD message is received to indicate that
207
    // the byte-level download progress parameters can be considered reasonably
208
    // reliable. Before that, a lot of time may have passed, so our record of
209
    // the download progress is likely completely out of date.
210
    bool m_reliable_download_progress = false;
211

212
    // Set to point to an activated session object during actualization of the
213
    // session wrapper. Set to null during finalization of the session
214
    // wrapper. Both modifications are guaranteed to be performed by the event
215
    // loop thread.
216
    //
217
    // If a session specific post handler, that is submitted after the
218
    // initiation of the session wrapper, sees that `m_sess` is null, it can
219
    // conclude that the session wrapper has been both abandoned and
220
    // finalized. This is true, because the scheduling of such a post handler
221
    // will have been preceded by the triggering of
222
    // `ClientImpl::m_actualize_and_finalize` (in
223
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
224
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
225
    // before the post handler, so the session wrapper must have been actualized
226
    // unless it was already abandoned by the application. If it was abandoned
227
    // before it was actualized, it will already have been finalized by
228
    // finalize_before_actualization().
229
    //
230
    // Must only be accessed from the event loop thread.
231
    SessionImpl* m_sess = nullptr;
232

233
    // These must only be accessed from the event loop thread.
234
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
235
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
236
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
237

238
    // `m_target_*load_mark` and `m_reached_*load_mark` are protected by
239
    // `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
240
    // event loop thread.
241
    std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
242
    std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
243
    std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
244

245
    void on_sync_progress();
246
    void on_upload_completion();
247
    void on_download_completion();
248
    void on_suspended(const SessionErrorInfo& error_info);
249
    void on_resumed();
250
    void on_connection_state_changed(ConnectionState, const util::Optional<SessionErrorInfo>&);
251
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
252
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
253
    void on_flx_sync_version_complete(int64_t version);
254

255
    void report_progress(bool only_if_new_uploadable_data = false);
256

257
    friend class SessionWrapperStack;
258
    friend class ClientImpl::Session;
259
};
260

261

262
// ################ SessionWrapperStack ################
263

264
inline bool SessionWrapperStack::empty() const noexcept
265
{
×
266
    return !m_back;
×
267
}
×
268

269

270
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
271
{
10,050✔
272
    REALM_ASSERT(!w->m_next);
10,050✔
273
    w->m_next = m_back;
10,050✔
274
    m_back = w.release();
10,050✔
275
}
10,050✔
276

277

278
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
279
{
25,850✔
280
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
25,850✔
281
    if (m_back) {
25,850✔
282
        m_back = m_back->m_next;
10,048✔
283
        w->m_next = nullptr;
10,048✔
284
    }
10,048✔
285
    return w;
25,850✔
286
}
25,850✔
287

288

289
inline void SessionWrapperStack::clear() noexcept
290
{
25,414✔
291
    while (m_back) {
25,414✔
292
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
293
        m_back = w->m_next;
×
294
    }
×
295
}
25,414✔
296

297

298
inline SessionWrapperStack::SessionWrapperStack(SessionWrapperStack&& q) noexcept
299
    : m_back{q.m_back}
300
{
301
    q.m_back = nullptr;
302
}
303

304

305
SessionWrapperStack::~SessionWrapperStack()
306
{
25,416✔
307
    clear();
25,416✔
308
}
25,416✔
309

310

311
// ################ ClientImpl ################
312

313

314
ClientImpl::~ClientImpl()
315
{
9,614✔
316
    // Since no other thread is allowed to be accessing this client or any of
4,738✔
317
    // its subobjects at this time, no mutex locking is necessary.
4,738✔
318

4,738✔
319
    shutdown_and_wait();
9,614✔
320
    // Session wrappers are removed from m_unactualized_session_wrappers as they
4,738✔
321
    // are abandoned.
4,738✔
322
    REALM_ASSERT(m_stopped);
9,614✔
323
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
9,614✔
324
}
9,614✔
325

326

327
void ClientImpl::cancel_reconnect_delay()
328
{
1,632✔
329
    // Thread safety required
838✔
330
    post([this](Status status) {
1,632✔
331
        if (status == ErrorCodes::OperationAborted)
1,632✔
332
            return;
×
333
        else if (!status.is_ok())
1,632✔
334
            throw Exception(status);
×
335

838✔
336
        for (auto& p : m_server_slots) {
1,632✔
337
            ServerSlot& slot = p.second;
1,632✔
338
            if (m_one_connection_per_session) {
1,632✔
339
                REALM_ASSERT(!slot.connection);
×
340
                for (const auto& p : slot.alt_connections) {
×
341
                    ClientImpl::Connection& conn = *p.second;
×
342
                    conn.resume_active_sessions(); // Throws
×
343
                    conn.cancel_reconnect_delay(); // Throws
×
344
                }
×
345
            }
×
346
            else {
1,632✔
347
                REALM_ASSERT(slot.alt_connections.empty());
1,632✔
348
                if (slot.connection) {
1,632✔
349
                    ClientImpl::Connection& conn = *slot.connection;
1,628✔
350
                    conn.resume_active_sessions(); // Throws
1,628✔
351
                    conn.cancel_reconnect_delay(); // Throws
1,628✔
352
                }
1,628✔
353
                else {
4✔
354
                    slot.reconnect_info.reset();
4✔
355
                }
4✔
356
            }
1,632✔
357
        }
1,632✔
358
    }); // Throws
1,632✔
359
}
1,632✔
360

361

362
void ClientImpl::voluntary_disconnect_all_connections()
363
{
12✔
364
    auto done_pf = util::make_promise_future<void>();
12✔
365
    post([this, promise = std::move(done_pf.promise)](Status status) mutable {
12✔
366
        if (status == ErrorCodes::OperationAborted) {
12✔
367
            return;
×
368
        }
×
369

6✔
370
        REALM_ASSERT(status.is_ok());
12✔
371

6✔
372
        try {
12✔
373
            for (auto& p : m_server_slots) {
12✔
374
                ServerSlot& slot = p.second;
12✔
375
                if (m_one_connection_per_session) {
12✔
376
                    REALM_ASSERT(!slot.connection);
×
377
                    for (const auto& p : slot.alt_connections) {
×
378
                        ClientImpl::Connection& conn = *p.second;
×
379
                        if (conn.get_state() == ConnectionState::disconnected) {
×
380
                            continue;
×
381
                        }
×
382
                        conn.voluntary_disconnect();
×
383
                    }
×
384
                }
×
385
                else {
12✔
386
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
387
                    if (!slot.connection) {
12✔
388
                        continue;
×
389
                    }
×
390
                    ClientImpl::Connection& conn = *slot.connection;
12✔
391
                    if (conn.get_state() == ConnectionState::disconnected) {
12✔
392
                        continue;
×
393
                    }
×
394
                    conn.voluntary_disconnect();
12✔
395
                }
12✔
396
            }
12✔
397
        }
12✔
398
        catch (...) {
6✔
399
            promise.set_error(exception_to_status());
×
400
            return;
×
401
        }
×
402
        promise.emplace_value();
12✔
403
    });
12✔
404
    done_pf.future.get();
12✔
405
}
12✔
406

407

408
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
409
{
396✔
410
    // Thread safety required
48✔
411

48✔
412
    {
396✔
413
        std::lock_guard lock{m_mutex};
396✔
414
        m_sessions_terminated = false;
396✔
415
    }
396✔
416

48✔
417
    // The technique employed here relies on the fact that
48✔
418
    // actualize_and_finalize_session_wrappers() must get to execute at least
48✔
419
    // once before the post handler submitted below gets to execute, but still
48✔
420
    // at a time where all session wrappers, that are abandoned prior to the
48✔
421
    // execution of wait_for_session_terminations_or_client_stopped(), have been
48✔
422
    // added to `m_abandoned_session_wrappers`.
48✔
423
    //
48✔
424
    // To see that this is the case, consider a session wrapper that was
48✔
425
    // abandoned before wait_for_session_terminations_or_client_stopped() was
48✔
426
    // invoked. Then the session wrapper will have been added to
48✔
427
    // `m_abandoned_session_wrappers`, and an invocation of
48✔
428
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
48✔
429
    // guarantees mentioned in the documentation of Trigger then ensure
48✔
430
    // that at least one execution of actualize_and_finalize_session_wrappers()
48✔
431
    // will happen after the session wrapper has been added to
48✔
432
    // `m_abandoned_session_wrappers`, but before the post handler submitted
48✔
433
    // below gets to execute.
48✔
434
    post([this](Status status) mutable {
396✔
435
        if (status == ErrorCodes::OperationAborted)
396✔
436
            return;
×
437
        else if (!status.is_ok())
396✔
438
            throw Exception(status);
×
439

48✔
440
        std::lock_guard lock{m_mutex};
396✔
441
        m_sessions_terminated = true;
396✔
442
        m_wait_or_client_stopped_cond.notify_all();
396✔
443
    }); // Throws
396✔
444

48✔
445
    bool completion_condition_was_satisfied;
396✔
446
    {
396✔
447
        std::unique_lock lock{m_mutex};
396✔
448
        while (!m_sessions_terminated && !m_stopped)
764✔
449
            m_wait_or_client_stopped_cond.wait(lock);
368✔
450
        completion_condition_was_satisfied = !m_stopped;
396✔
451
    }
396✔
452
    return completion_condition_was_satisfied;
396✔
453
}
396✔
454

455

456
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
457
util::Future<void> ClientImpl::notify_session_terminated()
458
{
44✔
459
    auto pf = util::make_promise_future<void>();
44✔
460
    post([promise = std::move(pf.promise)](Status status) mutable {
44✔
461
        // Includes operation_aborted
22✔
462
        if (!status.is_ok()) {
44✔
463
            promise.set_error(status);
×
464
            return;
×
465
        }
×
466

22✔
467
        promise.emplace_value();
44✔
468
    });
44✔
469

22✔
470
    return std::move(pf.future);
44✔
471
}
44✔
472

473
void ClientImpl::drain_connections_on_loop()
474
{
9,614✔
475
    post([this](Status status) mutable {
9,612✔
476
        REALM_ASSERT(status.is_ok());
9,612✔
477
        drain_connections();
9,612✔
478
    });
9,612✔
479
}
9,614✔
480

481
void ClientImpl::shutdown_and_wait()
482
{
10,374✔
483
    shutdown();
10,374✔
484
    std::unique_lock lock{m_drain_mutex};
10,374✔
485
    if (m_drained) {
10,374✔
486
        return;
756✔
487
    }
756✔
488

4,740✔
489
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
9,618✔
490
    m_drain_cv.wait(lock, [&] {
21,440✔
491
        return m_num_connections == 0 && m_outstanding_posts == 0;
21,440✔
492
    });
21,440✔
493

4,740✔
494
    m_drained = true;
9,618✔
495
}
9,618✔
496

497
void ClientImpl::shutdown() noexcept
498
{
20,066✔
499
    {
20,066✔
500
        std::lock_guard lock{m_mutex};
20,066✔
501
        if (m_stopped)
20,066✔
502
            return;
10,452✔
503
        m_stopped = true;
9,614✔
504
        m_wait_or_client_stopped_cond.notify_all();
9,614✔
505
    }
9,614✔
506

4,738✔
507
    drain_connections_on_loop();
9,614✔
508
}
9,614✔
509

510

511
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper, ServerEndpoint endpoint)
512
{
10,228✔
513
    // Thread safety required.
4,942✔
514

4,942✔
515
    std::lock_guard lock{m_mutex};
10,228✔
516
    REALM_ASSERT(m_actualize_and_finalize);
10,228✔
517
    m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws
10,228✔
518
    bool retrigger = !m_actualize_and_finalize_needed;
10,228✔
519
    m_actualize_and_finalize_needed = true;
10,228✔
520
    // The conditional triggering needs to happen before releasing the mutex,
4,942✔
521
    // because if two threads call register_unactualized_session_wrapper()
4,942✔
522
    // roughly concurrently, then only the first one is guaranteed to be asked
4,942✔
523
    // to retrigger, but that retriggering must have happened before the other
4,942✔
524
    // thread returns from register_unactualized_session_wrapper().
4,942✔
525
    //
4,942✔
526
    // Note that a similar argument applies when two threads call
4,942✔
527
    // register_abandoned_session_wrapper(), and when one thread calls one of
4,942✔
528
    // them and another thread calls the other.
4,942✔
529
    if (retrigger)
10,228✔
530
        m_actualize_and_finalize->trigger();
6,524✔
531
}
10,228✔
532

533

534
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
535
{
10,228✔
536
    // Thread safety required.
4,942✔
537

4,942✔
538
    std::lock_guard lock{m_mutex};
10,228✔
539
    REALM_ASSERT(m_actualize_and_finalize);
10,228✔
540

4,942✔
541
    // If the session wrapper has not yet been actualized (on the event loop
4,942✔
542
    // thread), it can be immediately finalized. This ensures that we will
4,942✔
543
    // generally not actualize a session wrapper that has already been
4,942✔
544
    // abandoned.
4,942✔
545
    auto i = m_unactualized_session_wrappers.find(wrapper.get());
10,228✔
546
    if (i != m_unactualized_session_wrappers.end()) {
10,228✔
547
        m_unactualized_session_wrappers.erase(i);
178✔
548
        wrapper->finalize_before_actualization();
178✔
549
        return;
178✔
550
    }
178✔
551
    m_abandoned_session_wrappers.push(std::move(wrapper));
10,050✔
552
    bool retrigger = !m_actualize_and_finalize_needed;
10,050✔
553
    m_actualize_and_finalize_needed = true;
10,050✔
554
    // The conditional triggering needs to happen before releasing the
4,850✔
555
    // mutex. See implementation of register_unactualized_session_wrapper() for
4,850✔
556
    // details.
4,850✔
557
    if (retrigger)
10,050✔
558
        m_actualize_and_finalize->trigger();
9,280✔
559
}
10,050✔
560

561

562
// Must be called from the event loop thread.
563
void ClientImpl::actualize_and_finalize_session_wrappers()
564
{
15,804✔
565
    std::map<SessionWrapper*, ServerEndpoint> unactualized_session_wrappers;
15,804✔
566
    SessionWrapperStack abandoned_session_wrappers;
15,804✔
567
    bool stopped;
15,804✔
568
    {
15,804✔
569
        std::lock_guard lock{m_mutex};
15,804✔
570
        m_actualize_and_finalize_needed = false;
15,804✔
571
        swap(m_unactualized_session_wrappers, unactualized_session_wrappers);
15,804✔
572
        swap(m_abandoned_session_wrappers, abandoned_session_wrappers);
15,804✔
573
        stopped = m_stopped;
15,804✔
574
    }
15,804✔
575
    // Note, we need to finalize old session wrappers before we actualize new
7,454✔
576
    // ones. This ensures that deactivation of old sessions is initiated before
7,454✔
577
    // new sessions are activated. This, in turn, ensures that the server does
7,454✔
578
    // not see two overlapping sessions for the same local Realm file.
7,454✔
579
    while (util::bind_ptr<SessionWrapper> wrapper = abandoned_session_wrappers.pop())
25,854✔
580
        wrapper->finalize(); // Throws
10,050✔
581
    if (stopped) {
15,804✔
582
        for (auto& p : unactualized_session_wrappers) {
408✔
583
            SessionWrapper& wrapper = *p.first;
4✔
584
            wrapper.finalize_before_actualization();
4✔
585
        }
4✔
586
        return;
772✔
587
    }
772✔
588
    for (auto& p : unactualized_session_wrappers) {
15,032✔
589
        SessionWrapper& wrapper = *p.first;
10,044✔
590
        ServerEndpoint server_endpoint = std::move(p.second);
10,044✔
591
        wrapper.actualize(std::move(server_endpoint)); // Throws
10,044✔
592
    }
10,044✔
593
}
15,032✔
594

595

596
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
597
                                                   const std::string& authorization_header_name,
598
                                                   const std::map<std::string, std::string>& custom_http_headers,
599
                                                   bool verify_servers_ssl_certificate,
600
                                                   Optional<std::string> ssl_trust_certificate_path,
601
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
602
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
603
{
10,042✔
604
    auto&& [server_slot_it, inserted] =
10,042✔
605
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
10,042✔
606
    ServerSlot& server_slot = server_slot_it->second; // Throws
10,042✔
607

4,846✔
608
    // TODO: enable multiplexing with proxies
4,846✔
609
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
10,042✔
610
        // Use preexisting connection
3,590✔
611
        REALM_ASSERT(server_slot.alt_connections.empty());
7,390✔
612
        return *server_slot.connection;
7,390✔
613
    }
7,390✔
614

1,256✔
615
    // Create a new connection
1,256✔
616
    REALM_ASSERT(!server_slot.connection);
2,652✔
617
    connection_ident_type ident = m_prev_connection_ident + 1;
2,652✔
618
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
2,652✔
619
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
2,652✔
620
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
2,652✔
621
        std::move(proxy_config), server_slot.reconnect_info); // Throws
2,652✔
622
    ClientImpl::Connection& conn = *conn_2;
2,652✔
623
    if (!m_one_connection_per_session) {
2,652✔
624
        server_slot.connection = std::move(conn_2);
2,632✔
625
    }
2,632✔
626
    else {
20✔
627
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
20✔
628
    }
20✔
629
    m_prev_connection_ident = ident;
2,652✔
630
    was_created = true;
2,652✔
631
    {
2,652✔
632
        std::lock_guard lk(m_drain_mutex);
2,652✔
633
        ++m_num_connections;
2,652✔
634
    }
2,652✔
635
    return conn;
2,652✔
636
}
2,652✔
637

638

639
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
640
{
2,652✔
641
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
2,652✔
642
    auto i = m_server_slots.find(endpoint);
2,652✔
643
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
2,652✔
644
    ServerSlot& server_slot = i->second;
2,652✔
645
    if (!m_one_connection_per_session) {
2,652✔
646
        REALM_ASSERT(server_slot.alt_connections.empty());
2,638✔
647
        REALM_ASSERT(&*server_slot.connection == &conn);
2,638✔
648
        server_slot.reconnect_info = conn.get_reconnect_info();
2,638✔
649
        server_slot.connection.reset();
2,638✔
650
    }
2,638✔
651
    else {
14✔
652
        REALM_ASSERT(!server_slot.connection);
14✔
653
        connection_ident_type ident = conn.get_ident();
14✔
654
        auto j = server_slot.alt_connections.find(ident);
14✔
655
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
14✔
656
        REALM_ASSERT(&*j->second == &conn);
14✔
657
        server_slot.alt_connections.erase(j);
14✔
658
    }
14✔
659

1,256✔
660
    {
2,652✔
661
        std::lock_guard lk(m_drain_mutex);
2,652✔
662
        REALM_ASSERT(m_num_connections);
2,652✔
663
        --m_num_connections;
2,652✔
664
        m_drain_cv.notify_all();
2,652✔
665
    }
2,652✔
666
}
2,652✔
667

668

669
// ################ SessionImpl ################
670

671
void SessionImpl::force_close()
672
{
140✔
673
    // Allow force_close() if session is active or hasn't been activated yet.
72✔
674
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
140!
675
        m_wrapper.force_close();
140✔
676
    }
140✔
677
}
140✔
678

679
void SessionImpl::on_connection_state_changed(ConnectionState state,
680
                                              const util::Optional<SessionErrorInfo>& error_info)
681
{
10,660✔
682
    // Only used to report errors back to the SyncSession while the Session is active
5,264✔
683
    if (m_state == SessionImpl::Active) {
10,664✔
684
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
10,664✔
685
    }
10,664✔
686
}
10,660✔
687

688

689
const std::string& SessionImpl::get_virt_path() const noexcept
690
{
8,294✔
691
    // Can only be called if the session is active or being activated
3,380✔
692
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
8,294!
693
    return m_wrapper.m_virt_path;
8,294✔
694
}
8,294✔
695

696
const std::string& SessionImpl::get_realm_path() const noexcept
697
{
10,326✔
698
    // Can only be called if the session is active or being activated
4,988✔
699
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
10,326✔
700
    return m_wrapper.m_db->get_path();
10,326✔
701
}
10,326✔
702

703
DBRef SessionImpl::get_db() const noexcept
704
{
22,562✔
705
    // Can only be called if the session is active or being activated
11,880✔
706
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
22,562!
707
    return m_wrapper.m_db;
22,562✔
708
}
22,562✔
709

710
ClientReplication& SessionImpl::get_repl() const noexcept
711
{
115,298✔
712
    // Can only be called if the session is active or being activated
57,534✔
713
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
115,298✔
714
    return m_wrapper.get_replication();
115,298✔
715
}
115,298✔
716

717
ClientHistory& SessionImpl::get_history() const noexcept
718
{
113,522✔
719
    return get_repl().get_history();
113,522✔
720
}
113,522✔
721

722
util::Optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
723
{
13,496✔
724
    // Can only be called if the session is active or being activated
6,492✔
725
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
13,496✔
726
    return m_wrapper.m_client_reset_config;
13,496✔
727
}
13,496✔
728

729
SessionReason SessionImpl::get_session_reason() noexcept
730
{
1,356✔
731
    // Can only be called if the session is active or being activated
680✔
732
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,356!
733
    return m_wrapper.m_session_reason;
1,356✔
734
}
1,356✔
735

736
uint64_t SessionImpl::get_schema_version() noexcept
737
{
1,356✔
738
    // Can only be called if the session is active or being activated
680✔
739
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,356!
740
    return m_wrapper.m_schema_version;
1,356✔
741
}
1,356✔
742

743
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
744
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
745
{
43,104✔
746
    // Ignore the call if the session is not active
21,734✔
747
    if (m_state != State::Active) {
43,104✔
748
        return;
×
749
    }
×
750

21,734✔
751
    try {
43,104✔
752
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
43,104!
753
        if (simulate_integration_error) {
43,104✔
754
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
755
        }
×
756
        version_type client_version;
43,104✔
757
        if (REALM_LIKELY(!get_client().is_dry_run())) {
43,104✔
758
            VersionInfo version_info;
43,104✔
759
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
43,104✔
760
            client_version = version_info.realm_version;
43,104✔
761
        }
43,104✔
UNCOV
762
        else {
×
763
            // Fake it for "dry run" mode
UNCOV
764
            client_version = m_last_version_available + 1;
×
UNCOV
765
        }
×
766
        on_changesets_integrated(client_version, progress); // Throws
43,104✔
767
    }
43,104✔
768
    catch (const IntegrationException& e) {
21,746✔
769
        on_integration_failure(e);
24✔
770
    }
24✔
771
    m_wrapper.on_sync_progress(); // Throws
43,106✔
772
}
43,106✔
773

774

775
void SessionImpl::on_upload_completion()
776
{
14,818✔
777
    // Ignore the call if the session is not active
7,328✔
778
    if (m_state == State::Active) {
14,818✔
779
        m_wrapper.on_upload_completion(); // Throws
14,818✔
780
    }
14,818✔
781
}
14,818✔
782

783

784
void SessionImpl::on_download_completion()
785
{
17,190✔
786
    // Ignore the call if the session is not active
7,864✔
787
    if (m_state == State::Active) {
17,190✔
788
        m_wrapper.on_download_completion(); // Throws
17,190✔
789
    }
17,190✔
790
}
17,190✔
791

792

793
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
794
{
678✔
795
    // Ignore the call if the session is not active
334✔
796
    if (m_state == State::Active) {
678✔
797
        m_wrapper.on_suspended(error_info); // Throws
678✔
798
    }
678✔
799
}
678✔
800

801

802
void SessionImpl::on_resumed()
803
{
70✔
804
    // Ignore the call if the session is not active
34✔
805
    if (m_state == State::Active) {
70✔
806
        m_wrapper.on_resumed(); // Throws
70✔
807
    }
70✔
808
}
70✔
809

810
void SessionImpl::handle_pending_client_reset_acknowledgement()
811
{
302✔
812
    // Ignore the call if the session is not active
150✔
813
    if (m_state == State::Active) {
302✔
814
        m_wrapper.handle_pending_client_reset_acknowledgement();
302✔
815
    }
302✔
816
}
302✔
817

818
void SessionImpl::update_subscription_version_info()
819
{
284✔
820
    // Ignore the call if the session is not active
142✔
821
    if (m_state == State::Active) {
284✔
822
        m_wrapper.update_subscription_version_info();
284✔
823
    }
284✔
824
}
284✔
825

826
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
827
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
828
{
45,052✔
829
    // Ignore the call if the session is not active
22,710✔
830
    if (m_state != State::Active) {
45,052✔
831
        return false;
×
832
    }
×
833

22,710✔
834
    if (is_steady_state_download_message(batch_state, query_version)) {
45,052✔
835
        return false;
43,102✔
836
    }
43,102✔
837

976✔
838
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
1,950✔
839
    util::Optional<SyncProgress> maybe_progress;
1,950✔
840
    if (batch_state == DownloadBatchState::LastInBatch) {
1,950✔
841
        maybe_progress = progress;
1,768✔
842
    }
1,768✔
843

976✔
844
    bool new_batch = false;
1,950✔
845
    try {
1,950✔
846
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
1,950✔
847
    }
1,950✔
848
    catch (const LogicError& ex) {
976✔
849
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
850
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
851
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
852
                                    ProtocolError::bad_changeset_size);
×
853
            on_integration_failure(ex);
×
854
            return true;
×
855
        }
×
856
        throw;
×
857
    }
×
858

974✔
859
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
974✔
860
    // bootstrapping.
974✔
861
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
1,948✔
862
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
44✔
863
    }
44✔
864

974✔
865
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
1,948✔
866
                                       batch_state, received_changesets.size());
1,948✔
867
    if (hook_action == SyncClientHookAction::EarlyReturn) {
1,948✔
868
        return true;
12✔
869
    }
12✔
870
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
1,936✔
871

968✔
872
    if (batch_state == DownloadBatchState::MoreToCome) {
1,936✔
873
        return true;
176✔
874
    }
176✔
875

880✔
876
    try {
1,760✔
877
        process_pending_flx_bootstrap();
1,760✔
878
    }
1,760✔
879
    catch (const IntegrationException& e) {
890✔
880
        on_integration_failure(e);
20✔
881
    }
20✔
882
    catch (...) {
880✔
883
        on_integration_failure(IntegrationException(exception_to_status()));
×
884
    }
×
885

880✔
886
    return true;
1,760✔
887
}
1,760✔
888

889

890
void SessionImpl::process_pending_flx_bootstrap()
891
{
11,798✔
892
    // Ignore the call if not a flx session or session is not active
5,724✔
893
    if (!m_is_flx_sync_session || m_state != State::Active) {
11,798✔
894
        return;
8,704✔
895
    }
8,704✔
896
    // Should never be called if session is not active
1,548✔
897
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
3,094✔
898
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
3,094✔
899
    if (!bootstrap_store->has_pending()) {
3,094✔
900
        return;
1,314✔
901
    }
1,314✔
902

890✔
903
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,780✔
904
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,780✔
905
                "changeset size: %3)",
1,780✔
906
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,780✔
907
                pending_batch_stats.pending_changeset_bytes);
1,780✔
908
    auto& history = get_repl().get_history();
1,780✔
909
    VersionInfo new_version;
1,780✔
910
    SyncProgress progress;
1,780✔
911
    int64_t query_version = -1;
1,780✔
912
    size_t changesets_processed = 0;
1,780✔
913

890✔
914
    // Used to commit each batch after it was transformed.
890✔
915
    TransactionRef transact = get_db()->start_write();
1,780✔
916
    while (bootstrap_store->has_pending()) {
3,700✔
917
        auto start_time = std::chrono::steady_clock::now();
1,932✔
918
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
1,932✔
919
        if (!pending_batch.progress) {
1,932✔
920
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
921
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
4✔
922
            // bootstrap store requires a write transaction itself.
4✔
923
            transact->close();
8✔
924
            bootstrap_store->clear();
8✔
925
            return;
8✔
926
        }
8✔
927

962✔
928
        auto batch_state =
1,924✔
929
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
1,844✔
930
        uint64_t downloadable_bytes = 0;
1,924✔
931
        query_version = pending_batch.query_version;
1,924✔
932
        bool simulate_integration_error =
1,924✔
933
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
1,924✔
934
        if (simulate_integration_error) {
1,924✔
935
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
4✔
936
        }
4✔
937

960✔
938
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
1,920✔
939
                        batch_state, pending_batch.changesets.size());
1,920✔
940

960✔
941
        history.integrate_server_changesets(
1,920✔
942
            *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
1,920✔
943
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
1,910✔
944
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
1,900✔
945
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
1,900✔
946
            });
1,900✔
947
        progress = *pending_batch.progress;
1,920✔
948
        changesets_processed += pending_batch.changesets.size();
1,920✔
949
        auto duration = std::chrono::steady_clock::now() - start_time;
1,920✔
950

960✔
951
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
1,920✔
952
                                      batch_state, pending_batch.changesets.size());
1,920✔
953
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
1,920✔
954

960✔
955
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
1,920✔
956
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
1,920✔
957
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
1,920✔
958
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
1,920✔
959
                    pending_batch.remaining_changesets);
1,920✔
960
    }
1,920✔
961
    on_changesets_integrated(new_version.realm_version, progress);
1,774✔
962

884✔
963
    REALM_ASSERT_3(query_version, !=, -1);
1,768✔
964
    m_wrapper.on_sync_progress();
1,768✔
965
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
1,768✔
966

884✔
967
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
1,768✔
968
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,768✔
969
    // NoAction/EarlyReturn are both valid no-op actions to take here.
884✔
970
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,768✔
971
}
1,768✔
972

973
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
974
{
20✔
975
    // Ignore the call if the session is not active
10✔
976
    if (m_state == State::Active) {
20✔
977
        m_wrapper.on_flx_sync_error(version, err_msg);
20✔
978
    }
20✔
979
}
20✔
980

981
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
982
{
1,792✔
983
    // Ignore the call if the session is not active
896✔
984
    if (m_state == State::Active) {
1,792✔
985
        m_wrapper.on_flx_sync_progress(version, batch_state);
1,792✔
986
    }
1,792✔
987
}
1,792✔
988

989
// Returns the subscription store pointer provided by the session wrapper.
SubscriptionStore* SessionImpl::get_flx_subscription_store()
{
    // Precondition: the session is active.
    REALM_ASSERT_EX(m_state == State::Active, m_state);
    return m_wrapper.get_flx_subscription_store();
}
15,926✔
995

996
// Returns the migration store pointer provided by the session wrapper.
MigrationStore* SessionImpl::get_migration_store()
{
    // Precondition: the session is active.
    REALM_ASSERT_EX(m_state == State::Active, m_state);
    return m_wrapper.get_migration_store();
}
62,322✔
1002

1003
void SessionImpl::on_flx_sync_version_complete(int64_t version)
1004
{
288✔
1005
    // Ignore the call if the session is not active
144✔
1006
    if (m_state == State::Active) {
288✔
1007
        m_wrapper.on_flx_sync_version_complete(version);
288✔
1008
    }
288✔
1009
}
288✔
1010

1011
// Invokes the wrapper's debug hook with `data` and acts on the action it returns.
//
// Returns NoAction without invoking the hook when a hook invocation is already in progress.
// SuspendWithRetryableError and TriggerReconnect are carried out here and reported to the caller as
// EarlyReturn; every other action is returned unchanged.
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
{
    // Precondition: the session is active.
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    // Guard against the hook being re-entered from within itself.
    if (m_wrapper.m_in_debug_hook) {
        return SyncClientHookAction::NoAction;
    }
    m_wrapper.m_in_debug_hook = true;
    // Clears the re-entrancy flag on every exit path below.
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
        m_wrapper.m_in_debug_hook = false;
    });

    auto requested = m_wrapper.m_debug_hook(data);

    if (requested == realm::SyncClientHookAction::SuspendWithRetryableError) {
        SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
        err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;

        auto err_processing_err = receive_error_message(err_info);
        REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
        return SyncClientHookAction::EarlyReturn;
    }

    if (requested == realm::SyncClientHookAction::TriggerReconnect) {
        get_connection().voluntary_disconnect();
        return SyncClientHookAction::EarlyReturn;
    }

    return requested;
}
1,958✔
1043

1044
// Builds a SyncClientHookData record from the arguments and passes it to the debug hook.
// Returns NoAction when no hook is installed or the session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
                                                  int64_t query_version, DownloadBatchState batch_state,
                                                  size_t num_changesets)
{
    const bool hook_missing = !m_wrapper.m_debug_hook;
    if (REALM_LIKELY(hook_missing)) {
        return SyncClientHookAction::NoAction;
    }
    const bool inactive = (m_state != State::Active);
    if (REALM_UNLIKELY(inactive)) {
        return SyncClientHookAction::NoAction;
    }

    SyncClientHookData hook_data;
    hook_data.event = event;
    hook_data.progress = progress;
    hook_data.query_version = query_version;
    hook_data.batch_state = batch_state;
    hook_data.num_changesets = num_changesets;

    return call_debug_hook(hook_data);
}
1,808✔
1064

1065
// Builds a SyncClientHookData record describing an error event and passes it to the debug hook.
// The record uses the session's current progress, a SteadyState batch state, and zero for both the
// changeset count and the query version. Returns NoAction when no hook is installed or the session is
// not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
{
    const bool hook_missing = !m_wrapper.m_debug_hook;
    if (REALM_LIKELY(hook_missing)) {
        return SyncClientHookAction::NoAction;
    }
    const bool inactive = (m_state != State::Active);
    if (REALM_UNLIKELY(inactive)) {
        return SyncClientHookAction::NoAction;
    }

    SyncClientHookData hook_data;
    hook_data.event = event;
    hook_data.error_info = &error_info;
    hook_data.progress = m_progress;
    hook_data.batch_state = DownloadBatchState::SteadyState;
    hook_data.query_version = 0;
    hook_data.num_changesets = 0;

    return call_debug_hook(hook_data);
}
172✔
1084

1085
// Reports whether a download message needs no bootstrap handling.
//
// True when the batch state is SteadyState, when this is not an FLX session, or when the message is the
// last of a batch for the query version the wrapper records as active.
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
{
    // Precondition: the session is active.
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    return batch_state == DownloadBatchState::SteadyState || !m_is_flx_sync_session ||
           (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version);
}
3,906✔
1104

1105
// Validates a JSON test command and queues it for sending.
//
// The returned future holds an error Status when the session is not active, when `body` is not valid
// JSON, or when the JSON fails the field checks below. Otherwise a task is posted to the client that
// appends the command to m_pending_test_commands and calls ensure_enlisted_to_send(); the future is
// the one paired with the promise stored alongside that pending command.
util::Future<std::string> SessionImpl::send_test_command(std::string body)
{
    if (m_state != State::Active) {
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
    }

    try {
        auto parsed = nlohmann::json::parse(body.begin(), body.end());

        auto command_it = parsed.find("command");
        if (command_it == parsed.end() || !command_it->is_string()) {
            return Status{ErrorCodes::LogicError,
                          "Must supply command name in \"command\" field of test command json object"};
        }

        // NOTE(review): an object containing "args" plus further unknown keys passes this check -
        // confirm whether that is intended.
        const bool has_extra_fields = parsed.size() > 1;
        if (has_extra_fields && parsed.find("args") == parsed.end()) {
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
        }
    }
    catch (const nlohmann::json::parse_error& e) {
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
    }

    auto pf = util::make_promise_future<std::string>();

    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
        // Includes operation_aborted
        if (!status.is_ok()) {
            promise.set_error(status);
            return;
        }

        auto id = ++m_last_pending_test_command_ident;
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
        ensure_enlisted_to_send();
    });

    return std::move(pf.future);
}
52✔
1141

1142
// ################ SessionWrapper ################
1143

1144
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1145
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
1146
// the ClientImpl::Connection that owns the ClientImpl::Session.
1147
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1148
                               std::shared_ptr<MigrationStore> migration_store, Session::Config config)
1149
    : m_client{client}
1150
    , m_db(std::move(db))
1151
    , m_replication(m_db->get_replication())
1152
    , m_protocol_envelope{config.protocol_envelope}
1153
    , m_server_address{std::move(config.server_address)}
1154
    , m_server_port{config.server_port}
1155
    , m_user_id(std::move(config.user_id))
1156
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
1157
    , m_authorization_header_name{config.authorization_header_name}
1158
    , m_custom_http_headers{config.custom_http_headers}
1159
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
1160
    , m_simulate_integration_error{config.simulate_integration_error}
1161
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
1162
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
1163
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
1164
    , m_http_request_path_prefix{std::move(config.service_identifier)}
1165
    , m_virt_path{std::move(config.realm_identifier)}
1166
    , m_signed_access_token{std::move(config.signed_user_token)}
1167
    , m_client_reset_config{std::move(config.client_reset_config)}
1168
    , m_proxy_config{config.proxy_config} // Throws
1169
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
1170
    , m_session_reason(config.session_reason)
1171
    , m_schema_version(config.schema_version)
1172
    , m_flx_subscription_store(std::move(flx_sub_store))
1173
    , m_migration_store(std::move(migration_store))
1174
{
11,380✔
1175
    REALM_ASSERT(m_db);
11,380✔
1176
    REALM_ASSERT(m_db->get_replication());
11,380✔
1177
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
11,380✔
1178
    if (m_client_reset_config) {
11,380✔
1179
        m_session_reason = SessionReason::ClientReset;
368✔
1180
    }
368✔
1181
}
11,380✔
1182

1183
// When a DB is still held and the wrapper was actualized, unregisters this wrapper as a commit listener
// and releases the DB's sync agent.
SessionWrapper::~SessionWrapper() noexcept
{
    if (!m_db || !m_actualized) {
        return;
    }
    m_db->remove_commit_listener(this);
    m_db->release_sync_agent();
}
11,380✔
1190

1191

1192
// Returns the stored replication object as a ClientReplication (the constructor asserts that the DB's
// replication object has that dynamic type).
inline ClientReplication& SessionWrapper::get_replication() noexcept
{
    // Precondition: the wrapper still holds a DB.
    REALM_ASSERT(m_db);
    return static_cast<ClientReplication&>(*m_replication);
}
115,298✔
1197

1198

1199
// Returns the ClientImpl this wrapper was constructed with.
inline ClientImpl& SessionWrapper::get_client() noexcept
{
    return m_client;
}
72✔
1203

1204
bool SessionWrapper::has_flx_subscription_store() const
1205
{
1,792✔
1206
    return static_cast<bool>(m_flx_subscription_store);
1,792✔
1207
}
1,792✔
1208

1209
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1210
{
20✔
1211
    REALM_ASSERT(!m_finalized);
20✔
1212
    get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
20✔
1213
}
20✔
1214

1215
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1216
{
2,036✔
1217
    REALM_ASSERT(!m_finalized);
2,036✔
1218
    m_flx_last_seen_version = version;
2,036✔
1219
    m_flx_active_version = version;
2,036✔
1220
}
2,036✔
1221

1222
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1223
{
1,792✔
1224
    if (!has_flx_subscription_store()) {
1,792✔
1225
        return;
×
1226
    }
×
1227
    REALM_ASSERT(!m_finalized);
1,792✔
1228
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
1,792✔
1229
    REALM_ASSERT(new_version >= m_flx_active_version);
1,792✔
1230
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
1,792✔
1231

896✔
1232
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
1,792✔
1233

896✔
1234
    switch (batch_state) {
1,792✔
1235
        case DownloadBatchState::SteadyState:
✔
1236
            // Cannot be called with this value.
1237
            REALM_UNREACHABLE();
1238
        case DownloadBatchState::LastInBatch:
1,748✔
1239
            if (m_flx_active_version == new_version) {
1,748✔
1240
                return;
×
1241
            }
×
1242
            on_flx_sync_version_complete(new_version);
1,748✔
1243
            if (new_version == 0) {
1,748✔
1244
                new_state = SubscriptionSet::State::Complete;
820✔
1245
            }
820✔
1246
            else {
928✔
1247
                new_state = SubscriptionSet::State::AwaitingMark;
928✔
1248
                m_flx_pending_mark_version = new_version;
928✔
1249
            }
928✔
1250
            break;
1,748✔
1251
        case DownloadBatchState::MoreToCome:
896✔
1252
            if (m_flx_last_seen_version == new_version) {
44✔
1253
                return;
×
1254
            }
×
1255

22✔
1256
            m_flx_last_seen_version = new_version;
44✔
1257
            new_state = SubscriptionSet::State::Bootstrapping;
44✔
1258
            break;
44✔
1259
    }
1,792✔
1260

896✔
1261
    get_flx_subscription_store()->update_state(new_version, new_state);
1,792✔
1262
}
1,792✔
1263

1264
// Returns a non-owning pointer to the FLX subscription store held by this wrapper.
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
{
    // Precondition: the wrapper has not been finalized.
    REALM_ASSERT(!m_finalized);
    return m_flx_subscription_store.get();
}
17,738✔
1269

1270
// Returns a non-owning pointer to the pending bootstrap store held by this wrapper.
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
{
    // Precondition: the wrapper has not been finalized.
    REALM_ASSERT(!m_finalized);
    return m_flx_pending_bootstrap_store.get();
}
5,042✔
1275

1276
// Returns a non-owning pointer to the migration store held by this wrapper.
MigrationStore* SessionWrapper::get_migration_store()
{
    // Precondition: the wrapper has not been finalized.
    REALM_ASSERT(!m_finalized);
    return m_migration_store.get();
}
62,324✔
1281

1282
// Installs the progress handler, taking ownership of it. Must be called before initiate().
inline void SessionWrapper::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
{
    REALM_ASSERT(!m_initiated);
    m_progress_handler = std::move(handler);
}
3,848✔
1287

1288

1289
inline void
1290
SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
1291
{
11,476✔
1292
    REALM_ASSERT(!m_initiated);
11,476✔
1293
    m_connection_state_change_listener = std::move(listener);
11,476✔
1294
}
11,476✔
1295

1296

1297
void SessionWrapper::initiate()
1298
{
10,228✔
1299
    REALM_ASSERT(!m_initiated);
10,228✔
1300
    ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port, m_user_id, m_sync_mode};
10,228✔
1301
    m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws
10,228✔
1302
    m_initiated = true;
10,228✔
1303
    m_db->add_commit_listener(this);
10,228✔
1304
}
10,228✔
1305

1306

1307
// Commit listener callback: posts a task to the client that makes the session recognize the new
// version and then reports progress. Returns without posting when the wrapper is finalized or force
// closed.
void SessionWrapper::on_commit(version_type new_version)
{
    // Thread safety required
    REALM_ASSERT(m_initiated);

    if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
        return;
    }

    // The posted task holds its own reference to this wrapper.
    util::bind_ptr<SessionWrapper> self{this};
    m_client.post([self = std::move(self), new_version](Status status) {
        if (status == ErrorCodes::OperationAborted) {
            return;
        }
        if (!status.is_ok()) {
            throw Exception(status);
        }

        REALM_ASSERT(self->m_actualized);
        if (REALM_UNLIKELY(!self->m_sess)) {
            return; // Already finalized
        }
        SessionImpl& session = *self->m_sess;
        session.recognize_sync_version(new_version); // Throws
        const bool only_if_new_uploadable_data = true;
        self->report_progress(only_if_new_uploadable_data); // Throws
    });
}
109,618✔
1332

1333

1334
void SessionWrapper::cancel_reconnect_delay()
1335
{
20✔
1336
    // Thread safety required
10✔
1337
    REALM_ASSERT(m_initiated);
20✔
1338

10✔
1339
    if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
20✔
1340
        return;
×
1341
    }
×
1342

10✔
1343
    util::bind_ptr<SessionWrapper> self{this};
20✔
1344
    m_client.post([self = std::move(self)](Status status) {
20✔
1345
        if (status == ErrorCodes::OperationAborted)
20✔
1346
            return;
×
1347
        else if (!status.is_ok())
20✔
1348
            throw Exception(status);
×
1349

10✔
1350
        REALM_ASSERT(self->m_actualized);
20✔
1351
        if (REALM_UNLIKELY(!self->m_sess))
20✔
1352
            return; // Already finalized
10✔
1353
        SessionImpl& sess = *self->m_sess;
20✔
1354
        sess.cancel_resumption_delay(); // Throws
20✔
1355
        ClientImpl::Connection& conn = sess.get_connection();
20✔
1356
        conn.cancel_reconnect_delay(); // Throws
20✔
1357
    });                                // Throws
20✔
1358
}
20✔
1359

1360
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1361
                                    WaitOperCompletionHandler handler)
1362
{
4,850✔
1363
    REALM_ASSERT(upload_completion || download_completion);
4,850✔
1364
    REALM_ASSERT(m_initiated);
4,850✔
1365
    REALM_ASSERT(!m_finalized);
4,850✔
1366

2,338✔
1367
    util::bind_ptr<SessionWrapper> self{this};
4,850✔
1368
    m_client.post([self = std::move(self), handler = std::move(handler), upload_completion,
4,850✔
1369
                   download_completion](Status status) mutable {
4,850✔
1370
        if (status == ErrorCodes::OperationAborted)
4,850✔
1371
            return;
×
1372
        else if (!status.is_ok())
4,850✔
1373
            throw Exception(status);
×
1374

2,338✔
1375
        REALM_ASSERT(self->m_actualized);
4,850✔
1376
        if (REALM_UNLIKELY(!self->m_sess)) {
4,850✔
1377
            // Already finalized
34✔
1378
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
70✔
1379
            return;
70✔
1380
        }
70✔
1381
        if (upload_completion) {
4,780✔
1382
            if (download_completion) {
2,488✔
1383
                // Wait for upload and download completion
148✔
1384
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
294✔
1385
            }
294✔
1386
            else {
2,194✔
1387
                // Wait for upload completion only
1,010✔
1388
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
2,194✔
1389
            }
2,194✔
1390
        }
2,488✔
1391
        else {
2,292✔
1392
            // Wait for download completion only
1,146✔
1393
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
2,292✔
1394
        }
2,292✔
1395
        SessionImpl& sess = *self->m_sess;
4,780✔
1396
        if (upload_completion)
4,780✔
1397
            sess.request_upload_completion_notification(); // Throws
2,488✔
1398
        if (download_completion)
4,780✔
1399
            sess.request_download_completion_notification(); // Throws
2,586✔
1400
    });                                                      // Throws
4,780✔
1401
}
4,850✔
1402

1403

1404
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1405
{
12,996✔
1406
    // Thread safety required
6,498✔
1407
    REALM_ASSERT(m_initiated);
12,996✔
1408
    REALM_ASSERT(!m_finalized);
12,996✔
1409

6,498✔
1410
    std::int_fast64_t target_mark;
12,996✔
1411
    {
12,996✔
1412
        std::lock_guard lock{m_client.m_mutex};
12,996✔
1413
        target_mark = ++m_target_upload_mark;
12,996✔
1414
    }
12,996✔
1415

6,498✔
1416
    util::bind_ptr<SessionWrapper> self{this};
12,996✔
1417
    m_client.post([self = std::move(self), target_mark](Status status) {
12,996✔
1418
        if (status == ErrorCodes::OperationAborted)
12,996✔
1419
            return;
×
1420
        else if (!status.is_ok())
12,996✔
1421
            throw Exception(status);
×
1422

6,498✔
1423
        REALM_ASSERT(self->m_actualized);
12,996✔
1424
        // The session wrapper may already have been finalized. This can only
6,498✔
1425
        // happen if it was abandoned, but in that case, the call of
6,498✔
1426
        // wait_for_upload_complete_or_client_stopped() must have returned
6,498✔
1427
        // already.
6,498✔
1428
        if (REALM_UNLIKELY(!self->m_sess))
12,996✔
1429
            return;
6,506✔
1430
        if (target_mark > self->m_staged_upload_mark) {
12,986✔
1431
            self->m_staged_upload_mark = target_mark;
12,986✔
1432
            SessionImpl& sess = *self->m_sess;
12,986✔
1433
            sess.request_upload_completion_notification(); // Throws
12,986✔
1434
        }
12,986✔
1435
    }); // Throws
12,986✔
1436

6,498✔
1437
    bool completion_condition_was_satisfied;
12,996✔
1438
    {
12,996✔
1439
        std::unique_lock lock{m_client.m_mutex};
12,996✔
1440
        while (m_reached_upload_mark < target_mark && !m_client.m_stopped)
32,970✔
1441
            m_client.m_wait_or_client_stopped_cond.wait(lock);
19,974✔
1442
        completion_condition_was_satisfied = !m_client.m_stopped;
12,996✔
1443
    }
12,996✔
1444
    return completion_condition_was_satisfied;
12,996✔
1445
}
12,996✔
1446

1447

1448
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1449
{
10,092✔
1450
    // Thread safety required
5,048✔
1451
    REALM_ASSERT(m_initiated);
10,092✔
1452
    REALM_ASSERT(!m_finalized);
10,092✔
1453

5,048✔
1454
    std::int_fast64_t target_mark;
10,092✔
1455
    {
10,092✔
1456
        std::lock_guard lock{m_client.m_mutex};
10,092✔
1457
        target_mark = ++m_target_download_mark;
10,092✔
1458
    }
10,092✔
1459

5,048✔
1460
    util::bind_ptr<SessionWrapper> self{this};
10,092✔
1461
    m_client.post([self = std::move(self), target_mark](Status status) {
10,092✔
1462
        if (status == ErrorCodes::OperationAborted)
10,092✔
1463
            return;
×
1464
        else if (!status.is_ok())
10,092✔
1465
            throw Exception(status);
×
1466

5,048✔
1467
        REALM_ASSERT(self->m_actualized);
10,092✔
1468
        // The session wrapper may already have been finalized. This can only
5,048✔
1469
        // happen if it was abandoned, but in that case, the call of
5,048✔
1470
        // wait_for_download_complete_or_client_stopped() must have returned
5,048✔
1471
        // already.
5,048✔
1472
        if (REALM_UNLIKELY(!self->m_sess))
10,092✔
1473
            return;
5,078✔
1474
        if (target_mark > self->m_staged_download_mark) {
10,034✔
1475
            self->m_staged_download_mark = target_mark;
10,034✔
1476
            SessionImpl& sess = *self->m_sess;
10,034✔
1477
            sess.request_download_completion_notification(); // Throws
10,034✔
1478
        }
10,034✔
1479
    }); // Throws
10,034✔
1480

5,048✔
1481
    bool completion_condition_was_satisfied;
10,092✔
1482
    {
10,092✔
1483
        std::unique_lock lock{m_client.m_mutex};
10,092✔
1484
        while (m_reached_download_mark < target_mark && !m_client.m_stopped)
20,652✔
1485
            m_client.m_wait_or_client_stopped_cond.wait(lock);
10,560✔
1486
        completion_condition_was_satisfied = !m_client.m_stopped;
10,092✔
1487
    }
10,092✔
1488
    return completion_condition_was_satisfied;
10,092✔
1489
}
10,092✔
1490

1491

1492
void SessionWrapper::refresh(std::string signed_access_token)
1493
{
208✔
1494
    // Thread safety required
104✔
1495
    REALM_ASSERT(m_initiated);
208✔
1496
    REALM_ASSERT(!m_finalized);
208✔
1497

104✔
1498
    m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) {
208✔
1499
        if (status == ErrorCodes::OperationAborted)
208✔
1500
            return;
×
1501
        else if (!status.is_ok())
208✔
1502
            throw Exception(status);
×
1503

104✔
1504
        REALM_ASSERT(self->m_actualized);
208✔
1505
        if (REALM_UNLIKELY(!self->m_sess))
208✔
1506
            return; // Already finalized
104✔
1507
        self->m_signed_access_token = std::move(token);
208✔
1508
        SessionImpl& sess = *self->m_sess;
208✔
1509
        ClientImpl::Connection& conn = sess.get_connection();
208✔
1510
        // FIXME: This only makes sense when each session uses a separate connection.
104✔
1511
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
208✔
1512
        sess.cancel_resumption_delay();                                                          // Throws
208✔
1513
        conn.cancel_reconnect_delay();                                                           // Throws
208✔
1514
    });
208✔
1515
}
208✔
1516

1517

1518
// Hands an initiated wrapper over to its client as abandoned. A wrapper that
// was never initiated is not registered; the passed-in reference is simply
// dropped when this function returns.
inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
{
    if (!wrapper->m_initiated)
        return;

    // Fetch the client before `wrapper` is moved from.
    ClientImpl& owner = wrapper->m_client;
    owner.register_abandoned_session_wrapper(std::move(wrapper));
}
11,380✔
1525

1526

1527
// Must be called from event loop thread
1528
void SessionWrapper::actualize(ServerEndpoint endpoint)
1529
{
10,046✔
1530
    REALM_ASSERT(!m_actualized);
10,046✔
1531
    REALM_ASSERT(!m_sess);
10,046✔
1532
    // Cannot be actualized if it's already been finalized or force closed
4,848✔
1533
    REALM_ASSERT(!m_finalized);
10,046✔
1534
    REALM_ASSERT(!m_force_closed);
10,046✔
1535
    try {
10,046✔
1536
        m_db->claim_sync_agent();
10,046✔
1537
    }
10,046✔
1538
    catch (const MultipleSyncAgents&) {
4,850✔
1539
        finalize_before_actualization();
4✔
1540
        throw;
4✔
1541
    }
4✔
1542
    auto sync_mode = endpoint.server_mode;
10,042✔
1543

4,846✔
1544
    bool was_created = false;
10,042✔
1545
    ClientImpl::Connection& conn = m_client.get_connection(
10,042✔
1546
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
10,042✔
1547
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
10,042✔
1548
        was_created); // Throws
10,042✔
1549
    try {
10,042✔
1550
        // FIXME: This only makes sense when each session uses a separate connection.
4,846✔
1551
        conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
10,042✔
1552
        std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
10,042✔
1553
        if (sync_mode == SyncServerMode::FLX) {
10,042✔
1554
            m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
1,334✔
1555
        }
1,334✔
1556

4,846✔
1557
        sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
10,042✔
1558
        m_sess = sess.get();
10,042✔
1559
        conn.activate_session(std::move(sess)); // Throws
10,042✔
1560
    }
10,042✔
1561
    catch (...) {
4,846✔
1562
        if (was_created)
×
1563
            m_client.remove_connection(conn);
×
1564

1565
        // finalize_before_actualization() expects m_sess to be nullptr, but it's possible that we
1566
        // reached its assignment above before throwing. Unset it here so we get a clean unhandled
1567
        // exception failure instead of a REALM_ASSERT in finalize_before_actualization().
1568
        m_sess = nullptr;
×
1569
        finalize_before_actualization();
×
1570
        throw;
×
1571
    }
×
1572

4,846✔
1573
    // Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
4,846✔
1574
    // session cannot change the state of the bootstrap store at the same time.
4,846✔
1575
    update_subscription_version_info();
10,040✔
1576

4,846✔
1577
    m_actualized = true;
10,040✔
1578
    if (was_created)
10,040✔
1579
        conn.activate(); // Throws
2,652✔
1580

4,846✔
1581
    if (m_connection_state_change_listener) {
10,040✔
1582
        ConnectionState state = conn.get_state();
10,026✔
1583
        if (state != ConnectionState::disconnected) {
10,026✔
1584
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,248✔
1585
            if (state == ConnectionState::connected)
7,248✔
1586
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
6,956✔
1587
        }
7,248✔
1588
    }
10,026✔
1589

4,846✔
1590
    if (!m_client_reset_config)
10,040✔
1591
        report_progress(); // Throws
9,676✔
1592
}
10,040✔
1593

1594
void SessionWrapper::force_close()
1595
{
140✔
1596
    if (m_force_closed || m_finalized) {
140✔
1597
        return;
×
1598
    }
×
1599
    REALM_ASSERT(m_actualized);
140✔
1600
    REALM_ASSERT(m_sess);
140✔
1601
    m_force_closed = true;
140✔
1602

72✔
1603
    ClientImpl::Connection& conn = m_sess->get_connection();
140✔
1604
    conn.initiate_session_deactivation(m_sess); // Throws
140✔
1605

72✔
1606
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
72✔
1607
    m_flx_pending_bootstrap_store.reset();
140✔
1608
    // Clear the subscription and migration store refs since they are owned by SyncSession
72✔
1609
    m_flx_subscription_store.reset();
140✔
1610
    m_migration_store.reset();
140✔
1611
    m_sess = nullptr;
140✔
1612
    // Everything is being torn down, no need to report connection state anymore
72✔
1613
    m_connection_state_change_listener = {};
140✔
1614
}
140✔
1615

1616
// Must be called from event loop thread
1617
void SessionWrapper::finalize()
1618
{
10,050✔
1619
    REALM_ASSERT(m_actualized);
10,050✔
1620

4,850✔
1621
    // Already finalized?
4,850✔
1622
    if (m_finalized) {
10,050✔
1623
        return;
×
1624
    }
×
1625

4,850✔
1626
    // Must be before marking as finalized as we expect m_finalized == false in on_change()
4,850✔
1627
    m_db->remove_commit_listener(this);
10,050✔
1628

4,850✔
1629
    m_finalized = true;
10,050✔
1630

4,850✔
1631
    if (!m_force_closed) {
10,050✔
1632
        REALM_ASSERT(m_sess);
9,902✔
1633
        ClientImpl::Connection& conn = m_sess->get_connection();
9,902✔
1634
        conn.initiate_session_deactivation(m_sess); // Throws
9,902✔
1635

4,774✔
1636
        // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
4,774✔
1637
        m_flx_pending_bootstrap_store.reset();
9,902✔
1638
        // Clear the subscription and migration store refs since they are owned by SyncSession
4,774✔
1639
        m_flx_subscription_store.reset();
9,902✔
1640
        m_migration_store.reset();
9,902✔
1641
        m_sess = nullptr;
9,902✔
1642
    }
9,902✔
1643

4,850✔
1644
    // The Realm file can be closed now, as no access to the Realm file is
4,850✔
1645
    // supposed to happen on behalf of a session after initiation of
4,850✔
1646
    // deactivation.
4,850✔
1647
    m_db->release_sync_agent();
10,050✔
1648
    m_db = nullptr;
10,050✔
1649

4,850✔
1650
    // All outstanding wait operations must be canceled
4,850✔
1651
    while (!m_upload_completion_handlers.empty()) {
10,472✔
1652
        auto handler = std::move(m_upload_completion_handlers.back());
422✔
1653
        m_upload_completion_handlers.pop_back();
422✔
1654
        handler(
422✔
1655
            {ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
422✔
1656
    }
422✔
1657
    while (!m_download_completion_handlers.empty()) {
10,252✔
1658
        auto handler = std::move(m_download_completion_handlers.back());
202✔
1659
        m_download_completion_handlers.pop_back();
202✔
1660
        handler(
202✔
1661
            {ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
202✔
1662
    }
202✔
1663
    while (!m_sync_completion_handlers.empty()) {
10,062✔
1664
        auto handler = std::move(m_sync_completion_handlers.back());
12✔
1665
        m_sync_completion_handlers.pop_back();
12✔
1666
        handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
12✔
1667
    }
12✔
1668
}
10,050✔
1669

1670

1671
// Must be called only when an unactualized session wrapper becomes abandoned.
1672
//
1673
// Called with a lock on `m_client.m_mutex`.
1674
inline void SessionWrapper::finalize_before_actualization() noexcept
1675
{
186✔
1676
    REALM_ASSERT(!m_sess);
186✔
1677
    m_actualized = true;
186✔
1678
    m_force_closed = true;
186✔
1679
}
186✔
1680

1681

1682
inline void SessionWrapper::on_sync_progress()
1683
{
44,854✔
1684
    REALM_ASSERT(!m_finalized);
44,854✔
1685
    m_reliable_download_progress = true;
44,854✔
1686
    report_progress(); // Throws
44,854✔
1687
}
44,854✔
1688

1689

1690
void SessionWrapper::on_upload_completion()
1691
{
14,818✔
1692
    REALM_ASSERT(!m_finalized);
14,818✔
1693
    while (!m_upload_completion_handlers.empty()) {
16,674✔
1694
        auto handler = std::move(m_upload_completion_handlers.back());
1,856✔
1695
        m_upload_completion_handlers.pop_back();
1,856✔
1696
        handler(Status::OK()); // Throws
1,856✔
1697
    }
1,856✔
1698
    while (!m_sync_completion_handlers.empty()) {
15,016✔
1699
        auto handler = std::move(m_sync_completion_handlers.back());
198✔
1700
        m_download_completion_handlers.push_back(std::move(handler)); // Throws
198✔
1701
        m_sync_completion_handlers.pop_back();
198✔
1702
    }
198✔
1703
    std::lock_guard lock{m_client.m_mutex};
14,818✔
1704
    if (m_staged_upload_mark > m_reached_upload_mark) {
14,818✔
1705
        m_reached_upload_mark = m_staged_upload_mark;
12,960✔
1706
        m_client.m_wait_or_client_stopped_cond.notify_all();
12,960✔
1707
    }
12,960✔
1708
}
14,818✔
1709

1710

1711
void SessionWrapper::on_download_completion()
1712
{
17,190✔
1713
    while (!m_download_completion_handlers.empty()) {
19,478✔
1714
        auto handler = std::move(m_download_completion_handlers.back());
2,288✔
1715
        m_download_completion_handlers.pop_back();
2,288✔
1716
        handler(Status::OK()); // Throws
2,288✔
1717
    }
2,288✔
1718
    while (!m_sync_completion_handlers.empty()) {
17,274✔
1719
        auto handler = std::move(m_sync_completion_handlers.back());
84✔
1720
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
84✔
1721
        m_sync_completion_handlers.pop_back();
84✔
1722
    }
84✔
1723

7,864✔
1724
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
17,190✔
1725
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
802✔
1726
                             m_flx_pending_mark_version);
802✔
1727
        m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
802✔
1728
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
802✔
1729
    }
802✔
1730

7,864✔
1731
    std::lock_guard lock{m_client.m_mutex};
17,190✔
1732
    if (m_staged_download_mark > m_reached_download_mark) {
17,190✔
1733
        m_reached_download_mark = m_staged_download_mark;
9,962✔
1734
        m_client.m_wait_or_client_stopped_cond.notify_all();
9,962✔
1735
    }
9,962✔
1736
}
17,190✔
1737

1738

1739
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1740
{
678✔
1741
    REALM_ASSERT(!m_finalized);
678✔
1742
    m_suspended = true;
678✔
1743
    if (m_connection_state_change_listener) {
678✔
1744
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
678✔
1745
    }
678✔
1746
}
678✔
1747

1748

1749
void SessionWrapper::on_resumed()
1750
{
70✔
1751
    REALM_ASSERT(!m_finalized);
70✔
1752
    m_suspended = false;
70✔
1753
    if (m_connection_state_change_listener) {
70✔
1754
        ClientImpl::Connection& conn = m_sess->get_connection();
70✔
1755
        if (conn.get_state() != ConnectionState::disconnected) {
70✔
1756
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
64✔
1757
            if (conn.get_state() == ConnectionState::connected)
64✔
1758
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
58✔
1759
        }
64✔
1760
    }
70✔
1761
}
70✔
1762

1763

1764
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1765
                                                 const util::Optional<SessionErrorInfo>& error_info)
1766
{
10,664✔
1767
    if (m_connection_state_change_listener) {
10,664✔
1768
        if (!m_suspended)
10,650✔
1769
            m_connection_state_change_listener(state, error_info); // Throws
10,630✔
1770
    }
10,650✔
1771
}
10,664✔
1772

1773

1774
void SessionWrapper::report_progress(bool only_if_new_uploadable_data)
1775
{
163,296✔
1776
    REALM_ASSERT(!m_finalized);
163,296✔
1777
    REALM_ASSERT(m_sess);
163,296✔
1778

81,148✔
1779
    if (!m_progress_handler)
163,296✔
1780
        return;
113,784✔
1781

23,026✔
1782
    std::uint_fast64_t downloaded_bytes = 0;
49,512✔
1783
    std::uint_fast64_t downloadable_bytes = 0;
49,512✔
1784
    std::uint_fast64_t uploaded_bytes = 0;
49,512✔
1785
    std::uint_fast64_t uploadable_bytes = 0;
49,512✔
1786
    std::uint_fast64_t snapshot_version = 0;
49,512✔
1787
    ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes,
49,512✔
1788
                                             uploadable_bytes, snapshot_version);
49,512✔
1789

23,026✔
1790
    // If this progress notification was triggered by a commit being made we
23,026✔
1791
    // only want to send it if the uploadable bytes has actually increased,
23,026✔
1792
    // and not if it was an empty commit.
23,026✔
1793
    if (only_if_new_uploadable_data && m_last_reported_uploadable_bytes == uploadable_bytes)
49,512✔
1794
        return;
34,972✔
1795
    m_last_reported_uploadable_bytes = uploadable_bytes;
14,540✔
1796

6,218✔
1797
    // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
6,218✔
1798
    // is only the remaining to download. This is confusing, so make them use
6,218✔
1799
    // the same units.
6,218✔
1800
    std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes;
14,540✔
1801

6,218✔
1802
    m_sess->logger.debug("Progress handler called, downloaded = %1, "
14,540✔
1803
                         "downloadable(total) = %2, uploaded = %3, "
14,540✔
1804
                         "uploadable = %4, reliable_download_progress = %5, "
14,540✔
1805
                         "snapshot version = %6",
14,540✔
1806
                         downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes,
14,540✔
1807
                         m_reliable_download_progress, snapshot_version);
14,540✔
1808

6,218✔
1809
    // FIXME: Why is this boolean status communicated to the application as
6,218✔
1810
    // a 64-bit integer? Also, the name `progress_version` is confusing.
6,218✔
1811
    std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0);
10,800✔
1812
    m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version,
14,540✔
1813
                       snapshot_version);
14,540✔
1814
}
14,540✔
1815

1816
// Forwards a test command to the underlying session. If there is no session
// object, the returned future carries a RuntimeError status instead.
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
{
    if (m_sess)
        return m_sess->send_test_command(std::move(body));

    return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
}
60✔
1824

1825
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1826
{
302✔
1827
    REALM_ASSERT(!m_finalized);
302✔
1828

150✔
1829
    auto pending_reset = _impl::client_reset::has_pending_reset(*m_db->start_frozen());
302✔
1830
    REALM_ASSERT(pending_reset);
302✔
1831
    m_sess->logger.info("Tracking pending client reset of type \"%1\" from %2", pending_reset->type,
302✔
1832
                        pending_reset->time);
302✔
1833
    async_wait_for(true, true, [self = util::bind_ptr(this), pending_reset = *pending_reset](Status status) {
302✔
1834
        if (status == ErrorCodes::OperationAborted) {
302✔
1835
            return;
140✔
1836
        }
140✔
1837
        auto& logger = self->m_sess->logger;
162✔
1838
        if (!status.is_ok()) {
162✔
1839
            logger.error("Error while tracking client reset acknowledgement: %1", status);
×
1840
            return;
×
1841
        }
×
1842

80✔
1843
        auto wt = self->m_db->start_write();
162✔
1844
        auto cur_pending_reset = _impl::client_reset::has_pending_reset(*wt);
162✔
1845
        if (!cur_pending_reset) {
162✔
1846
            logger.debug(
×
1847
                "Was going to remove client reset tracker for type \"%1\" from %2, but it was already removed",
×
1848
                pending_reset.type, pending_reset.time);
×
1849
            return;
×
1850
        }
×
1851
        else if (cur_pending_reset->type != pending_reset.type || cur_pending_reset->time != pending_reset.time) {
162✔
1852
            logger.debug(
×
1853
                "Was going to remove client reset tracker for type \"%1\" from %2, but found type \"%3\" from %4.",
×
1854
                pending_reset.type, pending_reset.time, cur_pending_reset->type, cur_pending_reset->time);
×
1855
        }
×
1856
        else {
162✔
1857
            logger.debug("Client reset of type \"%1\" from %2 has been acknowledged by the server. "
162✔
1858
                         "Removing cycle detection tracker.",
162✔
1859
                         pending_reset.type, pending_reset.time);
162✔
1860
        }
162✔
1861
        _impl::client_reset::remove_pending_client_resets(*wt);
162✔
1862
        wt->commit();
162✔
1863
    });
162✔
1864
}
302✔
1865

1866
void SessionWrapper::update_subscription_version_info()
1867
{
10,324✔
1868
    if (!m_flx_subscription_store)
10,324✔
1869
        return;
8,888✔
1870
    auto versions_info = m_flx_subscription_store->get_version_info();
1,436✔
1871
    m_flx_active_version = versions_info.active;
1,436✔
1872
    m_flx_pending_mark_version = versions_info.pending_mark;
1,436✔
1873
}
1,436✔
1874

1875
std::string SessionWrapper::get_appservices_connection_id()
1876
{
72✔
1877
    auto pf = util::make_promise_future<std::string>();
72✔
1878
    REALM_ASSERT(m_initiated);
72✔
1879

36✔
1880
    util::bind_ptr<SessionWrapper> self(this);
72✔
1881
    get_client().post([self, promise = std::move(pf.promise)](Status status) mutable {
72✔
1882
        if (!status.is_ok()) {
72✔
1883
            promise.set_error(status);
×
1884
            return;
×
1885
        }
×
1886

36✔
1887
        if (!self->m_sess) {
72✔
1888
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
1889
            return;
×
1890
        }
×
1891

36✔
1892
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
72✔
1893
    });
72✔
1894

36✔
1895
    return pf.future.get();
72✔
1896
}
72✔
1897

1898
// ################ ClientImpl::Connection ################
1899

1900
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1901
                                   const std::string& authorization_header_name,
1902
                                   const std::map<std::string, std::string>& custom_http_headers,
1903
                                   bool verify_servers_ssl_certificate,
1904
                                   Optional<std::string> ssl_trust_certificate_path,
1905
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1906
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1907
    : logger_ptr{std::make_shared<util::PrefixLogger>(make_logger_prefix(ident), client.logger_ptr)} // Throws
1908
    , logger{*logger_ptr}
1909
    , m_client{client}
1910
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1911
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1912
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1913
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1914
    , m_reconnect_info{reconnect_info}
1915
    , m_session_history{}
1916
    , m_ident{ident}
1917
    , m_server_endpoint{std::move(endpoint)}
1918
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1919
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1920
{
2,650✔
1921
    m_on_idle = m_client.create_trigger([this](Status status) {
2,656✔
1922
        if (status == ErrorCodes::OperationAborted)
2,656✔
1923
            return;
×
1924
        else if (!status.is_ok())
2,656✔
1925
            throw Exception(status);
×
1926

1,256✔
1927
        REALM_ASSERT(m_activated);
2,656✔
1928
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,656✔
1929
            on_idle(); // Throws
2,652✔
1930
            // Connection object may be destroyed now.
1,256✔
1931
        }
2,652✔
1932
    });
2,656✔
1933
}
2,650✔
1934

1935
// Returns the identifier this connection was constructed with.
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
{
    return m_ident;
}
12✔
1939

1940

1941
// Returns the server endpoint this connection was constructed with.
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
{
    return m_server_endpoint;
}
2,652✔
1945

1946
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1947
                                                        const std::string& signed_access_token)
1948
{
10,248✔
1949
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
10,248✔
1950
    m_signed_access_token = signed_access_token;           // Throws (copy)
10,248✔
1951
}
10,248✔
1952

1953

1954
void ClientImpl::Connection::resume_active_sessions()
1955
{
1,628✔
1956
    auto handler = [=](ClientImpl::Session& sess) {
3,252✔
1957
        sess.cancel_resumption_delay(); // Throws
3,252✔
1958
    };
3,252✔
1959
    for_each_active_session(std::move(handler)); // Throws
1,628✔
1960
}
1,628✔
1961

1962
void ClientImpl::Connection::on_idle()
1963
{
2,652✔
1964
    logger.debug("Destroying connection object");
2,652✔
1965
    ClientImpl& client = get_client();
2,652✔
1966
    client.remove_connection(*this);
2,652✔
1967
    // NOTE: This connection object is now destroyed!
1,256✔
1968
}
2,652✔
1969

1970

1971
std::string ClientImpl::Connection::get_http_request_path() const
1972
{
3,434✔
1973
    using namespace std::string_view_literals;
3,434✔
1974
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
3,434✔
1975

1,656✔
1976
    std::string path;
3,434✔
1977
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,434✔
1978
    path += m_http_request_path_prefix;
3,434✔
1979
    path += param;
3,434✔
1980
    path += m_signed_access_token;
3,434✔
1981

1,656✔
1982
    return path;
3,434✔
1983
}
3,434✔
1984

1985

1986
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
1987
{
2,650✔
1988
    std::ostringstream out;
2,650✔
1989
    out.imbue(std::locale::classic());
2,650✔
1990
    out << "Connection[" << ident << "]: "; // Throws
2,650✔
1991
    return out.str();                       // Throws
2,650✔
1992
}
2,650✔
1993

1994

1995
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
1996
                                                            util::Optional<SessionErrorInfo> error_info)
1997
{
10,196✔
1998
    if (m_force_closed) {
10,196✔
1999
        return;
2,326✔
2000
    }
2,326✔
2001
    auto handler = [=](ClientImpl::Session& sess) {
10,514✔
2002
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
10,514✔
2003
        sess_2.on_connection_state_changed(state, error_info); // Throws
10,514✔
2004
    };
10,514✔
2005
    for_each_active_session(std::move(handler)); // Throws
7,870✔
2006
}
7,870✔
2007

2008

2009
Client::Client(Config config)
2010
    : m_impl{new ClientImpl{std::move(config)}} // Throws
2011
{
9,614✔
2012
}
9,614✔
2013

2014

2015
Client::Client(Client&& client) noexcept
2016
    : m_impl{std::move(client.m_impl)}
2017
{
×
2018
}
×
2019

2020

2021
// Defined out of line, in the translation unit where this file defines the destructor.
Client::~Client() noexcept = default;
9,614✔
2022

2023

2024
void Client::shutdown() noexcept
2025
{
9,692✔
2026
    m_impl->shutdown();
9,692✔
2027
}
9,692✔
2028

2029
void Client::shutdown_and_wait()
2030
{
760✔
2031
    m_impl->shutdown_and_wait();
760✔
2032
}
760✔
2033

2034
void Client::cancel_reconnect_delay()
2035
{
1,632✔
2036
    m_impl->cancel_reconnect_delay();
1,632✔
2037
}
1,632✔
2038

2039
void Client::voluntary_disconnect_all_connections()
2040
{
12✔
2041
    m_impl->voluntary_disconnect_all_connections();
12✔
2042
}
12✔
2043

2044
bool Client::wait_for_session_terminations_or_client_stopped()
2045
{
396✔
2046
    return m_impl->wait_for_session_terminations_or_client_stopped();
396✔
2047
}
396✔
2048

2049
// Forwards to ClientImpl::notify_session_terminated() and returns its future.
util::Future<void> Client::notify_session_terminated()
{
    return m_impl->notify_session_terminated();
}
44✔
2053

2054
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
2055
                                  port_type& port, std::string& path) const
2056
{
3,772✔
2057
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
3,772✔
2058
}
3,772✔
2059

2060

2061
// Creates the SessionWrapper backing this session and stores it in m_impl as
// a raw pointer that carries one reference.
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
{
    util::bind_ptr<SessionWrapper> wrapper{new SessionWrapper{*client.m_impl, std::move(db),
                                                              std::move(flx_sub_store), std::move(migration_store),
                                                              std::move(config)}}; // Throws
    // The reference count passed back to the application is implicitly
    // owned by a naked pointer. This is done to avoid exposing
    // implementation details through the header file (that is, through the
    // Session object).
    m_impl = wrapper.release();
}
11,380✔
2073

2074

2075
// Hands the progress handler over to the session wrapper.
void Session::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
{
    m_impl->set_progress_handler(std::move(handler)); // Throws
}
3,848✔
2079

2080

2081
// Hands the connection state change listener over to the session wrapper.
void Session::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
{
    m_impl->set_connection_state_change_listener(std::move(listener)); // Throws
}
11,476✔
2085

2086

2087
void Session::bind()
2088
{
10,228✔
2089
    m_impl->initiate(); // Throws
10,228✔
2090
}
10,228✔
2091

2092

2093
// Passes `new_version` to the session wrapper's on_commit().
void Session::nonsync_transact_notify(version_type new_version)
{
    m_impl->on_commit(new_version); // Throws
}
16,686✔
2097

2098

2099
void Session::cancel_reconnect_delay()
2100
{
20✔
2101
    m_impl->cancel_reconnect_delay(); // Throws
20✔
2102
}
20✔
2103

2104

2105
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2106
{
4,548✔
2107
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
4,548✔
2108
}
4,548✔
2109

2110

2111
bool Session::wait_for_upload_complete_or_client_stopped()
2112
{
12,996✔
2113
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
12,996✔
2114
}
12,996✔
2115

2116

2117
bool Session::wait_for_download_complete_or_client_stopped()
2118
{
10,092✔
2119
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
10,092✔
2120
}
10,092✔
2121

2122

2123
void Session::refresh(const std::string& signed_access_token)
2124
{
208✔
2125
    m_impl->refresh(signed_access_token); // Throws
208✔
2126
}
208✔
2127

2128

2129
void Session::abandon() noexcept
2130
{
11,380✔
2131
    REALM_ASSERT(m_impl);
11,380✔
2132
    // Reabsorb the ownership assigned to the applications naked pointer by
5,518✔
2133
    // Session constructor
5,518✔
2134
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
11,380✔
2135
    SessionWrapper::abandon(std::move(wrapper));
11,380✔
2136
}
11,380✔
2137

2138
// Forwards the test command body to SessionWrapper::send_test_command() and
// returns its future.
util::Future<std::string> Session::send_test_command(std::string body)
{
    return m_impl->send_test_command(std::move(body));
}
60✔
2142

2143
std::string Session::get_appservices_connection_id()
2144
{
72✔
2145
    return m_impl->get_appservices_connection_id();
72✔
2146
}
72✔
2147

2148
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2149
{
×
2150
    switch (proxyType) {
×
2151
        case ProxyConfig::Type::HTTP:
×
2152
            return os << "HTTP";
×
2153
        case ProxyConfig::Type::HTTPS:
×
2154
            return os << "HTTPS";
×
2155
    }
×
2156
    REALM_TERMINATE("Invalid Proxy Type object.");
2157
}
×
2158

2159
} // namespace sync
2160
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc