• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_121

21 Nov 2023 01:54PM UTC coverage: 92.117% (+0.4%) from 91.683%
thomas.goyne_121

push

Evergreen

jedelbo
Move bson files to core utils

92262 of 169120 branches covered (0.0%)

234642 of 254722 relevant lines covered (92.12%)

6329664.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.58
/src/realm/sync/client.cpp
1

2
#include <memory>
3
#include <tuple>
4
#include <atomic>
5

6
#include "realm/sync/client_base.hpp"
7
#include "realm/sync/protocol.hpp"
8
#include "realm/util/optional.hpp"
9
#include <realm/sync/client.hpp>
10
#include <realm/sync/config.hpp>
11
#include <realm/sync/noinst/client_reset.hpp>
12
#include <realm/sync/noinst/client_history_impl.hpp>
13
#include <realm/sync/noinst/client_impl_base.hpp>
14
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
15
#include <realm/sync/subscriptions.hpp>
16
#include <realm/util/bind_ptr.hpp>
17
#include <realm/util/circular_buffer.hpp>
18
#include <realm/util/platform_info.hpp>
19
#include <realm/util/thread.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/util/value_reset_guard.hpp>
22
#include <realm/version.hpp>
23

24
namespace realm {
25
namespace sync {
26

27
namespace {
28
using namespace realm::util;
29

30

31
// clang-format off
32
using SessionImpl                     = ClientImpl::Session;
33
using SyncTransactCallback            = Session::SyncTransactCallback;
34
using ProgressHandler                 = Session::ProgressHandler;
35
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
36
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
37
using port_type                       = Session::port_type;
38
using connection_ident_type           = std::int_fast64_t;
39
using ProxyConfig                     = SyncConfig::ProxyConfig;
40
// clang-format on
41

42
} // unnamed namespace
43

44

45
// Life cycle states of a session wrapper:
46
//
47
//  - Uninitiated
48
//  - Unactualized
49
//  - Actualized
50
//  - Finalized
51
//
52
// The session wrapper moves from the Uninitiated to the Unactualized state when
53
// it is initiated, i.e., when initiate() is called. This may happen on any
54
// thread.
55
//
56
// The session wrapper moves from the Unactualized to the Actualized state when
57
// it is associated with a session object, i.e., when `m_sess` is made to refer
58
// to an object of type SessionImpl. This always happens on the event loop
59
// thread.
60
//
61
// The session wrapper moves from the Actualized to the Finalized state when it
62
// is dissociated from the session object. This happens in response to the
63
// session wrapper having been abandoned by the application. This always happens
64
// on the event loop thread.
65
//
66
// The session wrapper will exist in the Finalized state only while referenced
67
// from a post handler waiting to be executed.
68
//
69
// If the session wrapper is abandoned by the application while in the
70
// Uninitiated state, it will be destroyed immediately, since no post handlers
71
// can have been scheduled prior to initiation.
72
//
73
// If the session wrapper is abandoned while in the Unactivated state, it will
74
// move immediately to the Finalized state. This may happen on any thread.
75
//
76
// The moving of a session wrapper to, or from the Actualized state always
77
// happen on the event loop thread. All other state transitions may happen on
78
// any thread.
79
//
80
// NOTE: Activation of the session happens no later than during actualization,
81
// and initiation of deactivation happens no earlier than during
82
// finalization. See also activate_session() and initiate_session_deactivation()
83
// in ClientImpl::Connection.
84
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
85
public:
86
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
87
                   Session::Config);
88
    ~SessionWrapper() noexcept;
89

90
    ClientReplication& get_replication() noexcept;
91
    ClientImpl& get_client() noexcept;
92

93
    bool has_flx_subscription_store() const;
94
    SubscriptionStore* get_flx_subscription_store();
95
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
96

97
    MigrationStore* get_migration_store();
98

99
    void set_progress_handler(util::UniqueFunction<ProgressHandler>);
100
    void set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener>);
101

102
    void initiate();
103

104
    void force_close();
105

106
    void on_commit(version_type new_version) override;
107
    void cancel_reconnect_delay();
108

109
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
110
    bool wait_for_upload_complete_or_client_stopped();
111
    bool wait_for_download_complete_or_client_stopped();
112

113
    void refresh(std::string signed_access_token);
114

115
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
116

117
    // These are called from ClientImpl
118
    void actualize(ServerEndpoint);
119
    void finalize();
120
    void finalize_before_actualization() noexcept;
121

122
    util::Future<std::string> send_test_command(std::string body);
123

124
    void handle_pending_client_reset_acknowledgement();
125

126
    void update_subscription_version_info();
127

128
    std::string get_appservices_connection_id();
129

130
private:
131
    ClientImpl& m_client;
132
    DBRef m_db;
133
    Replication* m_replication;
134

135
    const ProtocolEnvelope m_protocol_envelope;
136
    const std::string m_server_address;
137
    const port_type m_server_port;
138
    const std::string m_user_id;
139
    const SyncServerMode m_sync_mode;
140
    const std::string m_authorization_header_name;
141
    const std::map<std::string, std::string> m_custom_http_headers;
142
    const bool m_verify_servers_ssl_certificate;
143
    const bool m_simulate_integration_error;
144
    const Optional<std::string> m_ssl_trust_certificate_path;
145
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
146
    const size_t m_flx_bootstrap_batch_size_bytes;
147

148
    // This one is different from null when, and only when the session wrapper
149
    // is in ClientImpl::m_abandoned_session_wrappers.
150
    SessionWrapper* m_next = nullptr;
151

152
    // After initiation, these may only be accessed by the event loop thread.
153
    std::string m_http_request_path_prefix;
154
    std::string m_virt_path;
155
    std::string m_signed_access_token;
156

157
    util::Optional<ClientReset> m_client_reset_config;
158

159
    util::Optional<ProxyConfig> m_proxy_config;
160

161
    uint_fast64_t m_last_reported_uploadable_bytes = 0;
162
    util::UniqueFunction<ProgressHandler> m_progress_handler;
163
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
164

165
    std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook;
166
    bool m_in_debug_hook = false;
167

168
    SessionReason m_session_reason;
169

170
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
171
    int64_t m_flx_active_version = 0;
172
    int64_t m_flx_last_seen_version = 0;
173
    int64_t m_flx_pending_mark_version = 0;
174
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
175

176
    std::shared_ptr<MigrationStore> m_migration_store;
177

178
    bool m_initiated = false;
179

180
    // Set to true when this session wrapper is actualized (or when it is
181
    // finalized before proper actualization). It is then never modified again.
182
    //
183
    // A session specific post handler submitted after the initiation of the
184
    // session wrapper (initiate()) will always find that `m_actualized` is
185
    // true. This is the case, because the scheduling of such a post handler
186
    // will have been preceded by the triggering of
187
    // `ClientImpl::m_actualize_and_finalize` (in
188
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
189
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
190
    // before the post handler. If the session wrapper is no longer in
191
    // `ClientImpl::m_unactualized_session_wrappers` when
192
    // ClientImpl::actualize_and_finalize_session_wrappers() executes, it must
193
    // have been abandoned already, but in that case,
194
    // finalize_before_actualization() has already been called.
195
    bool m_actualized = false;
196

197
    bool m_force_closed = false;
198

199
    bool m_suspended = false;
200

201
    // Has the SessionWrapper been finalized?
202
    bool m_finalized = false;
203

204
    // Set to true when the first DOWNLOAD message is received to indicate that
205
    // the byte-level download progress parameters can be considered reasonable
206
    // reliable. Before that, a lot of time may have passed, so our record of
207
    // the download progress is likely completely out of date.
208
    bool m_reliable_download_progress = false;
209

210
    // Set to point to an activated session object during actualization of the
211
    // session wrapper. Set to null during finalization of the session
212
    // wrapper. Both modifications are guaranteed to be performed by the event
213
    // loop thread.
214
    //
215
    // If a session specific post handler, that is submitted after the
216
    // initiation of the session wrapper, sees that `m_sess` is null, it can
217
    // conclude that the session wrapper has been both abandoned and
218
    // finalized. This is true, because the scheduling of such a post handler
219
    // will have been preceded by the triggering of
220
    // `ClientImpl::m_actualize_and_finalize` (in
221
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
222
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
223
    // before the post handler, so the session wrapper must have been actualized
224
    // unless it was already abandoned by the application. If it was abandoned
225
    // before it was actualized, it will already have been finalized by
226
    // finalize_before_actualization().
227
    //
228
    // Must only be accessed from the event loop thread.
229
    SessionImpl* m_sess = nullptr;
230

231
    // These must only be accessed from the event loop thread.
232
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
233
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
234
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
235

236
    // `m_target_*load_mark` and `m_reached_*load_mark` are protected by
237
    // `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
238
    // event loop thread.
239
    std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
240
    std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
241
    std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
242

243
    void on_sync_progress();
244
    void on_upload_completion();
245
    void on_download_completion();
246
    void on_suspended(const SessionErrorInfo& error_info);
247
    void on_resumed();
248
    void on_connection_state_changed(ConnectionState, const util::Optional<SessionErrorInfo>&);
249
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
250
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
251
    void on_flx_sync_version_complete(int64_t version);
252

253
    void report_progress(bool only_if_new_uploadable_data = false);
254

255
    friend class SessionWrapperStack;
256
    friend class ClientImpl::Session;
257
};
258

259

260
// ################ SessionWrapperStack ################
261

262
inline bool SessionWrapperStack::empty() const noexcept
263
{
×
264
    return !m_back;
×
265
}
×
266

267

268
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
269
{
9,690✔
270
    REALM_ASSERT(!w->m_next);
9,690✔
271
    w->m_next = m_back;
9,690✔
272
    m_back = w.release();
9,690✔
273
}
9,690✔
274

275

276
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
277
{
24,408✔
278
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
24,408✔
279
    if (m_back) {
24,408✔
280
        m_back = m_back->m_next;
9,690✔
281
        w->m_next = nullptr;
9,690✔
282
    }
9,690✔
283
    return w;
24,408✔
284
}
24,408✔
285

286

287
inline void SessionWrapperStack::clear() noexcept
288
{
23,700✔
289
    while (m_back) {
23,700✔
290
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
291
        m_back = w->m_next;
×
292
    }
×
293
}
23,700✔
294

295

296
inline SessionWrapperStack::SessionWrapperStack(SessionWrapperStack&& q) noexcept
297
    : m_back{q.m_back}
298
{
299
    q.m_back = nullptr;
300
}
301

302

303
SessionWrapperStack::~SessionWrapperStack()
304
{
23,700✔
305
    clear();
23,700✔
306
}
23,700✔
307

308

309
// ################ ClientImpl ################
310

311

312
ClientImpl::~ClientImpl()
313
{
8,986✔
314
    // Since no other thread is allowed to be accessing this client or any of
4,426✔
315
    // its subobjects at this time, no mutex locking is necessary.
4,426✔
316

4,426✔
317
    shutdown_and_wait();
8,986✔
318
    // Session wrappers are removed from m_unactualized_session_wrappers as they
4,426✔
319
    // are abandoned.
4,426✔
320
    REALM_ASSERT(m_stopped);
8,986✔
321
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
8,986✔
322
}
8,986✔
323

324

325
void ClientImpl::cancel_reconnect_delay()
326
{
1,720✔
327
    // Thread safety required
910✔
328
    post([this](Status status) {
1,720✔
329
        if (status == ErrorCodes::OperationAborted)
1,720✔
330
            return;
×
331
        else if (!status.is_ok())
1,720✔
332
            throw Exception(status);
×
333

910✔
334
        for (auto& p : m_server_slots) {
1,720✔
335
            ServerSlot& slot = p.second;
1,720✔
336
            if (m_one_connection_per_session) {
1,720✔
337
                REALM_ASSERT(!slot.connection);
×
338
                for (const auto& p : slot.alt_connections) {
×
339
                    ClientImpl::Connection& conn = *p.second;
×
340
                    conn.resume_active_sessions(); // Throws
×
341
                    conn.cancel_reconnect_delay(); // Throws
×
342
                }
×
343
            }
×
344
            else {
1,720✔
345
                REALM_ASSERT(slot.alt_connections.empty());
1,720✔
346
                if (slot.connection) {
1,720✔
347
                    ClientImpl::Connection& conn = *slot.connection;
1,716✔
348
                    conn.resume_active_sessions(); // Throws
1,716✔
349
                    conn.cancel_reconnect_delay(); // Throws
1,716✔
350
                }
1,716✔
351
                else {
4✔
352
                    slot.reconnect_info.reset();
4✔
353
                }
4✔
354
            }
1,720✔
355
        }
1,720✔
356
    }); // Throws
1,720✔
357
}
1,720✔
358

359

360
void ClientImpl::voluntary_disconnect_all_connections()
361
{
12✔
362
    auto done_pf = util::make_promise_future<void>();
12✔
363
    post([this, promise = std::move(done_pf.promise)](Status status) mutable {
12✔
364
        if (status == ErrorCodes::OperationAborted) {
12✔
365
            return;
×
366
        }
×
367

6✔
368
        REALM_ASSERT(status.is_ok());
12✔
369

6✔
370
        try {
12✔
371
            for (auto& p : m_server_slots) {
12✔
372
                ServerSlot& slot = p.second;
12✔
373
                if (m_one_connection_per_session) {
12✔
374
                    REALM_ASSERT(!slot.connection);
×
375
                    for (const auto& p : slot.alt_connections) {
×
376
                        ClientImpl::Connection& conn = *p.second;
×
377
                        if (conn.get_state() == ConnectionState::disconnected) {
×
378
                            continue;
×
379
                        }
×
380
                        conn.voluntary_disconnect();
×
381
                    }
×
382
                }
×
383
                else {
12✔
384
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
385
                    if (!slot.connection) {
12✔
386
                        continue;
×
387
                    }
×
388
                    ClientImpl::Connection& conn = *slot.connection;
12✔
389
                    if (conn.get_state() == ConnectionState::disconnected) {
12✔
390
                        continue;
×
391
                    }
×
392
                    conn.voluntary_disconnect();
12✔
393
                }
12✔
394
            }
12✔
395
        }
12✔
396
        catch (...) {
6✔
397
            promise.set_error(exception_to_status());
×
398
            return;
×
399
        }
×
400
        promise.emplace_value();
12✔
401
    });
12✔
402
    done_pf.future.get();
12✔
403
}
12✔
404

405

406
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
407
{
368✔
408
    // Thread safety required
34✔
409

34✔
410
    {
368✔
411
        std::lock_guard lock{m_mutex};
368✔
412
        m_sessions_terminated = false;
368✔
413
    }
368✔
414

34✔
415
    // The technique employed here relies on the fact that
34✔
416
    // actualize_and_finalize_session_wrappers() must get to execute at least
34✔
417
    // once before the post handler submitted below gets to execute, but still
34✔
418
    // at a time where all session wrappers, that are abandoned prior to the
34✔
419
    // execution of wait_for_session_terminations_or_client_stopped(), have been
34✔
420
    // added to `m_abandoned_session_wrappers`.
34✔
421
    //
34✔
422
    // To see that this is the case, consider a session wrapper that was
34✔
423
    // abandoned before wait_for_session_terminations_or_client_stopped() was
34✔
424
    // invoked. Then the session wrapper will have been added to
34✔
425
    // `m_abandoned_session_wrappers`, and an invocation of
34✔
426
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
34✔
427
    // guarantees mentioned in the documentation of Trigger then ensure
34✔
428
    // that at least one execution of actualize_and_finalize_session_wrappers()
34✔
429
    // will happen after the session wrapper has been added to
34✔
430
    // `m_abandoned_session_wrappers`, but before the post handler submitted
34✔
431
    // below gets to execute.
34✔
432
    post([this](Status status) mutable {
368✔
433
        if (status == ErrorCodes::OperationAborted)
368✔
434
            return;
×
435
        else if (!status.is_ok())
368✔
436
            throw Exception(status);
×
437

34✔
438
        std::lock_guard lock{m_mutex};
368✔
439
        m_sessions_terminated = true;
368✔
440
        m_wait_or_client_stopped_cond.notify_all();
368✔
441
    }); // Throws
368✔
442

34✔
443
    bool completion_condition_was_satisfied;
368✔
444
    {
368✔
445
        std::unique_lock lock{m_mutex};
368✔
446
        while (!m_sessions_terminated && !m_stopped)
710✔
447
            m_wait_or_client_stopped_cond.wait(lock);
342✔
448
        completion_condition_was_satisfied = !m_stopped;
368✔
449
    }
368✔
450
    return completion_condition_was_satisfied;
368✔
451
}
368✔
452

453

454
void ClientImpl::drain_connections_on_loop()
455
{
8,986✔
456
    post([this](Status status) mutable {
8,986✔
457
        REALM_ASSERT(status.is_ok());
8,986✔
458
        drain_connections();
8,986✔
459
    });
8,986✔
460
}
8,986✔
461

462
void ClientImpl::shutdown_and_wait()
463
{
9,746✔
464
    shutdown();
9,746✔
465
    std::unique_lock lock{m_drain_mutex};
9,746✔
466
    if (m_drained) {
9,746✔
467
        return;
756✔
468
    }
756✔
469

4,428✔
470
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
8,990✔
471
    m_drain_cv.wait(lock, [&] {
19,952✔
472
        return m_num_connections == 0 && m_outstanding_posts == 0;
19,952✔
473
    });
19,952✔
474

4,428✔
475
    m_drained = true;
8,990✔
476
}
8,990✔
477

478
void ClientImpl::shutdown() noexcept
479
{
18,810✔
480
    {
18,810✔
481
        std::lock_guard lock{m_mutex};
18,810✔
482
        if (m_stopped)
18,810✔
483
            return;
9,824✔
484
        m_stopped = true;
8,986✔
485
        m_wait_or_client_stopped_cond.notify_all();
8,986✔
486
    }
8,986✔
487

4,426✔
488
    drain_connections_on_loop();
8,986✔
489
}
8,986✔
490

491

492
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper, ServerEndpoint endpoint)
493
{
9,854✔
494
    // Thread safety required.
4,756✔
495

4,756✔
496
    std::lock_guard lock{m_mutex};
9,854✔
497
    REALM_ASSERT(m_actualize_and_finalize);
9,854✔
498
    m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws
9,854✔
499
    bool retrigger = !m_actualize_and_finalize_needed;
9,854✔
500
    m_actualize_and_finalize_needed = true;
9,854✔
501
    // The conditional triggering needs to happen before releasing the mutex,
4,756✔
502
    // because if two threads call register_unactualized_session_wrapper()
4,756✔
503
    // roughly concurrently, then only the first one is guaranteed to be asked
4,756✔
504
    // to retrigger, but that retriggering must have happened before the other
4,756✔
505
    // thread returns from register_unactualized_session_wrapper().
4,756✔
506
    //
4,756✔
507
    // Note that a similar argument applies when two threads call
4,756✔
508
    // register_abandoned_session_wrapper(), and when one thread calls one of
4,756✔
509
    // them and another thread call the other.
4,756✔
510
    if (retrigger)
9,854✔
511
        m_actualize_and_finalize->trigger();
5,760✔
512
}
9,854✔
513

514

515
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
516
{
9,856✔
517
    // Thread safety required.
4,756✔
518

4,756✔
519
    std::lock_guard lock{m_mutex};
9,856✔
520
    REALM_ASSERT(m_actualize_and_finalize);
9,856✔
521

4,756✔
522
    // If the session wrapper has not yet been actualized (on the event loop
4,756✔
523
    // thread), it can be immediately finalized. This ensures that we will
4,756✔
524
    // generally not actualize a session wrapper that has already been
4,756✔
525
    // abandoned.
4,756✔
526
    auto i = m_unactualized_session_wrappers.find(wrapper.get());
9,856✔
527
    if (i != m_unactualized_session_wrappers.end()) {
9,856✔
528
        m_unactualized_session_wrappers.erase(i);
166✔
529
        wrapper->finalize_before_actualization();
166✔
530
        return;
166✔
531
    }
166✔
532
    m_abandoned_session_wrappers.push(std::move(wrapper));
9,690✔
533
    bool retrigger = !m_actualize_and_finalize_needed;
9,690✔
534
    m_actualize_and_finalize_needed = true;
9,690✔
535
    // The conditional triggering needs to happen before releasing the
4,668✔
536
    // mutex. See implementation of register_unactualized_session_wrapper() for
4,668✔
537
    // details.
4,668✔
538
    if (retrigger)
9,690✔
539
        m_actualize_and_finalize->trigger();
8,956✔
540
}
9,690✔
541

542

543
// Must be called from the event loop thread.
544
void ClientImpl::actualize_and_finalize_session_wrappers()
545
{
14,718✔
546
    std::map<SessionWrapper*, ServerEndpoint> unactualized_session_wrappers;
14,718✔
547
    SessionWrapperStack abandoned_session_wrappers;
14,718✔
548
    bool stopped;
14,718✔
549
    {
14,718✔
550
        std::lock_guard lock{m_mutex};
14,718✔
551
        m_actualize_and_finalize_needed = false;
14,718✔
552
        swap(m_unactualized_session_wrappers, unactualized_session_wrappers);
14,718✔
553
        swap(m_abandoned_session_wrappers, abandoned_session_wrappers);
14,718✔
554
        stopped = m_stopped;
14,718✔
555
    }
14,718✔
556
    // Note, we need to finalize old session wrappers before we actualize new
6,740✔
557
    // ones. This ensures that deactivation of old sessions is initiated before
6,740✔
558
    // new session are activated. This, in turn, ensures that the server does
6,740✔
559
    // not see two overlapping sessions for the same local Realm file.
6,740✔
560
    while (util::bind_ptr<SessionWrapper> wrapper = abandoned_session_wrappers.pop())
24,408✔
561
        wrapper->finalize(); // Throws
9,690✔
562
    if (stopped) {
14,718✔
563
        for (auto& p : unactualized_session_wrappers) {
412✔
564
            SessionWrapper& wrapper = *p.first;
4✔
565
            wrapper.finalize_before_actualization();
4✔
566
        }
4✔
567
        return;
776✔
568
    }
776✔
569
    for (auto& p : unactualized_session_wrappers) {
13,942✔
570
        SessionWrapper& wrapper = *p.first;
9,686✔
571
        ServerEndpoint server_endpoint = std::move(p.second);
9,686✔
572
        wrapper.actualize(std::move(server_endpoint)); // Throws
9,686✔
573
    }
9,686✔
574
}
13,942✔
575

576

577
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
578
                                                   const std::string& authorization_header_name,
579
                                                   const std::map<std::string, std::string>& custom_http_headers,
580
                                                   bool verify_servers_ssl_certificate,
581
                                                   Optional<std::string> ssl_trust_certificate_path,
582
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
583
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
584
{
9,680✔
585
    auto&& [server_slot_it, inserted] =
9,680✔
586
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
9,680✔
587
    ServerSlot& server_slot = server_slot_it->second; // Throws
9,680✔
588

4,664✔
589
    // TODO: enable multiplexing with proxies
4,664✔
590
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
9,680✔
591
        // Use preexisting connection
3,482✔
592
        REALM_ASSERT(server_slot.alt_connections.empty());
7,178✔
593
        return *server_slot.connection;
7,178✔
594
    }
7,178✔
595

1,182✔
596
    // Create a new connection
1,182✔
597
    REALM_ASSERT(!server_slot.connection);
2,502✔
598
    connection_ident_type ident = m_prev_connection_ident + 1;
2,502✔
599
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
2,502✔
600
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
2,502✔
601
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
2,502✔
602
        std::move(proxy_config), server_slot.reconnect_info); // Throws
2,502✔
603
    ClientImpl::Connection& conn = *conn_2;
2,502✔
604
    if (!m_one_connection_per_session) {
2,502✔
605
        server_slot.connection = std::move(conn_2);
2,490✔
606
    }
2,490✔
607
    else {
12✔
608
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
12✔
609
    }
12✔
610
    m_prev_connection_ident = ident;
2,502✔
611
    was_created = true;
2,502✔
612
    {
2,502✔
613
        std::lock_guard lk(m_drain_mutex);
2,502✔
614
        ++m_num_connections;
2,502✔
615
    }
2,502✔
616
    return conn;
2,502✔
617
}
2,502✔
618

619

620
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
621
{
2,504✔
622
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
2,504✔
623
    auto i = m_server_slots.find(endpoint);
2,504✔
624
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
2,504✔
625
    ServerSlot& server_slot = i->second;
2,504✔
626
    if (!m_one_connection_per_session) {
2,504✔
627
        REALM_ASSERT(server_slot.alt_connections.empty());
2,488✔
628
        REALM_ASSERT(&*server_slot.connection == &conn);
2,488✔
629
        server_slot.reconnect_info = conn.get_reconnect_info();
2,488✔
630
        server_slot.connection.reset();
2,488✔
631
    }
2,488✔
632
    else {
16✔
633
        REALM_ASSERT(!server_slot.connection);
16✔
634
        connection_ident_type ident = conn.get_ident();
16✔
635
        auto j = server_slot.alt_connections.find(ident);
16✔
636
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
16✔
637
        REALM_ASSERT(&*j->second == &conn);
16✔
638
        server_slot.alt_connections.erase(j);
16✔
639
    }
16✔
640

1,182✔
641
    {
2,504✔
642
        std::lock_guard lk(m_drain_mutex);
2,504✔
643
        REALM_ASSERT(m_num_connections);
2,504✔
644
        --m_num_connections;
2,504✔
645
        m_drain_cv.notify_all();
2,504✔
646
    }
2,504✔
647
}
2,504✔
648

649

650
// ################ SessionImpl ################
651

652
void SessionImpl::force_close()
653
{
150✔
654
    // Allow force_close() if session is active or hasn't been activated yet.
76✔
655
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
150!
656
        m_wrapper.force_close();
150✔
657
    }
150✔
658
}
150✔
659

660
void SessionImpl::on_connection_state_changed(ConnectionState state,
661
                                              const util::Optional<SessionErrorInfo>& error_info)
662
{
10,604✔
663
    // Only used to report errors back to the SyncSession while the Session is active
5,322✔
664
    if (m_state == SessionImpl::Active) {
10,606✔
665
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
10,606✔
666
    }
10,606✔
667
}
10,604✔
668

669

670
const std::string& SessionImpl::get_virt_path() const noexcept
671
{
8,186✔
672
    // Can only be called if the session is active or being activated
3,810✔
673
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
8,186!
674
    return m_wrapper.m_virt_path;
8,186✔
675
}
8,186✔
676

677
const std::string& SessionImpl::get_realm_path() const noexcept
678
{
9,950✔
679
    // Can only be called if the session is active or being activated
4,798✔
680
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
9,950✔
681
    return m_wrapper.m_db->get_path();
9,950✔
682
}
9,950✔
683

684
DBRef SessionImpl::get_db() const noexcept
685
{
21,880✔
686
    // Can only be called if the session is active or being activated
11,586✔
687
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
21,880!
688
    return m_wrapper.m_db;
21,880✔
689
}
21,880✔
690

691
ClientReplication& SessionImpl::get_repl() const noexcept
692
{
111,214✔
693
    // Can only be called if the session is active or being activated
56,284✔
694
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
111,214✔
695
    return m_wrapper.get_replication();
111,214✔
696
}
111,214✔
697

698
ClientHistory& SessionImpl::get_history() const noexcept
699
{
109,722✔
700
    return get_repl().get_history();
109,722✔
701
}
109,722✔
702

703
util::Optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
704
{
12,912✔
705
    // Can only be called if the session is active or being activated
6,196✔
706
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
12,912✔
707
    return m_wrapper.m_client_reset_config;
12,912✔
708
}
12,912✔
709

710
SessionReason SessionImpl::get_session_reason() noexcept
711
{
1,408✔
712
    // Can only be called if the session is active or being activated
722✔
713
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,408!
714
    return m_wrapper.m_session_reason;
1,408✔
715
}
1,408✔
716

717
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
718
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
719
{
41,134✔
720
    // Ignore the call if the session is not active
21,332✔
721
    if (m_state != State::Active) {
41,134✔
722
        return;
×
723
    }
×
724

21,332✔
725
    try {
41,134✔
726
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
41,134!
727
        if (simulate_integration_error) {
41,134✔
728
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
729
        }
×
730
        version_type client_version;
41,134✔
731
        if (REALM_LIKELY(!get_client().is_dry_run())) {
41,134✔
732
            VersionInfo version_info;
41,134✔
733
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
41,134✔
734
            client_version = version_info.realm_version;
41,134✔
735
        }
41,134✔
736
        else {
×
737
            // Fake it for "dry run" mode
738
            client_version = m_last_version_available + 1;
×
739
        }
×
740
        on_changesets_integrated(client_version, progress); // Throws
41,134✔
741
    }
41,134✔
742
    catch (const IntegrationException& e) {
21,344✔
743
        on_integration_failure(e);
24✔
744
    }
24✔
745
    m_wrapper.on_sync_progress(); // Throws
41,136✔
746
}
41,136✔
747

748

749
void SessionImpl::on_upload_completion()
750
{
14,656✔
751
    // Ignore the call if the session is not active
7,242✔
752
    if (m_state == State::Active) {
14,656✔
753
        m_wrapper.on_upload_completion(); // Throws
14,656✔
754
    }
14,656✔
755
}
14,656✔
756

757

758
void SessionImpl::on_download_completion()
759
{
16,202✔
760
    // Ignore the call if the session is not active
7,634✔
761
    if (m_state == State::Active) {
16,202✔
762
        m_wrapper.on_download_completion(); // Throws
16,202✔
763
    }
16,202✔
764
}
16,202✔
765

766

767
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
768
{
926✔
769
    // Ignore the call if the session is not active
476✔
770
    if (m_state == State::Active) {
926✔
771
        m_wrapper.on_suspended(error_info); // Throws
926✔
772
    }
926✔
773
}
926✔
774

775

776
void SessionImpl::on_resumed()
777
{
390✔
778
    // Ignore the call if the session is not active
210✔
779
    if (m_state == State::Active) {
390✔
780
        m_wrapper.on_resumed(); // Throws
390✔
781
    }
390✔
782
}
390✔
783

784
void SessionImpl::handle_pending_client_reset_acknowledgement()
785
{
288✔
786
    // Ignore the call if the session is not active
144✔
787
    if (m_state == State::Active) {
288✔
788
        m_wrapper.handle_pending_client_reset_acknowledgement();
288✔
789
    }
288✔
790
}
288✔
791

792
void SessionImpl::update_subscription_version_info()
793
{
268✔
794
    // Ignore the call if the session is not active
134✔
795
    if (m_state == State::Active) {
268✔
796
        m_wrapper.update_subscription_version_info();
268✔
797
    }
268✔
798
}
268✔
799

800
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
801
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
802
{
42,780✔
803
    // Ignore the call if the session is not active
22,156✔
804
    if (m_state != State::Active) {
42,780✔
805
        return false;
×
806
    }
×
807

22,156✔
808
    if (is_steady_state_download_message(batch_state, query_version)) {
42,780✔
809
        return false;
41,136✔
810
    }
41,136✔
811

824✔
812
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
1,644✔
813
    util::Optional<SyncProgress> maybe_progress;
1,644✔
814
    if (batch_state == DownloadBatchState::LastInBatch) {
1,644✔
815
        maybe_progress = progress;
1,482✔
816
    }
1,482✔
817

824✔
818
    bool new_batch = false;
1,644✔
819
    try {
1,644✔
820
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
1,644✔
821
    }
1,644✔
822
    catch (const LogicError& ex) {
824✔
823
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
824
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
825
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
826
                                    ProtocolError::bad_changeset_size);
×
827
            on_integration_failure(ex);
×
828
            return true;
×
829
        }
×
830
        throw;
×
831
    }
×
832

822✔
833
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
822✔
834
    // bootstrapping.
822✔
835
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
1,642✔
836
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
40✔
837
    }
40✔
838

822✔
839
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
1,642✔
840
                                       batch_state, received_changesets.size());
1,642✔
841
    if (hook_action == SyncClientHookAction::EarlyReturn) {
1,642✔
842
        return true;
12✔
843
    }
12✔
844
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
1,630✔
845

816✔
846
    if (batch_state == DownloadBatchState::MoreToCome) {
1,630✔
847
        return true;
156✔
848
    }
156✔
849

738✔
850
    try {
1,474✔
851
        process_pending_flx_bootstrap();
1,474✔
852
    }
1,474✔
853
    catch (const IntegrationException& e) {
742✔
854
        on_integration_failure(e);
8✔
855
    }
8✔
856

738✔
857
    return true;
1,474✔
858
}
1,474✔
859

860

861
void SessionImpl::process_pending_flx_bootstrap()
862
{
11,156✔
863
    // Ignore the call if not a flx session or session is not active
5,402✔
864
    if (!m_is_flx_sync_session || m_state != State::Active) {
11,156✔
865
        return;
8,644✔
866
    }
8,644✔
867
    // Should never be called if session is not active
1,258✔
868
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
2,512✔
869
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,512✔
870
    if (!bootstrap_store->has_pending()) {
2,512✔
871
        return;
1,022✔
872
    }
1,022✔
873

746✔
874
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,490✔
875
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,490✔
876
                "changeset size: %3)",
1,490✔
877
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,490✔
878
                pending_batch_stats.pending_changeset_bytes);
1,490✔
879
    auto& history = get_repl().get_history();
1,490✔
880
    VersionInfo new_version;
1,490✔
881
    SyncProgress progress;
1,490✔
882
    int64_t query_version = -1;
1,490✔
883
    size_t changesets_processed = 0;
1,490✔
884

746✔
885
    // Used to commit each batch after it was transformed.
746✔
886
    TransactionRef transact = get_db()->start_write();
1,490✔
887
    while (bootstrap_store->has_pending()) {
3,096✔
888
        auto start_time = std::chrono::steady_clock::now();
1,622✔
889
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
1,622✔
890
        if (!pending_batch.progress) {
1,622✔
891
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
892
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
4✔
893
            // bootstrap store requires a write transaction itself.
4✔
894
            transact->close();
8✔
895
            bootstrap_store->clear();
8✔
896
            return;
8✔
897
        }
8✔
898

808✔
899
        auto batch_state =
1,614✔
900
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
1,546✔
901
        uint64_t downloadable_bytes = 0;
1,614✔
902
        query_version = pending_batch.query_version;
1,614✔
903
        bool simulate_integration_error =
1,614✔
904
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
1,614✔
905
        if (simulate_integration_error) {
1,614✔
906
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
8✔
907
        }
8✔
908

804✔
909
        history.integrate_server_changesets(
1,606✔
910
            *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
1,606✔
911
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
1,604✔
912
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
1,602✔
913
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
1,602✔
914
            });
1,602✔
915
        progress = *pending_batch.progress;
1,606✔
916
        changesets_processed += pending_batch.changesets.size();
1,606✔
917
        auto duration = std::chrono::steady_clock::now() - start_time;
1,606✔
918

804✔
919
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
1,606✔
920
                                      batch_state, pending_batch.changesets.size());
1,606✔
921
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
1,606✔
922

804✔
923
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
1,606✔
924
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
1,606✔
925
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
1,606✔
926
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
1,606✔
927
                    pending_batch.remaining_changesets);
1,606✔
928
    }
1,606✔
929
    on_changesets_integrated(new_version.realm_version, progress);
1,482✔
930

738✔
931
    REALM_ASSERT_3(query_version, !=, -1);
1,474✔
932
    m_wrapper.on_sync_progress();
1,474✔
933
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
1,474✔
934

738✔
935
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
1,474✔
936
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,474✔
937
    // NoAction/EarlyReturn are both valid no-op actions to take here.
738✔
938
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,474✔
939
}
1,474✔
940

941
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
942
{
16✔
943
    // Ignore the call if the session is not active
8✔
944
    if (m_state == State::Active) {
16✔
945
        m_wrapper.on_flx_sync_error(version, err_msg);
16✔
946
    }
16✔
947
}
16✔
948

949
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
950
{
1,510✔
951
    // Ignore the call if the session is not active
756✔
952
    if (m_state == State::Active) {
1,510✔
953
        m_wrapper.on_flx_sync_progress(version, batch_state);
1,510✔
954
    }
1,510✔
955
}
1,510✔
956

957
SubscriptionStore* SessionImpl::get_flx_subscription_store()
958
{
13,514✔
959
    // Should never be called if session is not active
6,876✔
960
    REALM_ASSERT_EX(m_state == State::Active, m_state);
13,514✔
961
    return m_wrapper.get_flx_subscription_store();
13,514✔
962
}
13,514✔
963

964
MigrationStore* SessionImpl::get_migration_store()
965
{
57,722✔
966
    // Should never be called if session is not active
29,856✔
967
    REALM_ASSERT_EX(m_state == State::Active, m_state);
57,722✔
968
    return m_wrapper.get_migration_store();
57,722✔
969
}
57,722✔
970

971
void SessionImpl::on_flx_sync_version_complete(int64_t version)
972
{
250✔
973
    // Ignore the call if the session is not active
134✔
974
    if (m_state == State::Active) {
250✔
975
        m_wrapper.on_flx_sync_version_complete(version);
250✔
976
    }
250✔
977
}
250✔
978

979
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
980
{
844✔
981
    // Should never be called if session is not active
428✔
982
    REALM_ASSERT_EX(m_state == State::Active, m_state);
844✔
983

428✔
984
    // Make sure we don't call the debug hook recursively.
428✔
985
    if (m_wrapper.m_in_debug_hook) {
844✔
986
        return SyncClientHookAction::NoAction;
×
987
    }
×
988
    m_wrapper.m_in_debug_hook = true;
844✔
989
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
844✔
990
        m_wrapper.m_in_debug_hook = false;
844✔
991
    });
844✔
992

428✔
993
    auto action = m_wrapper.m_debug_hook(data);
844✔
994
    switch (action) {
844✔
995
        case realm::SyncClientHookAction::SuspendWithRetryableError: {
✔
996
            SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
×
997
            err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
×
998

999
            auto err_processing_err = receive_error_message(err_info);
×
1000
            REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
×
1001
            return SyncClientHookAction::EarlyReturn;
×
1002
        }
×
1003
        case realm::SyncClientHookAction::TriggerReconnect: {
24✔
1004
            get_connection().voluntary_disconnect();
24✔
1005
            return SyncClientHookAction::EarlyReturn;
24✔
1006
        }
×
1007
        default:
820✔
1008
            return action;
820✔
1009
    }
844✔
1010
}
844✔
1011

1012
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
1013
                                                  int64_t query_version, DownloadBatchState batch_state,
1014
                                                  size_t num_changesets)
1015
{
88,640✔
1016
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
88,640✔
1017
        return SyncClientHookAction::NoAction;
87,870✔
1018
    }
87,870✔
1019
    if (REALM_UNLIKELY(m_state != State::Active)) {
770✔
1020
        return SyncClientHookAction::NoAction;
×
1021
    }
×
1022

388✔
1023
    SyncClientHookData data;
770✔
1024
    data.event = event;
770✔
1025
    data.batch_state = batch_state;
770✔
1026
    data.progress = progress;
770✔
1027
    data.num_changesets = num_changesets;
770✔
1028
    data.query_version = query_version;
770✔
1029

388✔
1030
    return call_debug_hook(data);
770✔
1031
}
770✔
1032

1033
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
1034
{
902✔
1035
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
902✔
1036
        return SyncClientHookAction::NoAction;
828✔
1037
    }
828✔
1038
    if (REALM_UNLIKELY(m_state != State::Active)) {
74✔
1039
        return SyncClientHookAction::NoAction;
×
1040
    }
×
1041

38✔
1042
    SyncClientHookData data;
74✔
1043
    data.event = event;
74✔
1044
    data.batch_state = DownloadBatchState::SteadyState;
74✔
1045
    data.progress = m_progress;
74✔
1046
    data.num_changesets = 0;
74✔
1047
    data.query_version = 0;
74✔
1048
    data.error_info = &error_info;
74✔
1049

38✔
1050
    return call_debug_hook(data);
74✔
1051
}
74✔
1052

1053
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
1054
{
85,568✔
1055
    // Should never be called if session is not active
44,316✔
1056
    REALM_ASSERT_EX(m_state == State::Active, m_state);
85,568✔
1057
    if (batch_state == DownloadBatchState::SteadyState) {
85,568✔
1058
        return true;
41,136✔
1059
    }
41,136✔
1060

22,982✔
1061
    if (!m_is_flx_sync_session) {
44,432✔
1062
        return true;
40,246✔
1063
    }
40,246✔
1064

2,096✔
1065
    // If this is a steady state DOWNLOAD, no need for special handling.
2,096✔
1066
    if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) {
4,186✔
1067
        return true;
896✔
1068
    }
896✔
1069

1,646✔
1070
    return false;
3,290✔
1071
}
3,290✔
1072

1073
util::Future<std::string> SessionImpl::send_test_command(std::string body)
1074
{
52✔
1075
    if (m_state != State::Active) {
52✔
1076
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
×
1077
    }
×
1078

26✔
1079
    try {
52✔
1080
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
52✔
1081
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
52✔
1082
            return Status{ErrorCodes::LogicError,
4✔
1083
                          "Must supply command name in \"command\" field of test command json object"};
4✔
1084
        }
4✔
1085
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
48✔
1086
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
×
1087
        }
×
1088
    }
4✔
1089
    catch (const nlohmann::json::parse_error& e) {
4✔
1090
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
4✔
1091
    }
4✔
1092

22✔
1093
    auto pf = util::make_promise_future<std::string>();
44✔
1094

22✔
1095
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
44✔
1096
        // Includes operation_aborted
22✔
1097
        if (!status.is_ok())
44✔
1098
            promise.set_error(status);
×
1099

22✔
1100
        auto id = ++m_last_pending_test_command_ident;
44✔
1101
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
44✔
1102
        ensure_enlisted_to_send();
44✔
1103
    });
44✔
1104

22✔
1105
    return std::move(pf.future);
44✔
1106
}
44✔
1107

1108
// ################ SessionWrapper ################
1109

1110
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1111
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
1112
// the ClientImpl::Connection that owns the ClientImpl::Session.
1113
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1114
                               std::shared_ptr<MigrationStore> migration_store, Session::Config config)
1115
    : m_client{client}
1116
    , m_db(std::move(db))
1117
    , m_replication(m_db->get_replication())
1118
    , m_protocol_envelope{config.protocol_envelope}
1119
    , m_server_address{std::move(config.server_address)}
1120
    , m_server_port{config.server_port}
1121
    , m_user_id(std::move(config.user_id))
1122
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
1123
    , m_authorization_header_name{config.authorization_header_name}
1124
    , m_custom_http_headers{config.custom_http_headers}
1125
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
1126
    , m_simulate_integration_error{config.simulate_integration_error}
1127
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
1128
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
1129
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
1130
    , m_http_request_path_prefix{std::move(config.service_identifier)}
1131
    , m_virt_path{std::move(config.realm_identifier)}
1132
    , m_signed_access_token{std::move(config.signed_user_token)}
1133
    , m_client_reset_config{std::move(config.client_reset_config)}
1134
    , m_proxy_config{config.proxy_config} // Throws
1135
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
1136
    , m_session_reason(config.session_reason)
1137
    , m_flx_subscription_store(std::move(flx_sub_store))
1138
    , m_migration_store(std::move(migration_store))
1139
{
11,008✔
1140
    REALM_ASSERT(m_db);
11,008✔
1141
    REALM_ASSERT(m_db->get_replication());
11,008✔
1142
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
11,008✔
1143
    if (m_client_reset_config) {
11,008✔
1144
        m_session_reason = SessionReason::ClientReset;
344✔
1145
    }
344✔
1146

5,332✔
1147
    update_subscription_version_info();
11,008✔
1148
}
11,008✔
1149

1150
SessionWrapper::~SessionWrapper() noexcept
1151
{
11,008✔
1152
    if (m_db && m_actualized) {
11,008✔
1153
        m_db->remove_commit_listener(this);
166✔
1154
        m_db->release_sync_agent();
166✔
1155
    }
166✔
1156
}
11,008✔
1157

1158

1159
inline ClientReplication& SessionWrapper::get_replication() noexcept
1160
{
111,214✔
1161
    REALM_ASSERT(m_db);
111,214✔
1162
    return static_cast<ClientReplication&>(*m_replication);
111,214✔
1163
}
111,214✔
1164

1165

1166
inline ClientImpl& SessionWrapper::get_client() noexcept
1167
{
72✔
1168
    return m_client;
72✔
1169
}
72✔
1170

1171
bool SessionWrapper::has_flx_subscription_store() const
1172
{
1,510✔
1173
    return static_cast<bool>(m_flx_subscription_store);
1,510✔
1174
}
1,510✔
1175

1176
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1177
{
16✔
1178
    REALM_ASSERT(!m_finalized);
16✔
1179
    auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(version);
16✔
1180
    mut_subs.update_state(SubscriptionSet::State::Error, err_msg);
16✔
1181
    mut_subs.commit();
8✔
1182
}
8✔
1183

870✔
1184
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
870✔
1185
{
1,720✔
1186
    REALM_ASSERT(!m_finalized);
1,720✔
1187
    m_flx_last_seen_version = version;
1,720✔
1188
    m_flx_active_version = version;
850✔
1189
}
850✔
1190

756✔
1191
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
756✔
1192
{
754✔
1193
    if (!has_flx_subscription_store()) {
754✔
1194
        return;
756✔
1195
    }
756✔
1196
    REALM_ASSERT(!m_finalized);
1,510✔
1197
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
1,510✔
1198
    REALM_ASSERT(new_version >= m_flx_active_version);
1,510✔
1199
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
1,510✔
1200

756✔
1201
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
1,510✔
1202

1203
    switch (batch_state) {
754✔
1204
        case DownloadBatchState::SteadyState:
2✔
1205
            // Cannot be called with this value.
736✔
1206
            REALM_UNREACHABLE();
736✔
1207
        case DownloadBatchState::LastInBatch:
734✔
1208
            if (m_flx_active_version == new_version) {
734✔
1209
                return;
736✔
1210
            }
736✔
1211
            on_flx_sync_version_complete(new_version);
1,070✔
1212
            if (new_version == 0) {
1,070✔
1213
                new_state = SubscriptionSet::State::Complete;
736✔
1214
            }
736✔
1215
            else {
798✔
1216
                new_state = SubscriptionSet::State::AwaitingMark;
798✔
1217
                m_flx_pending_mark_version = new_version;
1,134✔
1218
            }
1,134✔
1219
            break;
754✔
1220
        case DownloadBatchState::MoreToCome:
20✔
1221
            if (m_flx_last_seen_version == new_version) {
20✔
1222
                return;
20✔
1223
            }
20✔
1224

20✔
1225
            m_flx_last_seen_version = new_version;
40✔
1226
            new_state = SubscriptionSet::State::Bootstrapping;
776✔
1227
            break;
776✔
1228
    }
1,510✔
1229

756✔
1230
    auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(new_version);
754✔
1231
    mut_subs.update_state(new_state);
754✔
1232
    mut_subs.commit();
8,394✔
1233
}
8,394✔
1234

7,640✔
1235
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
7,640✔
1236
{
7,400✔
1237
    REALM_ASSERT(!m_finalized);
7,400✔
1238
    return m_flx_subscription_store.get();
9,480✔
1239
}
9,480✔
1240

2,080✔
1241
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
2,080✔
1242
{
2,074✔
1243
    REALM_ASSERT(!m_finalized);
2,074✔
1244
    return m_flx_pending_bootstrap_store.get();
31,930✔
1245
}
31,930✔
1246

29,856✔
1247
MigrationStore* SessionWrapper::get_migration_store()
29,856✔
1248
{
27,866✔
1249
    REALM_ASSERT(!m_finalized);
27,866✔
1250
    return m_migration_store.get();
29,430✔
1251
}
29,430✔
1252

1,564✔
1253
inline void SessionWrapper::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
1,564✔
1254
{
1,912✔
1255
    REALM_ASSERT(!m_initiated);
1,912✔
1256
    m_progress_handler = std::move(handler);
1,912✔
1257
}
1,912✔
1258

5,380✔
1259

5,380✔
1260
inline void
5,380✔
1261
SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
5,380✔
1262
{
5,722✔
1263
    REALM_ASSERT(!m_initiated);
5,722✔
1264
    m_connection_state_change_listener = std::move(listener);
5,722✔
1265
}
10,478✔
1266

4,756✔
1267

4,756✔
1268
void SessionWrapper::initiate()
4,756✔
1269
{
9,856✔
1270
    REALM_ASSERT(!m_initiated);
9,856✔
1271
    ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port, m_user_id, m_sync_mode};
9,856✔
1272
    m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws
5,100✔
1273
    m_initiated = true;
5,100✔
1274
    m_db->add_commit_listener(this);
5,100✔
1275
}
57,026✔
1276

51,926✔
1277

51,926✔
1278
void SessionWrapper::on_commit(version_type new_version)
51,926✔
1279
{
104,362✔
1280
    // Thread safety required
2✔
1281
    REALM_ASSERT(m_initiated);
52,438✔
1282

51,924✔
1283
    if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
104,360✔
1284
        return;
51,926✔
1285
    }
51,926✔
1286

1287
    util::bind_ptr<SessionWrapper> self{this};
104,358✔
1288
    m_client.post([self = std::move(self), new_version](Status status) {
52,440✔
1289
        if (status == ErrorCodes::OperationAborted)
104,364✔
1290
            return;
51,924✔
1291
        else if (!status.is_ok())
104,364✔
1292
            throw Exception(status);
51,924✔
1293

51,534✔
1294
        REALM_ASSERT(self->m_actualized);
103,974✔
1295
        if (REALM_UNLIKELY(!self->m_sess))
103,974✔
1296
            return; // Already finalized
51,994✔
1297
        SessionImpl& sess = *self->m_sess;
103,514✔
1298
        sess.recognize_sync_version(new_version); // Throws
103,904✔
1299
        bool only_if_new_uploadable_data = true;
51,980✔
1300
        self->report_progress(only_if_new_uploadable_data); // Throws
51,980✔
1301
    });
51,980✔
1302
}
52,440✔
1303

6✔
1304

6✔
1305
void SessionWrapper::cancel_reconnect_delay()
6✔
1306
{
12✔
1307
    // Thread safety required
1308
    REALM_ASSERT(m_initiated);
6✔
1309

6✔
1310
    if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
12✔
1311
        return;
6✔
1312
    }
6✔
1313

1314
    util::bind_ptr<SessionWrapper> self{this};
12✔
1315
    m_client.post([self = std::move(self)](Status status) {
6✔
1316
        if (status == ErrorCodes::OperationAborted)
12✔
1317
            return;
6✔
1318
        else if (!status.is_ok())
12✔
1319
            throw Exception(status);
6✔
1320

6✔
1321
        REALM_ASSERT(self->m_actualized);
12✔
1322
        if (REALM_UNLIKELY(!self->m_sess))
12✔
1323
            return; // Already finalized
6✔
1324
        SessionImpl& sess = *self->m_sess;
12✔
1325
        sess.cancel_resumption_delay(); // Throws
12✔
1326
        ClientImpl::Connection& conn = sess.get_connection();
6✔
1327
        conn.cancel_reconnect_delay(); // Throws
6✔
1328
    });                                // Throws
6✔
1329
}
2,090✔
1330

2,084✔
1331
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
2,084✔
1332
                                    WaitOperCompletionHandler handler)
2,084✔
1333
{
4,342✔
1334
    REALM_ASSERT(upload_completion || download_completion);
4,342✔
1335
    REALM_ASSERT(m_initiated);
4,342✔
1336
    REALM_ASSERT(!m_finalized);
4,342✔
1337

2,084✔
1338
    util::bind_ptr<SessionWrapper> self{this};
2,258✔
1339
    m_client.post([self = std::move(self), handler = std::move(handler), upload_completion,
4,342✔
1340
                   download_completion](Status status) mutable {
2,258✔
1341
        if (status == ErrorCodes::OperationAborted)
4,342✔
1342
            return;
2,084✔
1343
        else if (!status.is_ok())
4,342✔
1344
            throw Exception(status);
38✔
1345

38✔
1346
        REALM_ASSERT(self->m_actualized);
2,296✔
1347
        if (REALM_UNLIKELY(!self->m_sess)) {
2,296✔
1348
            // Already finalized
2,046✔
1349
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
1,080✔
1350
            return;
172✔
1351
        }
172✔
1352
        if (upload_completion) {
2,362✔
1353
            if (download_completion) {
2,132✔
1354
                // Wait for upload and download completion
908✔
1355
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
1,044✔
1356
            }
1,044✔
1357
            else {
2,134✔
1358
                // Wait for upload completion only
1,000✔
1359
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
2,088✔
1360
            }
2,088✔
1361
        }
2,224✔
1362
        else {
3,046✔
1363
            // Wait for download completion only
2,046✔
1364
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
2,046✔
1365
        }
3,046✔
1366
        SessionImpl& sess = *self->m_sess;
3,362✔
1367
        if (upload_completion)
4,270✔
1368
            sess.request_upload_completion_notification(); // Throws
3,308✔
1369
        if (download_completion)
2,224✔
1370
            sess.request_download_completion_notification(); // Throws
1,136✔
1371
    });                                                      // Throws
2,224✔
1372
}
8,754✔
1373

6,496✔
1374

6,496✔
1375
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
6,496✔
1376
{
12,994✔
1377
    // Thread safety required
6,496✔
1378
    REALM_ASSERT(m_initiated);
12,994✔
1379
    REALM_ASSERT(!m_finalized);
12,994✔
1380

6,496✔
1381
    std::int_fast64_t target_mark;
12,994✔
1382
    {
12,994✔
1383
        std::lock_guard lock{m_client.m_mutex};
12,994✔
1384
        target_mark = ++m_target_upload_mark;
12,996✔
1385
    }
12,996✔
1386

1387
    util::bind_ptr<SessionWrapper> self{this};
12,996✔
1388
    m_client.post([self = std::move(self), target_mark](Status status) {
6,498✔
1389
        if (status == ErrorCodes::OperationAborted)
12,996✔
1390
            return;
6,498✔
1391
        else if (!status.is_ok())
12,996✔
1392
            throw Exception(status);
6,498✔
1393

6,498✔
1394
        REALM_ASSERT(self->m_actualized);
12,996✔
1395
        // The session wrapper may already have been finalized. This can only
6,498✔
1396
        // happen if it was abandoned, but in that case, the call of
6,498✔
1397
        // wait_for_upload_complete_or_client_stopped() must have returned
6,496✔
1398
        // already.
6,496✔
1399
        if (REALM_UNLIKELY(!self->m_sess))
12,994✔
1400
            return;
6,506✔
1401
        if (target_mark > self->m_staged_upload_mark) {
12,984✔
1402
            self->m_staged_upload_mark = target_mark;
12,984✔
1403
            SessionImpl& sess = *self->m_sess;
12,984✔
1404
            sess.request_upload_completion_notification(); // Throws
12,984✔
1405
        }
12,984✔
1406
    }); // Throws
12,984✔
1407

16,416✔
1408
    bool completion_condition_was_satisfied;
16,418✔
1409
    {
12,994✔
1410
        std::unique_lock lock{m_client.m_mutex};
12,994✔
1411
        while (m_reached_upload_mark < target_mark && !m_client.m_stopped)
22,816✔
1412
            m_client.m_wait_or_client_stopped_cond.wait(lock);
16,318✔
1413
        completion_condition_was_satisfied = !m_client.m_stopped;
6,498✔
1414
    }
6,498✔
1415
    return completion_condition_was_satisfied;
6,498✔
1416
}
11,546✔
1417

5,048✔
1418

5,048✔
1419
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
5,048✔
1420
{
10,092✔
1421
    // Thread safety required
5,048✔
1422
    REALM_ASSERT(m_initiated);
10,092✔
1423
    REALM_ASSERT(!m_finalized);
10,092✔
1424

5,048✔
1425
    std::int_fast64_t target_mark;
10,092✔
1426
    {
10,092✔
1427
        std::lock_guard lock{m_client.m_mutex};
10,092✔
1428
        target_mark = ++m_target_download_mark;
10,092✔
1429
    }
10,092✔
1430

1431
    util::bind_ptr<SessionWrapper> self{this};
10,092✔
1432
    m_client.post([self = std::move(self), target_mark](Status status) {
5,044✔
1433
        if (status == ErrorCodes::OperationAborted)
10,092✔
1434
            return;
5,048✔
1435
        else if (!status.is_ok())
10,092✔
1436
            throw Exception(status);
5,048✔
1437

5,048✔
1438
        REALM_ASSERT(self->m_actualized);
10,092✔
1439
        // The session wrapper may already have been finalized. This can only
5,048✔
1440
        // happen if it was abandoned, but in that case, the call of
5,048✔
1441
        // wait_for_download_complete_or_client_stopped() must have returned
5,018✔
1442
        // already.
5,016✔
1443
        if (REALM_UNLIKELY(!self->m_sess))
10,060✔
1444
            return;
5,046✔
1445
        if (target_mark > self->m_staged_download_mark) {
10,030✔
1446
            self->m_staged_download_mark = target_mark;
10,032✔
1447
            SessionImpl& sess = *self->m_sess;
10,062✔
1448
            sess.request_download_completion_notification(); // Throws
10,062✔
1449
        }
10,062✔
1450
    }); // Throws
10,062✔
1451

10,318✔
1452
    bool completion_condition_was_satisfied;
10,314✔
1453
    {
10,092✔
1454
        std::unique_lock lock{m_client.m_mutex};
10,092✔
1455
        while (m_reached_download_mark < target_mark && !m_client.m_stopped)
15,394✔
1456
            m_client.m_wait_or_client_stopped_cond.wait(lock);
10,350✔
1457
        completion_condition_was_satisfied = !m_client.m_stopped;
5,044✔
1458
    }
5,044✔
1459
    return completion_condition_was_satisfied;
5,044✔
1460
}
5,148✔
1461

104✔
1462

104✔
1463
void SessionWrapper::refresh(std::string signed_access_token)
104✔
1464
{
208✔
1465
    // Thread safety required
104✔
1466
    REALM_ASSERT(m_initiated);
208✔
1467
    REALM_ASSERT(!m_finalized);
104✔
1468

104✔
1469
    m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) {
104✔
1470
        if (status == ErrorCodes::OperationAborted)
208✔
1471
            return;
104✔
1472
        else if (!status.is_ok())
208✔
1473
            throw Exception(status);
104✔
1474

104✔
1475
        REALM_ASSERT(self->m_actualized);
208✔
1476
        if (REALM_UNLIKELY(!self->m_sess))
208✔
1477
            return; // Already finalized
104✔
1478
        self->m_signed_access_token = std::move(token);
208✔
1479
        SessionImpl& sess = *self->m_sess;
208✔
1480
        ClientImpl::Connection& conn = sess.get_connection();
208✔
1481
        // FIXME: This only makes sense when each session uses a separate connection.
104✔
1482
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
208✔
1483
        sess.cancel_resumption_delay();                                                          // Throws
104✔
1484
        conn.cancel_reconnect_delay();                                                           // Throws
104✔
1485
    });
104✔
1486
}
5,436✔
1487

5,332✔
1488

4,756✔
1489
inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
4,756✔
1490
{
10,432✔
1491
    if (wrapper->m_initiated) {
11,008✔
1492
        ClientImpl& client = wrapper->m_client;
5,100✔
1493
        client.register_abandoned_session_wrapper(std::move(wrapper));
5,100✔
1494
    }
5,100✔
1495
}
5,676✔
1496

4,666✔
1497

4,666✔
1498
// Must be called from event loop thread
4,666✔
1499
void SessionWrapper::actualize(ServerEndpoint endpoint)
4,666✔
1500
{
9,686✔
1501
    REALM_ASSERT(!m_actualized);
9,686✔
1502
    REALM_ASSERT(!m_sess);
9,686✔
1503
    // Cannot be actualized if it's already been finalized or force closed
4,666✔
1504
    REALM_ASSERT(!m_finalized);
9,686✔
1505
    REALM_ASSERT(!m_force_closed);
9,686✔
1506
    try {
5,022✔
1507
        m_db->claim_sync_agent();
5,022✔
1508
    }
5,022✔
1509
    catch (const MultipleSyncAgents&) {
4,666✔
1510
        finalize_before_actualization();
4,666✔
1511
        throw;
4,666✔
1512
    }
4,666✔
1513
    auto sync_mode = endpoint.server_mode;
9,682✔
1514

4,664✔
1515
    bool was_created = false;
9,682✔
1516
    ClientImpl::Connection& conn = m_client.get_connection(
9,682✔
1517
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
9,682✔
1518
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
9,682✔
1519
        was_created); // Throws
9,682✔
1520
    try {
9,682✔
1521
        // FIXME: This only makes sense when each session uses a separate connection.
520✔
1522
        conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
5,538✔
1523
        std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
9,682✔
1524
        if (sync_mode == SyncServerMode::FLX) {
9,682✔
1525
            m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
5,182✔
1526
        }
5,182✔
1527

4,664✔
1528
        sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
9,682✔
1529
        m_sess = sess.get();
5,018✔
1530
        conn.activate_session(std::move(sess)); // Throws
5,018✔
1531
    }
5,018✔
1532
    catch (...) {
×
1533
        if (was_created)
×
1534
            m_client.remove_connection(conn);
×
1535

4,664✔
1536
        finalize_before_actualization();
4,664✔
1537
        throw;
4,664✔
1538
    }
1,182✔
1539

4,664✔
1540
    m_actualized = true;
9,682✔
1541
    if (was_created)
9,674✔
1542
        conn.activate(); // Throws
5,978✔
1543

3,412✔
1544
    if (m_connection_state_change_listener) {
8,430✔
1545
        ConnectionState state = conn.get_state();
8,340✔
1546
        if (state != ConnectionState::disconnected) {
8,418✔
1547
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
8,284✔
1548
            if (state == ConnectionState::connected)
8,292✔
1549
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
8,042✔
1550
        }
8,122✔
1551
    }
9,670✔
1552

1553
    if (!m_client_reset_config)
5,018✔
1554
        report_progress(); // Throws
4,920✔
1555
}
5,094✔
1556

1557
void SessionWrapper::force_close()
1558
{
150✔
1559
    if (m_force_closed || m_finalized) {
150✔
1560
        return;
76✔
1561
    }
76✔
1562
    REALM_ASSERT(m_actualized);
150✔
1563
    REALM_ASSERT(m_sess);
150✔
1564
    m_force_closed = true;
150✔
1565

76✔
1566
    ClientImpl::Connection& conn = m_sess->get_connection();
150✔
1567
    conn.initiate_session_deactivation(m_sess); // Throws
150✔
1568

76✔
1569
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
76✔
1570
    m_flx_pending_bootstrap_store.reset();
150✔
1571
    // Clear the subscription and migration store refs since they are owned by SyncSession
76✔
1572
    m_flx_subscription_store.reset();
150✔
1573
    m_migration_store.reset();
150✔
1574
    m_sess = nullptr;
74✔
1575
    // Everything is being torn down, no need to report connection state anymore
1576
    m_connection_state_change_listener = {};
74✔
1577
}
4,742✔
1578

4,668✔
1579
// Must be called from event loop thread
4,668✔
1580
void SessionWrapper::finalize()
4,668✔
1581
{
9,690✔
1582
    REALM_ASSERT(m_actualized);
5,022✔
1583

1584
    // Already finalized?
4,668✔
1585
    if (m_finalized) {
9,690✔
1586
        return;
4,668✔
1587
    }
4,668✔
1588

4,668✔
1589
    // Must be before marking as finalized as we expect m_finalized == false in on_change()
4,668✔
1590
    m_db->remove_commit_listener(this);
9,690✔
1591

4,588✔
1592
    m_finalized = true;
9,610✔
1593

4,588✔
1594
    if (!m_force_closed) {
9,610✔
1595
        REALM_ASSERT(m_sess);
9,530✔
1596
        ClientImpl::Connection& conn = m_sess->get_connection();
9,530✔
1597
        conn.initiate_session_deactivation(m_sess); // Throws
9,530✔
1598

4,588✔
1599
        // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
4,588✔
1600
        m_flx_pending_bootstrap_store.reset();
9,530✔
1601
        // Clear the subscription and migration store refs since they are owned by SyncSession
4,588✔
1602
        m_flx_subscription_store.reset();
9,610✔
1603
        m_migration_store.reset();
9,610✔
1604
        m_sess = nullptr;
9,610✔
1605
    }
9,610✔
1606

4,668✔
1607
    // The Realm file can be closed now, as no access to the Realm file is
4,668✔
1608
    // supposed to happen on behalf of a session after initiation of
4,668✔
1609
    // deactivation.
4,668✔
1610
    m_db->release_sync_agent();
9,878✔
1611
    m_db = nullptr;
5,210✔
1612

188✔
1613
    // All outstanding wait operations must be canceled
188✔
1614
    while (!m_upload_completion_handlers.empty()) {
5,402✔
1615
        auto handler = std::move(m_upload_completion_handlers.back());
380✔
1616
        m_upload_completion_handlers.pop_back();
4,938✔
1617
        handler(
270✔
1618
            {ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
270✔
1619
    }
270✔
1620
    while (!m_download_completion_handlers.empty()) {
5,178✔
1621
        auto handler = std::move(m_download_completion_handlers.back());
156✔
1622
        m_download_completion_handlers.pop_back();
4,752✔
1623
        handler(
84✔
1624
            {ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
84✔
1625
    }
84✔
1626
    while (!m_sync_completion_handlers.empty()) {
5,034✔
1627
        auto handler = std::move(m_sync_completion_handlers.back());
4,674✔
1628
        m_sync_completion_handlers.pop_back();
6✔
1629
        handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
6✔
1630
    }
6✔
1631
}
5,022✔
1632

1633

1634
// Must be called only when an unactualized session wrapper becomes abandoned.
92✔
1635
//
92✔
1636
// Called with a lock on `m_client.m_mutex`.
92✔
1637
inline void SessionWrapper::finalize_before_actualization() noexcept
92✔
1638
{
174✔
1639
    REALM_ASSERT(!m_sess);
82✔
1640
    m_actualized = true;
82✔
1641
    m_force_closed = true;
82✔
1642
}
22,150✔
1643

22,068✔
1644

22,068✔
1645
inline void SessionWrapper::on_sync_progress()
22,068✔
1646
{
42,606✔
1647
    REALM_ASSERT(!m_finalized);
20,538✔
1648
    m_reliable_download_progress = true;
20,538✔
1649
    report_progress(); // Throws
20,538✔
1650
}
27,780✔
1651

7,242✔
1652

8,000✔
1653
void SessionWrapper::on_upload_completion()
758✔
1654
{
8,172✔
1655
    REALM_ASSERT(!m_finalized);
8,172✔
1656
    while (!m_upload_completion_handlers.empty()) {
9,106✔
1657
        auto handler = std::move(m_upload_completion_handlers.back());
8,270✔
1658
        m_upload_completion_handlers.pop_back();
1,028✔
1659
        handler(Status::OK()); // Throws
1,028✔
1660
    }
1,028✔
1661
    while (!m_sync_completion_handlers.empty()) {
7,600✔
1662
        auto handler = std::move(m_sync_completion_handlers.back());
7,334✔
1663
        m_download_completion_handlers.push_back(std::move(handler)); // Throws
7,334✔
1664
        m_sync_completion_handlers.pop_back();
6,574✔
1665
    }
6,574✔
1666
    std::lock_guard lock{m_client.m_mutex};
13,896✔
1667
    if (m_staged_upload_mark > m_reached_upload_mark) {
14,656✔
1668
        m_reached_upload_mark = m_staged_upload_mark;
6,478✔
1669
        m_client.m_wait_or_client_stopped_cond.notify_all();
6,478✔
1670
    }
6,478✔
1671
}
15,048✔
1672

8,650✔
1673

1,016✔
1674
void SessionWrapper::on_download_completion()
1,016✔
1675
{
9,584✔
1676
    while (!m_download_completion_handlers.empty()) {
10,598✔
1677
        auto handler = std::move(m_download_completion_handlers.back());
8,686✔
1678
        m_download_completion_handlers.pop_back();
1,052✔
1679
        handler(Status::OK()); // Throws
1,052✔
1680
    }
1,052✔
1681
    while (!m_sync_completion_handlers.empty()) {
8,644✔
1682
        auto handler = std::move(m_sync_completion_handlers.back());
7,672✔
1683
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
7,672✔
1684
        m_sync_completion_handlers.pop_back();
380✔
1685
    }
380✔
1686

342✔
1687
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
8,910✔
1688
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
684✔
1689
                             m_flx_pending_mark_version);
7,976✔
1690
        auto mutable_subs = m_flx_subscription_store->get_mutable_by_version(m_flx_pending_mark_version);
7,976✔
1691
        mutable_subs.update_state(SubscriptionSet::State::Complete);
7,976✔
1692
        mutable_subs.commit();
5,324✔
1693
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
5,324✔
1694
    }
5,324✔
1695

7,634✔
1696
    std::lock_guard lock{m_client.m_mutex};
8,568✔
1697
    if (m_staged_download_mark > m_reached_download_mark) {
8,568✔
1698
        m_reached_download_mark = m_staged_download_mark;
4,980✔
1699
        m_client.m_wait_or_client_stopped_cond.notify_all();
5,456✔
1700
    }
5,456✔
1701
}
9,044✔
1702

476✔
1703

476✔
1704
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
476✔
1705
{
926✔
1706
    REALM_ASSERT(!m_finalized);
450✔
1707
    m_suspended = true;
450✔
1708
    if (m_connection_state_change_listener) {
450✔
1709
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
660✔
1710
    }
660✔
1711
}
660✔
1712

210✔
1713

210✔
1714
void SessionWrapper::on_resumed()
210✔
1715
{
390✔
1716
    REALM_ASSERT(!m_finalized);
390✔
1717
    m_suspended = false;
388✔
1718
    if (m_connection_state_change_listener) {
390✔
1719
        ClientImpl::Connection& conn = m_sess->get_connection();
390✔
1720
        if (conn.get_state() != ConnectionState::disconnected) {
390✔
1721
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
180✔
1722
            if (conn.get_state() == ConnectionState::connected)
180✔
1723
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
178✔
1724
        }
180✔
1725
    }
5,502✔
1726
}
5,502✔
1727

5,316✔
1728

5,312✔
1729
void SessionWrapper::on_connection_state_changed(ConnectionState state,
5,316✔
1730
                                                 const util::Optional<SessionErrorInfo>& error_info)
5,322✔
1731
{
5,284✔
1732
    if (m_connection_state_change_listener) {
5,284✔
1733
        if (!m_suspended)
5,276✔
1734
            m_connection_state_change_listener(state, error_info); // Throws
83,368✔
1735
    }
83,372✔
1736
}
83,380✔
1737

78,096✔
1738

78,096✔
1739
void SessionWrapper::report_progress(bool only_if_new_uploadable_data)
57,554✔
1740
{
97,904✔
1741
    REALM_ASSERT(!m_finalized);
97,904✔
1742
    REALM_ASSERT(m_sess);
97,904✔
1743

20,542✔
1744
    if (!m_progress_handler)
97,904✔
1745
        return;
73,324✔
1746

20,542✔
1747
    std::uint_fast64_t downloaded_bytes = 0;
45,122✔
1748
    std::uint_fast64_t downloadable_bytes = 0;
45,122✔
1749
    std::uint_fast64_t uploaded_bytes = 0;
45,122✔
1750
    std::uint_fast64_t uploadable_bytes = 0;
45,122✔
1751
    std::uint_fast64_t snapshot_version = 0;
45,122✔
1752
    ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes,
45,122✔
1753
                                             uploadable_bytes, snapshot_version);
39,388✔
1754

5,734✔
1755
    // If this progress notification was triggered by a commit being made we
5,734✔
1756
    // only want to send it if the uploadable bytes has actually increased,
5,734✔
1757
    // and not if it was an empty commit.
5,734✔
1758
    if (only_if_new_uploadable_data && m_last_reported_uploadable_bytes == uploadable_bytes)
30,314✔
1759
        return;
22,474✔
1760
    m_last_reported_uploadable_bytes = uploadable_bytes;
13,574✔
1761

5,734✔
1762
    // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
5,734✔
1763
    // is only the remaining to download. This is confusing, so make them use
5,734✔
1764
    // the same units.
5,734✔
1765
    std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes;
13,574✔
1766

5,734✔
1767
    m_sess->logger.debug("Progress handler called, downloaded = %1, "
13,574✔
1768
                         "downloadable(total) = %2, uploaded = %3, "
13,574✔
1769
                         "uploadable = %4, reliable_download_progress = %5, "
13,574✔
1770
                         "snapshot version = %6",
13,574✔
1771
                         downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes,
13,574✔
1772
                         m_reliable_download_progress, snapshot_version);
13,574✔
1773

5,734✔
1774
    // FIXME: Why is this boolean status communicated to the application as
1775
    // a 64-bit integer? Also, the name `progress_version` is confusing.
1776
    std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0);
4,320✔
1777
    m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version,
7,866✔
1778
                       snapshot_version);
7,840✔
1779
}
7,840✔
1780

26✔
1781
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
26✔
1782
{
52✔
1783
    if (!m_sess) {
26✔
1784
        return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
1785
    }
144✔
1786

144✔
1787
    return m_sess->send_test_command(std::move(body));
170✔
1788
}
170✔
1789

144✔
1790
void SessionWrapper::handle_pending_client_reset_acknowledgement()
144✔
1791
{
288✔
1792
    REALM_ASSERT(!m_finalized);
288✔
1793

144✔
1794
    auto pending_reset = _impl::client_reset::has_pending_reset(*m_db->start_frozen());
216✔
1795
    REALM_ASSERT(pending_reset);
216✔
1796
    m_sess->logger.info("Tracking pending client reset of type \"%1\" from %2", pending_reset->type,
216✔
1797
                        pending_reset->time);
216✔
1798
    async_wait_for(true, true, [self = util::bind_ptr(this), pending_reset = *pending_reset](Status status) {
144✔
1799
        if (status == ErrorCodes::OperationAborted) {
144✔
1800
            return;
74✔
1801
        }
146✔
1802
        auto& logger = self->m_sess->logger;
142✔
1803
        if (!status.is_ok()) {
142✔
1804
            logger.error("Error while tracking client reset acknowledgement: %1", status);
72✔
1805
            return;
×
1806
        }
×
1807

1808
        auto wt = self->m_db->start_write();
70✔
1809
        auto cur_pending_reset = _impl::client_reset::has_pending_reset(*wt);
70✔
1810
        if (!cur_pending_reset) {
142✔
1811
            logger.debug(
×
1812
                "Was going to remove client reset tracker for type \"%1\" from %2, but it was already removed",
×
1813
                pending_reset.type, pending_reset.time);
×
1814
            return;
×
1815
        }
72✔
1816
        else if (cur_pending_reset->type != pending_reset.type || cur_pending_reset->time != pending_reset.time) {
142✔
1817
            logger.debug(
72✔
1818
                "Was going to remove client reset tracker for type \"%1\" from %2, but found type \"%3\" from %4.",
72✔
1819
                pending_reset.type, pending_reset.time, cur_pending_reset->type, cur_pending_reset->time);
72✔
1820
        }
72✔
1821
        else {
142✔
1822
            logger.debug("Client reset of type \"%1\" from %2 has been acknowledged by the server. "
142✔
1823
                         "Removing cycle detection tracker.",
214✔
1824
                         pending_reset.type, pending_reset.time);
70✔
1825
        }
70✔
1826
        _impl::client_reset::remove_pending_client_resets(*wt);
5,536✔
1827
        wt->commit();
5,536✔
1828
    });
4,966✔
1829
}
714✔
1830

570✔
1831
void SessionWrapper::update_subscription_version_info()
570✔
1832
{
6,378✔
1833
    if (!m_flx_subscription_store)
5,808✔
1834
        return;
5,240✔
1835
    auto versions_info = m_flx_subscription_store->get_version_info();
604✔
1836
    m_flx_active_version = versions_info.active;
604✔
1837
    m_flx_pending_mark_version = versions_info.pending_mark;
604✔
1838
}
604✔
1839

36✔
1840
std::string SessionWrapper::get_appservices_connection_id()
36✔
1841
{
72✔
1842
    auto pf = util::make_promise_future<std::string>();
36✔
1843
    REALM_ASSERT(m_initiated);
36✔
1844

1845
    util::bind_ptr<SessionWrapper> self(this);
72✔
1846
    get_client().post([self, promise = std::move(pf.promise)](Status status) mutable {
72✔
1847
        if (!status.is_ok()) {
36✔
1848
            promise.set_error(status);
×
1849
            return;
×
1850
        }
36✔
1851

36✔
1852
        if (!self->m_sess) {
72✔
1853
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
36✔
1854
            return;
36✔
1855
        }
36✔
1856

1857
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
36✔
1858
    });
36✔
1859

1860
    return pf.future.get();
36✔
1861
}
36✔
1862

1863
// ################ ClientImpl::Connection ################
1864

1865
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1866
                                   const std::string& authorization_header_name,
1867
                                   const std::map<std::string, std::string>& custom_http_headers,
1868
                                   bool verify_servers_ssl_certificate,
1869
                                   Optional<std::string> ssl_trust_certificate_path,
1870
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1871
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1872
    : logger_ptr{std::make_shared<util::PrefixLogger>(make_logger_prefix(ident), client.logger_ptr)} // Throws
1873
    , logger{*logger_ptr}
1874
    , m_client{client}
1875
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1876
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1877
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1878
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1879
    , m_reconnect_info{reconnect_info}
1,182✔
1880
    , m_session_history{}
1,182✔
1881
    , m_ident{ident}
1,180✔
1882
    , m_server_endpoint{std::move(endpoint)}
1883
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1,180✔
1884
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1885
{
2,500✔
1886
    m_on_idle = m_client.create_trigger([this](Status status) {
2,502✔
1887
        if (status == ErrorCodes::OperationAborted)
2,502✔
1888
            return;
1,180✔
1889
        else if (!status.is_ok())
2,502✔
1890
            throw Exception(status);
1,180✔
1891

1,180✔
1892
        REALM_ASSERT(m_activated);
2,504✔
1893
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
1,324✔
1894
            on_idle(); // Throws
1,322✔
1895
            // Connection object may be destroyed now.
6✔
1896
        }
1,328✔
1897
    });
1,328✔
1898
}
1,320✔
1899

1900
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
1901
{
1,188✔
1902
    return m_ident;
1,188✔
1903
}
1,188✔
1904

1905

1906
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
1907
{
6,090✔
1908
    return m_server_endpoint;
6,090✔
1909
}
6,090✔
1910

4,768✔
1911
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1912
                                                        const std::string& signed_access_token)
1913
{
5,120✔
1914
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
6,028✔
1915
    m_signed_access_token = signed_access_token;           // Throws (copy)
6,934✔
1916
}
6,934✔
1917

1,814✔
1918

908✔
1919
void ClientImpl::Connection::resume_active_sessions()
908✔
1920
{
808✔
1921
    auto handler = [=](ClientImpl::Session& sess) {
1,614✔
1922
        sess.cancel_resumption_delay(); // Throws
2,794✔
1923
    };
2,794✔
1924
    for_each_active_session(std::move(handler)); // Throws
1,988✔
1925
}
1,988✔
1926

1,180✔
1927
void ClientImpl::Connection::on_idle()
1,180✔
1928
{
1,322✔
1929
    logger.debug("Destroying connection object");
1,322✔
1930
    ClientImpl& client = get_client();
1,322✔
1931
    client.remove_connection(*this);
2,950✔
1932
    // NOTE: This connection object is now destroyed!
1,628✔
1933
}
2,950✔
1934

1,628✔
1935

1,628✔
1936
std::string ClientImpl::Connection::get_http_request_path() const
1,628✔
1937
{
3,344✔
1938
    using namespace std::string_view_literals;
3,344✔
1939
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
3,344✔
1940

1,628✔
1941
    std::string path;
3,344✔
1942
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,344✔
1943
    path += m_http_request_path_prefix;
1,716✔
1944
    path += param;
1,716✔
1945
    path += m_signed_access_token;
1,716✔
1946

1,182✔
1947
    return path;
2,898✔
1948
}
2,898✔
1949

1,182✔
1950

1,182✔
1951
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
1,182✔
1952
{
1,322✔
1953
    std::ostringstream out;
1,322✔
1954
    out.imbue(std::locale::classic());
1,322✔
1955
    out << "Connection[" << ident << "]: "; // Throws
1,322✔
1956
    return out.str();                       // Throws
6,164✔
1957
}
6,164✔
1958

1,036✔
1959

1,036✔
1960
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
5,284✔
1961
                                                            util::Optional<SessionErrorInfo> error_info)
5,284✔
1962
{
10,398✔
1963
    if (m_force_closed) {
10,398✔
1964
        return;
4,978✔
1965
    }
4,978✔
1966
    auto handler = [=](ClientImpl::Session& sess) {
5,246✔
1967
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
5,246✔
1968
        sess_2.on_connection_state_changed(state, error_info); // Throws
5,246✔
1969
    };
5,246✔
1970
    for_each_active_session(std::move(handler)); // Throws
8,368✔
1971
}
8,368✔
1972

1973

1974
Client::Client(Config config)
1975
    : m_impl{new ClientImpl{std::move(config)}} // Throws
1976
{
4,560✔
1977
}
4,560✔
1978

1979

1980
Client::Client(Client&& client) noexcept
4,426✔
1981
    : m_impl{std::move(client.m_impl)}
1982
{
1983
}
1984

4,466✔
1985

4,466✔
1986
Client::~Client() noexcept {}
9,026✔
1987

1988

1989
void Client::shutdown() noexcept
382✔
1990
{
4,980✔
1991
    m_impl->shutdown();
4,980✔
1992
}
4,598✔
1993

1994
void Client::shutdown_and_wait()
910✔
1995
{
1,288✔
1996
    m_impl->shutdown_and_wait();
1,288✔
1997
}
378✔
1998

1999
void Client::cancel_reconnect_delay()
6✔
2000
{
816✔
2001
    m_impl->cancel_reconnect_delay();
816✔
2002
}
810✔
2003

2004
void Client::voluntary_disconnect_all_connections()
34✔
2005
{
40✔
2006
    m_impl->voluntary_disconnect_all_connections();
40✔
2007
}
6✔
2008

2009
bool Client::wait_for_session_terminations_or_client_stopped()
2010
{
334✔
2011
    return m_impl.get()->wait_for_session_terminations_or_client_stopped();
1,860✔
2012
}
1,860✔
2013

1,526✔
2014

2015
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
2016
                                  port_type& port, std::string& path) const
2017
{
1,874✔
2018
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
7,206✔
2019
}
7,206✔
2020

5,332✔
2021

5,332✔
2022
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
5,332✔
2023
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
5,332✔
2024
{
11,008✔
2025
    util::bind_ptr<SessionWrapper> sess;
11,008✔
2026
    sess.reset(new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
11,008✔
2027
                                  std::move(config)}); // Throws
11,008✔
2028
    // The reference count passed back to the application is implicitly
2029
    // owned by a naked pointer. This is done to avoid exposing
2030
    // implementation details through the header file (that is, through the
2031
    // Session object).
1,564✔
2032
    m_impl = sess.release();
7,240✔
2033
}
7,240✔
2034

2035

2036
void Session::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
2037
{
7,292✔
2038
    m_impl->set_progress_handler(std::move(handler)); // Throws
7,292✔
2039
}
7,292✔
2040

2041

2042
void Session::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
2043
{
10,478✔
2044
    m_impl->set_connection_state_change_listener(std::move(listener)); // Throws
10,478✔
2045
}
10,478✔
2046

2047

2048
void Session::bind()
2049
{
12,442✔
2050
    m_impl->initiate(); // Throws
12,442✔
2051
}
12,442✔
2052

2053

2054
void Session::nonsync_transact_notify(version_type new_version)
2055
{
7,762✔
2056
    m_impl->on_commit(new_version); // Throws
7,762✔
2057
}
7,762✔
2058

2059

2060
void Session::cancel_reconnect_delay()
2061
{
1,946✔
2062
    m_impl->cancel_reconnect_delay(); // Throws
1,946✔
2063
}
1,946✔
2064

2065

2066
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2067
{
8,610✔
2068
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
8,610✔
2069
}
8,610✔
2070

2071

2072
bool Session::wait_for_upload_complete_or_client_stopped()
2073
{
11,546✔
2074
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
11,546✔
2075
}
11,546✔
2076

2077

2078
bool Session::wait_for_download_complete_or_client_stopped()
2079
{
5,148✔
2080
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
5,148✔
2081
}
5,148✔
2082

2083

2084
void Session::refresh(const std::string& signed_access_token)
2085
{
5,436✔
2086
    m_impl->refresh(signed_access_token); // Throws
5,436✔
2087
}
5,436✔
2088

5,332✔
2089

5,332✔
2090
void Session::abandon() noexcept
5,332✔
2091
{
11,008✔
2092
    REALM_ASSERT(m_impl);
5,676✔
2093
    // Reabsorb the ownership assigned to the applications naked pointer by
2094
    // Session constructor
26✔
2095
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
5,702✔
2096
    SessionWrapper::abandon(std::move(wrapper));
5,702✔
2097
}
5,676✔
2098

2099
util::Future<std::string> Session::send_test_command(std::string body)
36✔
2100
{
62✔
2101
    return m_impl->send_test_command(std::move(body));
62✔
2102
}
26✔
2103

2104
std::string Session::get_appservices_connection_id()
2105
{
36✔
2106
    return m_impl->get_appservices_connection_id();
36✔
2107
}
36✔
2108

2109
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2110
{
×
2111
    switch (proxyType) {
×
2112
        case ProxyConfig::Type::HTTP:
×
2113
            return os << "HTTP";
2114
        case ProxyConfig::Type::HTTPS:
×
2115
            return os << "HTTPS";
2116
    }
2117
    REALM_TERMINATE("Invalid Proxy Type object.");
2118
}
2119

2120
} // namespace sync
2121
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc