• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / nicola.cabiddu_1042

27 Sep 2023 06:04PM UTC coverage: 91.085% (-1.8%) from 92.915%
nicola.cabiddu_1042

Pull #6766

Evergreen

nicola-cab
Fix logic for dictionaries
Pull Request #6766: Client Reset for collections in mixed / nested collections

97276 of 178892 branches covered (0.0%)

1994 of 2029 new or added lines in 7 files covered. (98.28%)

4556 existing lines in 112 files now uncovered.

237059 of 260260 relevant lines covered (91.09%)

6321099.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.48
/src/realm/sync/client.cpp
1

2
#include <memory>
3
#include <tuple>
4
#include <atomic>
5

6
#include "realm/sync/client_base.hpp"
7
#include "realm/sync/protocol.hpp"
8
#include "realm/util/optional.hpp"
9
#include <realm/sync/client.hpp>
10
#include <realm/sync/config.hpp>
11
#include <realm/sync/noinst/client_reset.hpp>
12
#include <realm/sync/noinst/client_history_impl.hpp>
13
#include <realm/sync/noinst/client_impl_base.hpp>
14
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
15
#include <realm/sync/subscriptions.hpp>
16
#include <realm/util/bind_ptr.hpp>
17
#include <realm/util/circular_buffer.hpp>
18
#include <realm/util/platform_info.hpp>
19
#include <realm/util/thread.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/util/value_reset_guard.hpp>
22
#include <realm/version.hpp>
23

24
namespace realm {
25
namespace sync {
26

27
namespace {
28
using namespace realm::util;
29

30

31
// clang-format off
32
using SessionImpl                     = ClientImpl::Session;
33
using SyncTransactReporter            = ClientHistory::SyncTransactReporter;
34
using SyncTransactCallback            = Session::SyncTransactCallback;
35
using ProgressHandler                 = Session::ProgressHandler;
36
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
37
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
38
using port_type                       = Session::port_type;
39
using connection_ident_type           = std::int_fast64_t;
40
using ProxyConfig                     = SyncConfig::ProxyConfig;
41
// clang-format on
42

43
} // unnamed namespace
44

45

46
// Life cycle states of a session wrapper:
47
//
48
//  - Uninitiated
49
//  - Unactualized
50
//  - Actualized
51
//  - Finalized
52
//
53
// The session wrapper moves from the Uninitiated to the Unactualized state when
54
// it is initiated, i.e., when initiate() is called. This may happen on any
55
// thread.
56
//
57
// The session wrapper moves from the Unactualized to the Actualized state when
58
// it is associated with a session object, i.e., when `m_sess` is made to refer
59
// to an object of type SessionImpl. This always happens on the event loop
60
// thread.
61
//
62
// The session wrapper moves from the Actualized to the Finalized state when it
63
// is dissociated from the session object. This happens in response to the
64
// session wrapper having been abandoned by the application. This always happens
65
// on the event loop thread.
66
//
67
// The session wrapper will exist in the Finalized state only while referenced
68
// from a post handler waiting to be executed.
69
//
70
// If the session wrapper is abandoned by the application while in the
71
// Uninitiated state, it will be destroyed immediately, since no post handlers
72
// can have been scheduled prior to initiation.
73
//
74
// If the session wrapper is abandoned while in the Unactivated state, it will
75
// move immediately to the Finalized state. This may happen on any thread.
76
//
77
// The moving of a session wrapper to, or from the Actualized state always
78
// happen on the event loop thread. All other state transitions may happen on
79
// any thread.
80
//
81
// NOTE: Activation of the session happens no later than during actualization,
82
// and initiation of deactivation happens no earlier than during
83
// finalization. See also activate_session() and initiate_session_deactivation()
84
// in ClientImpl::Connection.
85
class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransactReporter {
86
public:
87
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
88
                   Session::Config);
89
    ~SessionWrapper() noexcept;
90

91
    ClientReplication& get_replication() noexcept;
92
    ClientImpl& get_client() noexcept;
93

94
    bool has_flx_subscription_store() const;
95
    SubscriptionStore* get_flx_subscription_store();
96
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
97

98
    MigrationStore* get_migration_store();
99

100
    void set_sync_transact_handler(util::UniqueFunction<SyncTransactCallback>);
101
    void set_progress_handler(util::UniqueFunction<ProgressHandler>);
102
    void set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener>);
103

104
    void initiate();
105

106
    void force_close();
107

108
    void nonsync_transact_notify(version_type new_version);
109
    void cancel_reconnect_delay();
110

111
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
112
    bool wait_for_upload_complete_or_client_stopped();
113
    bool wait_for_download_complete_or_client_stopped();
114

115
    void refresh(std::string signed_access_token);
116

117
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
118

119
    // These are called from ClientImpl
120
    void actualize(ServerEndpoint);
121
    void finalize();
122
    void finalize_before_actualization() noexcept;
123

124
    // Overriding member function in SyncTransactReporter
125
    void report_sync_transact(VersionID, VersionID) override;
126

127
    void on_new_flx_subscription_set(int64_t new_version);
128

129
    util::Future<std::string> send_test_command(std::string body);
130

131
    void handle_pending_client_reset_acknowledgement();
132

133
    std::string get_appservices_connection_id();
134

135
private:
136
    ClientImpl& m_client;
137
    DBRef m_db;
138
    Replication* m_replication;
139

140
    const ProtocolEnvelope m_protocol_envelope;
141
    const std::string m_server_address;
142
    const port_type m_server_port;
143
    const std::string m_user_id;
144
    const SyncServerMode m_sync_mode;
145
    const std::string m_authorization_header_name;
146
    const std::map<std::string, std::string> m_custom_http_headers;
147
    const bool m_verify_servers_ssl_certificate;
148
    const bool m_simulate_integration_error;
149
    const Optional<std::string> m_ssl_trust_certificate_path;
150
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
151
    const size_t m_flx_bootstrap_batch_size_bytes;
152

153
    // This one is different from null when, and only when the session wrapper
154
    // is in ClientImpl::m_abandoned_session_wrappers.
155
    SessionWrapper* m_next = nullptr;
156

157
    // After initiation, these may only be accessed by the event loop thread.
158
    std::string m_http_request_path_prefix;
159
    std::string m_virt_path;
160
    std::string m_signed_access_token;
161

162
    util::Optional<ClientReset> m_client_reset_config;
163

164
    util::Optional<ProxyConfig> m_proxy_config;
165

166
    util::UniqueFunction<SyncTransactCallback> m_sync_transact_handler;
167
    util::UniqueFunction<ProgressHandler> m_progress_handler;
168
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
169

170
    std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook;
171
    bool m_in_debug_hook = false;
172

173
    SessionReason m_session_reason;
174

175
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
176
    int64_t m_flx_active_version = 0;
177
    int64_t m_flx_last_seen_version = 0;
178
    int64_t m_flx_latest_version = 0;
179
    int64_t m_flx_pending_mark_version = 0;
180
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
181

182
    std::shared_ptr<MigrationStore> m_migration_store;
183

184
    bool m_initiated = false;
185

186
    // Set to true when this session wrapper is actualized (or when it is
187
    // finalized before proper actualization). It is then never modified again.
188
    //
189
    // A session specific post handler submitted after the initiation of the
190
    // session wrapper (initiate()) will always find that `m_actualized` is
191
    // true. This is the case, because the scheduling of such a post handler
192
    // will have been preceded by the triggering of
193
    // `ClientImpl::m_actualize_and_finalize` (in
194
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
195
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
196
    // before the post handler. If the session wrapper is no longer in
197
    // `ClientImpl::m_unactualized_session_wrappers` when
198
    // ClientImpl::actualize_and_finalize_session_wrappers() executes, it must
199
    // have been abandoned already, but in that case,
200
    // finalize_before_actualization() has already been called.
201
    bool m_actualized = false;
202

203
    bool m_force_closed = false;
204

205
    bool m_suspended = false;
206

207
    // Has the SessionWrapper been finalized?
208
    bool m_finalized = false;
209

210
    // Set to true when the first DOWNLOAD message is received to indicate that
211
    // the byte-level download progress parameters can be considered reasonable
212
    // reliable. Before that, a lot of time may have passed, so our record of
213
    // the download progress is likely completely out of date.
214
    bool m_reliable_download_progress = false;
215

216
    // Set to point to an activated session object during actualization of the
217
    // session wrapper. Set to null during finalization of the session
218
    // wrapper. Both modifications are guaranteed to be performed by the event
219
    // loop thread.
220
    //
221
    // If a session specific post handler, that is submitted after the
222
    // initiation of the session wrapper, sees that `m_sess` is null, it can
223
    // conclude that the session wrapper has been both abandoned and
224
    // finalized. This is true, because the scheduling of such a post handler
225
    // will have been preceded by the triggering of
226
    // `ClientImpl::m_actualize_and_finalize` (in
227
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
228
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
229
    // before the post handler, so the session wrapper must have been actualized
230
    // unless it was already abandoned by the application. If it was abandoned
231
    // before it was actualized, it will already have been finalized by
232
    // finalize_before_actualization().
233
    //
234
    // Must only be accessed from the event loop thread.
235
    SessionImpl* m_sess = nullptr;
236

237
    // These must only be accessed from the event loop thread.
238
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
239
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
240
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
241

242
    // `m_target_*load_mark` and `m_reached_*load_mark` are protected by
243
    // `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
244
    // event loop thread.
245
    std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
246
    std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
247
    std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
248

249
    void on_sync_progress();
250
    void on_upload_completion();
251
    void on_download_completion();
252
    void on_suspended(const SessionErrorInfo& error_info);
253
    void on_resumed();
254
    void on_connection_state_changed(ConnectionState, const util::Optional<SessionErrorInfo>&);
255
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
256
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
257
    void on_flx_sync_version_complete(int64_t version);
258

259
    void report_progress();
260

261
    friend class SessionWrapperStack;
262
    friend class ClientImpl::Session;
263
};
264

265

266
// ################ SessionWrapperStack ################
267

268
inline bool SessionWrapperStack::empty() const noexcept
269
{
×
270
    return !m_back;
×
271
}
×
272

273

274
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
275
{
9,614✔
276
    REALM_ASSERT(!w->m_next);
9,614✔
277
    w->m_next = m_back;
9,614✔
278
    m_back = w.release();
9,614✔
279
}
9,614✔
280

281

282
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
283
{
23,712✔
284
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
23,712✔
285
    if (m_back) {
23,712✔
286
        m_back = m_back->m_next;
9,614✔
287
        w->m_next = nullptr;
9,614✔
288
    }
9,614✔
289
    return w;
23,712✔
290
}
23,712✔
291

292

293
inline void SessionWrapperStack::clear() noexcept
294
{
23,218✔
295
    while (m_back) {
23,218✔
296
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
297
        m_back = w->m_next;
×
298
    }
×
299
}
23,218✔
300

301

302
inline SessionWrapperStack::SessionWrapperStack(SessionWrapperStack&& q) noexcept
303
    : m_back{q.m_back}
304
{
305
    q.m_back = nullptr;
306
}
307

308

309
SessionWrapperStack::~SessionWrapperStack()
310
{
23,218✔
311
    clear();
23,218✔
312
}
23,218✔
313

314

315
// ################ ClientImpl ################
316

317

318
ClientImpl::~ClientImpl()
319
{
9,122✔
320
    // Since no other thread is allowed to be accessing this client or any of
4,492✔
321
    // its subobjects at this time, no mutex locking is necessary.
4,492✔
322

4,492✔
323
    shutdown_and_wait();
9,122✔
324
    // Session wrappers are removed from m_unactualized_session_wrappers as they
4,492✔
325
    // are abandoned.
4,492✔
326
    REALM_ASSERT(m_stopped);
9,122✔
327
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
9,122✔
328
}
9,122✔
329

330

331
void ClientImpl::cancel_reconnect_delay()
332
{
1,852✔
333
    // Thread safety required
1,062✔
334
    post([this](Status status) {
1,852✔
335
        if (status == ErrorCodes::OperationAborted)
1,852✔
336
            return;
×
337
        else if (!status.is_ok())
1,852✔
338
            throw Exception(status);
×
339

1,062✔
340
        for (auto& p : m_server_slots) {
1,852✔
341
            ServerSlot& slot = p.second;
1,852✔
342
            if (m_one_connection_per_session) {
1,852✔
343
                REALM_ASSERT(!slot.connection);
×
344
                for (const auto& p : slot.alt_connections) {
×
345
                    ClientImpl::Connection& conn = *p.second;
×
346
                    conn.resume_active_sessions(); // Throws
×
347
                    conn.cancel_reconnect_delay(); // Throws
×
348
                }
×
349
            }
×
350
            else {
1,852✔
351
                REALM_ASSERT(slot.alt_connections.empty());
1,852✔
352
                if (slot.connection) {
1,852✔
353
                    ClientImpl::Connection& conn = *slot.connection;
1,848✔
354
                    conn.resume_active_sessions(); // Throws
1,848✔
355
                    conn.cancel_reconnect_delay(); // Throws
1,848✔
356
                }
1,848✔
357
                else {
4✔
358
                    slot.reconnect_info.reset();
4✔
359
                }
4✔
360
            }
1,852✔
361
        }
1,852✔
362
    }); // Throws
1,852✔
363
}
1,852✔
364

365

366
void ClientImpl::voluntary_disconnect_all_connections()
367
{
12✔
368
    auto done_pf = util::make_promise_future<void>();
12✔
369
    post([this, promise = std::move(done_pf.promise)](Status status) mutable {
12✔
370
        if (status == ErrorCodes::OperationAborted) {
12✔
371
            return;
×
372
        }
×
373

6✔
374
        REALM_ASSERT(status.is_ok());
12✔
375

6✔
376
        try {
12✔
377
            for (auto& p : m_server_slots) {
12✔
378
                ServerSlot& slot = p.second;
12✔
379
                if (m_one_connection_per_session) {
12✔
380
                    REALM_ASSERT(!slot.connection);
×
381
                    for (const auto& p : slot.alt_connections) {
×
382
                        ClientImpl::Connection& conn = *p.second;
×
383
                        if (conn.get_state() == ConnectionState::disconnected) {
×
384
                            continue;
×
385
                        }
×
386
                        conn.voluntary_disconnect();
×
387
                    }
×
388
                }
×
389
                else {
12✔
390
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
391
                    if (!slot.connection) {
12✔
392
                        continue;
×
393
                    }
×
394
                    ClientImpl::Connection& conn = *slot.connection;
12✔
395
                    if (conn.get_state() == ConnectionState::disconnected) {
12✔
396
                        continue;
×
397
                    }
×
398
                    conn.voluntary_disconnect();
12✔
399
                }
12✔
400
            }
12✔
401
        }
12✔
402
        catch (...) {
6✔
403
            promise.set_error(exception_to_status());
×
404
            return;
×
405
        }
×
406
        promise.emplace_value();
12✔
407
    });
12✔
408
    done_pf.future.get();
12✔
409
}
12✔
410

411

412
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
413
{
368✔
414
    // Thread safety required
34✔
415

34✔
416
    {
368✔
417
        std::lock_guard lock{m_mutex};
368✔
418
        m_sessions_terminated = false;
368✔
419
    }
368✔
420

34✔
421
    // The technique employed here relies on the fact that
34✔
422
    // actualize_and_finalize_session_wrappers() must get to execute at least
34✔
423
    // once before the post handler submitted below gets to execute, but still
34✔
424
    // at a time where all session wrappers, that are abandoned prior to the
34✔
425
    // execution of wait_for_session_terminations_or_client_stopped(), have been
34✔
426
    // added to `m_abandoned_session_wrappers`.
34✔
427
    //
34✔
428
    // To see that this is the case, consider a session wrapper that was
34✔
429
    // abandoned before wait_for_session_terminations_or_client_stopped() was
34✔
430
    // invoked. Then the session wrapper will have been added to
34✔
431
    // `m_abandoned_session_wrappers`, and an invocation of
34✔
432
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
34✔
433
    // guarantees mentioned in the documentation of Trigger then ensure
34✔
434
    // that at least one execution of actualize_and_finalize_session_wrappers()
34✔
435
    // will happen after the session wrapper has been added to
34✔
436
    // `m_abandoned_session_wrappers`, but before the post handler submitted
34✔
437
    // below gets to execute.
34✔
438
    post([this](Status status) mutable {
368✔
439
        if (status == ErrorCodes::OperationAborted)
368✔
440
            return;
×
441
        else if (!status.is_ok())
368✔
442
            throw Exception(status);
×
443

34✔
444
        std::lock_guard lock{m_mutex};
368✔
445
        m_sessions_terminated = true;
368✔
446
        m_wait_or_client_stopped_cond.notify_all();
368✔
447
    }); // Throws
368✔
448

34✔
449
    bool completion_condition_was_satisfied;
368✔
450
    {
368✔
451
        std::unique_lock lock{m_mutex};
368✔
452
        while (!m_sessions_terminated && !m_stopped)
706✔
453
            m_wait_or_client_stopped_cond.wait(lock);
338✔
454
        completion_condition_was_satisfied = !m_stopped;
368✔
455
    }
368✔
456
    return completion_condition_was_satisfied;
368✔
457
}
368✔
458

459

460
void ClientImpl::drain_connections_on_loop()
461
{
9,122✔
462
    post([this](Status status) mutable {
9,122✔
463
        REALM_ASSERT(status.is_ok());
9,122✔
464
        drain_connections();
9,122✔
465
    });
9,122✔
466
}
9,122✔
467

468
void ClientImpl::shutdown_and_wait()
469
{
9,882✔
470
    shutdown();
9,882✔
471
    std::unique_lock lock{m_drain_mutex};
9,882✔
472
    if (m_drained) {
9,882✔
473
        return;
756✔
474
    }
756✔
475

4,494✔
476
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
9,126✔
477
    m_drain_cv.wait(lock, [&] {
19,688✔
478
        return m_num_connections == 0 && m_outstanding_posts == 0;
19,688✔
479
    });
19,688✔
480

4,494✔
481
    m_drained = true;
9,126✔
482
}
9,126✔
483

484
void ClientImpl::shutdown() noexcept
485
{
19,082✔
486
    {
19,082✔
487
        std::lock_guard lock{m_mutex};
19,082✔
488
        if (m_stopped)
19,082✔
489
            return;
9,960✔
490
        m_stopped = true;
9,122✔
491
        m_wait_or_client_stopped_cond.notify_all();
9,122✔
492
    }
9,122✔
493

4,492✔
494
    drain_connections_on_loop();
9,122✔
495
}
9,122✔
496

497

498
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper, ServerEndpoint endpoint)
499
{
9,782✔
500
    // Thread safety required.
4,718✔
501

4,718✔
502
    std::lock_guard lock{m_mutex};
9,782✔
503
    REALM_ASSERT(m_actualize_and_finalize);
9,782✔
504
    m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws
9,782✔
505
    bool retrigger = !m_actualize_and_finalize_needed;
9,782✔
506
    m_actualize_and_finalize_needed = true;
9,782✔
507
    // The conditional triggering needs to happen before releasing the mutex,
4,718✔
508
    // because if two threads call register_unactualized_session_wrapper()
4,718✔
509
    // roughly concurrently, then only the first one is guaranteed to be asked
4,718✔
510
    // to retrigger, but that retriggering must have happened before the other
4,718✔
511
    // thread returns from register_unactualized_session_wrapper().
4,718✔
512
    //
4,718✔
513
    // Note that a similar argument applies when two threads call
4,718✔
514
    // register_abandoned_session_wrapper(), and when one thread calls one of
4,718✔
515
    // them and another thread call the other.
4,718✔
516
    if (retrigger)
9,782✔
517
        m_actualize_and_finalize->trigger();
5,226✔
518
}
9,782✔
519

520

521
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
522
{
9,782✔
523
    // Thread safety required.
4,718✔
524

4,718✔
525
    std::lock_guard lock{m_mutex};
9,782✔
526
    REALM_ASSERT(m_actualize_and_finalize);
9,782✔
527

4,718✔
528
    // If the session wrapper has not yet been actualized (on the event loop
4,718✔
529
    // thread), it can be immediately finalized. This ensures that we will
4,718✔
530
    // generally not actualize a session wrapper that has already been
4,718✔
531
    // abandoned.
4,718✔
532
    auto i = m_unactualized_session_wrappers.find(wrapper.get());
9,782✔
533
    if (i != m_unactualized_session_wrappers.end()) {
9,782✔
534
        m_unactualized_session_wrappers.erase(i);
168✔
535
        wrapper->finalize_before_actualization();
168✔
536
        return;
168✔
537
    }
168✔
538
    m_abandoned_session_wrappers.push(std::move(wrapper));
9,614✔
539
    bool retrigger = !m_actualize_and_finalize_needed;
9,614✔
540
    m_actualize_and_finalize_needed = true;
9,614✔
541
    // The conditional triggering needs to happen before releasing the
4,630✔
542
    // mutex. See implementation of register_unactualized_session_wrapper() for
4,630✔
543
    // details.
4,630✔
544
    if (retrigger)
9,614✔
545
        m_actualize_and_finalize->trigger();
8,874✔
546
}
9,614✔
547

548

549
// Must be called from the event loop thread.
550
void ClientImpl::actualize_and_finalize_session_wrappers()
551
{
14,100✔
552
    std::map<SessionWrapper*, ServerEndpoint> unactualized_session_wrappers;
14,100✔
553
    SessionWrapperStack abandoned_session_wrappers;
14,100✔
554
    bool stopped;
14,100✔
555
    {
14,100✔
556
        std::lock_guard lock{m_mutex};
14,100✔
557
        m_actualize_and_finalize_needed = false;
14,100✔
558
        swap(m_unactualized_session_wrappers, unactualized_session_wrappers);
14,100✔
559
        swap(m_abandoned_session_wrappers, abandoned_session_wrappers);
14,100✔
560
        stopped = m_stopped;
14,100✔
561
    }
14,100✔
562
    // Note, we need to finalize old session wrappers before we actualize new
6,822✔
563
    // ones. This ensures that deactivation of old sessions is initiated before
6,822✔
564
    // new session are activated. This, in turn, ensures that the server does
6,822✔
565
    // not see two overlapping sessions for the same local Realm file.
6,822✔
566
    while (util::bind_ptr<SessionWrapper> wrapper = abandoned_session_wrappers.pop())
23,714✔
567
        wrapper->finalize(); // Throws
9,614✔
568
    if (stopped) {
14,100✔
569
        for (auto& p : unactualized_session_wrappers) {
396✔
570
            SessionWrapper& wrapper = *p.first;
4✔
571
            wrapper.finalize_before_actualization();
4✔
572
        }
4✔
573
        return;
744✔
574
    }
744✔
575
    for (auto& p : unactualized_session_wrappers) {
13,356✔
576
        SessionWrapper& wrapper = *p.first;
9,610✔
577
        ServerEndpoint server_endpoint = std::move(p.second);
9,610✔
578
        wrapper.actualize(std::move(server_endpoint)); // Throws
9,610✔
579
    }
9,610✔
580
}
13,356✔
581

582

583
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
584
                                                   const std::string& authorization_header_name,
585
                                                   const std::map<std::string, std::string>& custom_http_headers,
586
                                                   bool verify_servers_ssl_certificate,
587
                                                   Optional<std::string> ssl_trust_certificate_path,
588
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
589
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
590
{
9,606✔
591
    auto&& [server_slot_it, inserted] =
9,606✔
592
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
9,606✔
593
    ServerSlot& server_slot = server_slot_it->second; // Throws
9,606✔
594

4,626✔
595
    // TODO: enable multiplexing with proxies
4,626✔
596
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
9,606✔
597
        // Use preexisting connection
3,472✔
598
        REALM_ASSERT(server_slot.alt_connections.empty());
7,158✔
599
        return *server_slot.connection;
7,158✔
600
    }
7,158✔
601

1,154✔
602
    // Create a new connection
1,154✔
603
    REALM_ASSERT(!server_slot.connection);
2,448✔
604
    connection_ident_type ident = m_prev_connection_ident + 1;
2,448✔
605
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
2,448✔
606
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
2,448✔
607
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
2,448✔
608
        std::move(proxy_config), server_slot.reconnect_info); // Throws
2,448✔
609
    ClientImpl::Connection& conn = *conn_2;
2,448✔
610
    if (!m_one_connection_per_session) {
2,448✔
611
        server_slot.connection = std::move(conn_2);
2,436✔
612
    }
2,436✔
613
    else {
12✔
614
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
12✔
615
    }
12✔
616
    m_prev_connection_ident = ident;
2,448✔
617
    was_created = true;
2,448✔
618
    {
2,448✔
619
        std::lock_guard lk(m_drain_mutex);
2,448✔
620
        ++m_num_connections;
2,448✔
621
    }
2,448✔
622
    return conn;
2,448✔
623
}
2,448✔
624

625

626
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
627
{
2,448✔
628
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
2,448✔
629
    auto i = m_server_slots.find(endpoint);
2,448✔
630
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
2,448✔
631
    ServerSlot& server_slot = i->second;
2,448✔
632
    if (!m_one_connection_per_session) {
2,448✔
633
        REALM_ASSERT(server_slot.alt_connections.empty());
2,436✔
634
        REALM_ASSERT(&*server_slot.connection == &conn);
2,436✔
635
        server_slot.reconnect_info = conn.get_reconnect_info();
2,436✔
636
        server_slot.connection.reset();
2,436✔
637
    }
2,436✔
638
    else {
12✔
639
        REALM_ASSERT(!server_slot.connection);
12✔
640
        connection_ident_type ident = conn.get_ident();
12✔
641
        auto j = server_slot.alt_connections.find(ident);
12✔
642
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
12✔
643
        REALM_ASSERT(&*j->second == &conn);
12✔
644
        server_slot.alt_connections.erase(j);
12✔
645
    }
12✔
646

1,154✔
647
    {
2,448✔
648
        std::lock_guard lk(m_drain_mutex);
2,448✔
649
        REALM_ASSERT(m_num_connections);
2,448✔
650
        --m_num_connections;
2,448✔
651
        m_drain_cv.notify_all();
2,448✔
652
    }
2,448✔
653
}
2,448✔
654

655

656
// ################ SessionImpl ################
657

658
void SessionImpl::force_close()
659
{
150✔
660
    // Allow force_close() if session is active or hasn't been activated yet.
74✔
661
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
150!
662
        m_wrapper.force_close();
150✔
663
    }
150✔
664
}
150✔
665

666
void SessionImpl::on_connection_state_changed(ConnectionState state,
667
                                              const util::Optional<SessionErrorInfo>& error_info)
668
{
10,856✔
669
    // Only used to report errors back to the SyncSession while the Session is active
5,702✔
670
    if (m_state == SessionImpl::Active) {
10,858✔
671
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
10,858✔
672
    }
10,858✔
673
}
10,856✔
674

675

676
const std::string& SessionImpl::get_virt_path() const noexcept
677
{
7,232✔
678
    // Can only be called if the session is active or being activated
3,638✔
679
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
7,232!
680
    return m_wrapper.m_virt_path;
7,232✔
681
}
7,232✔
682

683
const std::string& SessionImpl::get_realm_path() const noexcept
684
{
9,874✔
685
    // Can only be called if the session is active or being activated
4,760✔
686
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
9,874✔
687
    return m_wrapper.m_db->get_path();
9,874✔
688
}
9,874✔
689

690
DBRef SessionImpl::get_db() const noexcept
691
{
22,332✔
692
    // Can only be called if the session is active or being activated
11,850✔
693
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
22,332✔
694
    return m_wrapper.m_db;
22,332✔
695
}
22,332✔
696

697
SyncTransactReporter* SessionImpl::get_transact_reporter() noexcept
698
{
22,380✔
699
    // Can only be called if the session is active or being activated
11,874✔
700
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
22,380!
701
    return &m_wrapper;
22,380✔
702
}
22,380✔
703

704
ClientReplication& SessionImpl::access_realm()
705
{
109,392✔
706
    // Can only be called if the session is active or being activated
56,186✔
707
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
109,392✔
708
    return m_wrapper.get_replication();
109,392✔
709
}
109,392✔
710

711
util::Optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
712
{
9,606✔
713
    // Can only be called if the session is active or being activated
4,626✔
714
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
9,606✔
715
    return m_wrapper.m_client_reset_config;
9,606✔
716
}
9,606✔
717

718
SessionReason SessionImpl::get_session_reason() noexcept
719
{
1,334✔
720
    // Can only be called if the session is active or being activated
676✔
721
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,334!
722
    return m_wrapper.m_session_reason;
1,334✔
723
}
1,334✔
724

725
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
726
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
727
{
41,134✔
728
    // Ignore the call if the session is not active
21,822✔
729
    if (m_state != State::Active) {
41,134✔
730
        return;
×
731
    }
×
732

21,822✔
733
    try {
41,134✔
734
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
41,134!
735
        if (simulate_integration_error) {
41,134✔
736
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
737
        }
×
738
        version_type client_version;
41,134✔
739
        if (REALM_LIKELY(!get_client().is_dry_run())) {
41,134✔
740
            VersionInfo version_info;
41,134✔
741
            ClientReplication& repl = access_realm(); // Throws
41,134✔
742
            integrate_changesets(repl, progress, downloadable_bytes, changesets, version_info,
41,134✔
743
                                 batch_state); // Throws
41,134✔
744
            client_version = version_info.realm_version;
41,134✔
745
        }
41,134✔
UNCOV
746
        else {
×
747
            // Fake it for "dry run" mode
UNCOV
748
            client_version = m_last_version_available + 1;
×
UNCOV
749
        }
×
750
        on_changesets_integrated(client_version, progress); // Throws
41,134✔
751
    }
41,134✔
752
    catch (const IntegrationException& e) {
21,834✔
753
        on_integration_failure(e);
24✔
754
    }
24✔
755
    m_wrapper.on_sync_progress(); // Throws
41,134✔
756
}
41,134✔
757

758

759
void SessionImpl::on_upload_completion()
760
{
14,686✔
761
    // Ignore the call if the session is not active
7,256✔
762
    if (m_state == State::Active) {
14,686✔
763
        m_wrapper.on_upload_completion(); // Throws
14,686✔
764
    }
14,686✔
765
}
14,686✔
766

767

768
void SessionImpl::on_download_completion()
769
{
15,442✔
770
    // Ignore the call if the session is not active
7,612✔
771
    if (m_state == State::Active) {
15,442✔
772
        m_wrapper.on_download_completion(); // Throws
15,442✔
773
    }
15,442✔
774
}
15,442✔
775

776

777
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
778
{
894✔
779
    // Ignore the call if the session is not active
452✔
780
    if (m_state == State::Active) {
894✔
781
        m_wrapper.on_suspended(error_info); // Throws
894✔
782
    }
894✔
783
}
894✔
784

785

786
void SessionImpl::on_resumed()
787
{
370✔
788
    // Ignore the call if the session is not active
192✔
789
    if (m_state == State::Active) {
370✔
790
        m_wrapper.on_resumed(); // Throws
370✔
791
    }
370✔
792
}
370✔
793

794
void SessionImpl::handle_pending_client_reset_acknowledgement()
795
{
292✔
796
    // Ignore the call if the session is not active
146✔
797
    if (m_state == State::Active) {
292✔
798
        m_wrapper.handle_pending_client_reset_acknowledgement();
292✔
799
    }
292✔
800
}
292✔
801

802

803
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
804
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
805
{
42,692✔
806
    // Ignore the call if the session is not active
22,602✔
807
    if (m_state != State::Active) {
42,692✔
808
        return false;
×
809
    }
×
810

22,602✔
811
    if (is_steady_state_download_message(batch_state, query_version)) {
42,692✔
812
        return false;
41,134✔
813
    }
41,134✔
814

780✔
815
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
1,558✔
816
    util::Optional<SyncProgress> maybe_progress;
1,558✔
817
    if (batch_state == DownloadBatchState::LastInBatch) {
1,558✔
818
        maybe_progress = progress;
1,398✔
819
    }
1,398✔
820

780✔
821
    bool new_batch = false;
1,558✔
822
    try {
1,558✔
823
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
1,558✔
824
    }
1,558✔
825
    catch (const LogicError& ex) {
780✔
826
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
827
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
828
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
829
                                    ProtocolError::bad_changeset_size);
×
830
            on_integration_failure(ex);
×
831
            return true;
×
832
        }
×
833
        throw;
×
834
    }
×
835

780✔
836
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
780✔
837
    // bootstrapping.
780✔
838
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
1,558✔
839
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
40✔
840
    }
40✔
841

780✔
842
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
1,558✔
843
                                       batch_state, received_changesets.size());
1,558✔
844
    if (hook_action == SyncClientHookAction::EarlyReturn) {
1,558✔
845
        return true;
12✔
846
    }
12✔
847
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
1,546✔
848

774✔
849
    if (batch_state == DownloadBatchState::MoreToCome) {
1,546✔
850
        return true;
156✔
851
    }
156✔
852

696✔
853
    try {
1,390✔
854
        process_pending_flx_bootstrap();
1,390✔
855
    }
1,390✔
856
    catch (const IntegrationException& e) {
700✔
857
        on_integration_failure(e);
8✔
858
    }
8✔
859

696✔
860
    return true;
1,390✔
861
}
1,390✔
862

863

864
void SessionImpl::process_pending_flx_bootstrap()
865
{
10,996✔
866
    // Ignore the call if not a flx session or session is not active
5,322✔
867
    if (!m_is_flx_sync_session || m_state != State::Active) {
10,996✔
868
        return;
8,626✔
869
    }
8,626✔
870
    // Should never be called if session is not active
1,186✔
871
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
2,370✔
872
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,370✔
873
    if (!bootstrap_store->has_pending()) {
2,370✔
874
        return;
964✔
875
    }
964✔
876

704✔
877
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,406✔
878
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,406✔
879
                "changeset size: %3)",
1,406✔
880
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,406✔
881
                pending_batch_stats.pending_changeset_bytes);
1,406✔
882
    auto& history = access_realm().get_history();
1,406✔
883
    VersionInfo new_version;
1,406✔
884
    SyncProgress progress;
1,406✔
885
    int64_t query_version = -1;
1,406✔
886
    size_t changesets_processed = 0;
1,406✔
887

704✔
888
    // Used to commit each batch after it was transformed.
704✔
889
    TransactionRef transact = get_db()->start_write();
1,406✔
890
    while (bootstrap_store->has_pending()) {
2,928✔
891
        auto start_time = std::chrono::steady_clock::now();
1,538✔
892
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
1,538✔
893
        if (!pending_batch.progress) {
1,538✔
894
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
895
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
4✔
896
            // bootstrap store requires a write transaction itself.
4✔
897
            transact->close();
8✔
898
            bootstrap_store->clear();
8✔
899
            return;
8✔
900
        }
8✔
901

766✔
902
        auto batch_state =
1,530✔
903
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
1,462✔
904
        uint64_t downloadable_bytes = 0;
1,530✔
905
        query_version = pending_batch.query_version;
1,530✔
906
        bool simulate_integration_error =
1,530✔
907
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
1,530✔
908
        if (simulate_integration_error) {
1,530✔
909
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
8✔
910
        }
8✔
911

762✔
912
        history.integrate_server_changesets(
1,522✔
913
            *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
1,522✔
914
            transact,
1,522✔
915
            [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
1,520✔
916
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
1,518✔
917
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
1,518✔
918
            },
1,518✔
919
            get_transact_reporter());
1,522✔
920
        progress = *pending_batch.progress;
1,522✔
921
        changesets_processed += pending_batch.changesets.size();
1,522✔
922
        auto duration = std::chrono::steady_clock::now() - start_time;
1,522✔
923

762✔
924
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
1,522✔
925
                                      batch_state, pending_batch.changesets.size());
1,522✔
926
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
1,522✔
927

762✔
928
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
1,522✔
929
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
1,522✔
930
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
1,522✔
931
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
1,522✔
932
                    pending_batch.remaining_changesets);
1,522✔
933
    }
1,522✔
934
    on_changesets_integrated(new_version.realm_version, progress);
1,398✔
935

696✔
936
    REALM_ASSERT_3(query_version, !=, -1);
1,390✔
937
    m_wrapper.on_sync_progress();
1,390✔
938
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
1,390✔
939

696✔
940
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
1,390✔
941
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,390✔
942
    // NoAction/EarlyReturn are both valid no-op actions to take here.
696✔
943
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,390✔
944
}
1,390✔
945

946
void SessionImpl::on_new_flx_subscription_set(int64_t new_version)
947
{
726✔
948
    // If m_state == State::Active then we know that we haven't sent an UNBIND message and all we need to
362✔
949
    // check is that we have completed the IDENT message handshake and have not yet received an ERROR
362✔
950
    // message to call ensure_enlisted_to_send().
362✔
951
    if (m_state == State::Active && m_ident_message_sent && !m_error_message_received) {
726✔
952
        logger.trace("Requesting QUERY change message for new subscription set version %1", new_version);
354✔
953
        ensure_enlisted_to_send();
354✔
954
    }
354✔
955
}
726✔
956

957
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
958
{
16✔
959
    // Ignore the call if the session is not active
8✔
960
    if (m_state == State::Active) {
16✔
961
        m_wrapper.on_flx_sync_error(version, err_msg);
16✔
962
    }
16✔
963
}
16✔
964

965
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
966
{
1,426✔
967
    // Ignore the call if the session is not active
714✔
968
    if (m_state == State::Active) {
1,426✔
969
        m_wrapper.on_flx_sync_progress(version, batch_state);
1,426✔
970
    }
1,426✔
971
}
1,426✔
972

973
SubscriptionStore* SessionImpl::get_flx_subscription_store()
974
{
15,288✔
975
    // Should never be called if session is not active
7,806✔
976
    REALM_ASSERT_EX(m_state == State::Active, m_state);
15,288✔
977
    return m_wrapper.get_flx_subscription_store();
15,288✔
978
}
15,288✔
979

980
MigrationStore* SessionImpl::get_migration_store()
981
{
55,700✔
982
    // Should never be called if session is not active
29,212✔
983
    REALM_ASSERT_EX(m_state == State::Active, m_state);
55,700✔
984
    return m_wrapper.get_migration_store();
55,700✔
985
}
55,700✔
986

987
void SessionImpl::on_flx_sync_version_complete(int64_t version)
988
{
104✔
989
    // Ignore the call if the session is not active
52✔
990
    if (m_state == State::Active) {
104✔
991
        m_wrapper.on_flx_sync_version_complete(version);
104✔
992
    }
104✔
993
}
104✔
994

995
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
996
{
820✔
997
    // Should never be called if session is not active
412✔
998
    REALM_ASSERT_EX(m_state == State::Active, m_state);
820✔
999

412✔
1000
    // Make sure we don't call the debug hook recursively.
412✔
1001
    if (m_wrapper.m_in_debug_hook) {
820✔
1002
        return SyncClientHookAction::NoAction;
×
1003
    }
×
1004
    m_wrapper.m_in_debug_hook = true;
820✔
1005
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
820✔
1006
        m_wrapper.m_in_debug_hook = false;
820✔
1007
    });
820✔
1008

412✔
1009
    auto action = m_wrapper.m_debug_hook(data);
820✔
1010
    switch (action) {
820✔
1011
        case realm::SyncClientHookAction::SuspendWithRetryableError: {
✔
1012
            SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
×
1013
            err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
×
1014

1015
            auto err_processing_err = receive_error_message(err_info);
×
1016
            REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
×
1017
            return SyncClientHookAction::EarlyReturn;
×
1018
        }
×
1019
        case realm::SyncClientHookAction::TriggerReconnect: {
24✔
1020
            get_connection().voluntary_disconnect();
24✔
1021
            return SyncClientHookAction::EarlyReturn;
24✔
1022
        }
×
1023
        default:
796✔
1024
            return action;
796✔
1025
    }
820✔
1026
}
820✔
1027

1028
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
1029
                                                  int64_t query_version, DownloadBatchState batch_state,
1030
                                                  size_t num_changesets)
1031
{
88,298✔
1032
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
88,298✔
1033
        return SyncClientHookAction::NoAction;
87,540✔
1034
    }
87,540✔
1035
    if (REALM_UNLIKELY(m_state != State::Active)) {
758✔
1036
        return SyncClientHookAction::NoAction;
×
1037
    }
×
1038

380✔
1039
    SyncClientHookData data;
758✔
1040
    data.event = event;
758✔
1041
    data.batch_state = batch_state;
758✔
1042
    data.progress = progress;
758✔
1043
    data.num_changesets = num_changesets;
758✔
1044
    data.query_version = query_version;
758✔
1045

380✔
1046
    return call_debug_hook(data);
758✔
1047
}
758✔
1048

1049
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
1050
{
866✔
1051
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
866✔
1052
        return SyncClientHookAction::NoAction;
802✔
1053
    }
802✔
1054
    if (REALM_UNLIKELY(m_state != State::Active)) {
64✔
1055
        return SyncClientHookAction::NoAction;
×
1056
    }
×
1057

32✔
1058
    SyncClientHookData data;
64✔
1059
    data.event = event;
64✔
1060
    data.batch_state = DownloadBatchState::SteadyState;
64✔
1061
    data.progress = m_progress;
64✔
1062
    data.num_changesets = 0;
64✔
1063
    data.query_version = 0;
64✔
1064
    data.error_info = &error_info;
64✔
1065

32✔
1066
    return call_debug_hook(data);
64✔
1067
}
64✔
1068

1069
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
1070
{
85,390✔
1071
    // Should never be called if session is not active
45,204✔
1072
    REALM_ASSERT_EX(m_state == State::Active, m_state);
85,390✔
1073
    if (batch_state == DownloadBatchState::SteadyState) {
85,390✔
1074
        return true;
41,134✔
1075
    }
41,134✔
1076

23,382✔
1077
    if (!m_is_flx_sync_session) {
44,256✔
1078
        return true;
40,108✔
1079
    }
40,108✔
1080

2,098✔
1081
    // If this is a steady state DOWNLOAD, no need for special handling.
2,098✔
1082
    if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) {
4,148✔
1083
        return true;
1,030✔
1084
    }
1,030✔
1085

1,558✔
1086
    return false;
3,118✔
1087
}
3,118✔
1088

1089
util::Future<std::string> SessionImpl::send_test_command(std::string body)
1090
{
52✔
1091
    if (m_state != State::Active) {
52✔
1092
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
×
1093
    }
×
1094

26✔
1095
    try {
52✔
1096
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
52✔
1097
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
52✔
1098
            return Status{ErrorCodes::LogicError,
4✔
1099
                          "Must supply command name in \"command\" field of test command json object"};
4✔
1100
        }
4✔
1101
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
48✔
1102
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
×
1103
        }
×
1104
    }
4✔
1105
    catch (const nlohmann::json::parse_error& e) {
4✔
1106
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
4✔
1107
    }
4✔
1108

22✔
1109
    auto pf = util::make_promise_future<std::string>();
44✔
1110

22✔
1111
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
44✔
1112
        // Includes operation_aborted
22✔
1113
        if (!status.is_ok())
44✔
1114
            promise.set_error(status);
×
1115

22✔
1116
        auto id = ++m_last_pending_test_command_ident;
44✔
1117
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
44✔
1118
        ensure_enlisted_to_send();
44✔
1119
    });
44✔
1120

22✔
1121
    return std::move(pf.future);
44✔
1122
}
44✔
1123

1124
// ################ SessionWrapper ################
1125

1126
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1127
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
1128
// the ClientImpl::Connection that owns the ClientImpl::Session.
1129
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1130
                               std::shared_ptr<MigrationStore> migration_store, Session::Config config)
1131
    : m_client{client}
1132
    , m_db(std::move(db))
1133
    , m_replication(m_db->get_replication())
1134
    , m_protocol_envelope{config.protocol_envelope}
1135
    , m_server_address{std::move(config.server_address)}
1136
    , m_server_port{config.server_port}
1137
    , m_user_id(std::move(config.user_id))
1138
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
1139
    , m_authorization_header_name{config.authorization_header_name}
1140
    , m_custom_http_headers{config.custom_http_headers}
1141
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
1142
    , m_simulate_integration_error{config.simulate_integration_error}
1143
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
1144
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
1145
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
1146
    , m_http_request_path_prefix{std::move(config.service_identifier)}
1147
    , m_virt_path{std::move(config.realm_identifier)}
1148
    , m_signed_access_token{std::move(config.signed_user_token)}
1149
    , m_client_reset_config{std::move(config.client_reset_config)}
1150
    , m_proxy_config{config.proxy_config} // Throws
1151
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
1152
    , m_session_reason(config.session_reason)
1153
    , m_flx_subscription_store(std::move(flx_sub_store))
1154
    , m_migration_store(std::move(migration_store))
1155
{
10,934✔
1156
    REALM_ASSERT(m_db);
10,934✔
1157
    REALM_ASSERT(m_db->get_replication());
10,934✔
1158
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
10,934✔
1159

5,294✔
1160
    if (m_flx_subscription_store) {
10,934✔
1161
        auto versions_info = m_flx_subscription_store->get_version_info();
984✔
1162
        m_flx_active_version = versions_info.active;
984✔
1163
        m_flx_latest_version = versions_info.latest;
984✔
1164
        m_flx_pending_mark_version = versions_info.pending_mark;
984✔
1165
    }
984✔
1166
}
10,934✔
1167

1168
SessionWrapper::~SessionWrapper() noexcept
1169
{
10,930✔
1170
    if (m_db && m_actualized)
10,930✔
1171
        m_db->release_sync_agent();
168✔
1172
}
10,930✔
1173

1174

1175
inline ClientReplication& SessionWrapper::get_replication() noexcept
1176
{
109,390✔
1177
    REALM_ASSERT(m_db);
109,390✔
1178
    return static_cast<ClientReplication&>(*m_replication);
109,390✔
1179
}
109,390✔
1180

1181

1182
inline ClientImpl& SessionWrapper::get_client() noexcept
1183
{
72✔
1184
    return m_client;
72✔
1185
}
72✔
1186

1187
bool SessionWrapper::has_flx_subscription_store() const
1188
{
1,426✔
1189
    return static_cast<bool>(m_flx_subscription_store);
1,426✔
1190
}
1,426✔
1191

1192
void SessionWrapper::on_new_flx_subscription_set(int64_t new_version)
1193
{
726✔
1194
    if (!m_initiated) {
726✔
1195
        return;
×
1196
    }
×
1197
    REALM_ASSERT(!m_finalized);
726✔
1198

362✔
1199
    auto self = util::bind_ptr<SessionWrapper>(this);
726✔
1200
    m_client.post([new_version, self = std::move(self)](Status status) {
726✔
1201
        if (status == ErrorCodes::OperationAborted)
726✔
1202
            return;
×
1203
        else if (!status.is_ok())
726✔
1204
            throw Exception(status);
×
1205
        REALM_ASSERT(self->m_actualized);
726✔
1206
        if (REALM_UNLIKELY(!self->m_sess)) {
726✔
1207
            return; // Already finalized
×
1208
        }
×
1209
        auto& sess = *self->m_sess;
726✔
1210
        sess.recognize_sync_version(self->m_db->get_version_of_latest_snapshot());
726✔
1211
        self->m_flx_latest_version = new_version;
726✔
1212
        sess.on_new_flx_subscription_set(new_version);
726✔
1213
    });
726✔
1214
}
726✔
1215

1216
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1217
{
16✔
1218
    REALM_ASSERT(!m_finalized);
16✔
1219
    REALM_ASSERT(m_flx_latest_version != 0);
16✔
1220
    REALM_ASSERT(m_flx_latest_version >= version);
16✔
1221

8✔
1222
    auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(version);
16✔
1223
    mut_subs.update_state(SubscriptionSet::State::Error, err_msg);
16✔
1224
    mut_subs.commit();
16✔
1225
}
16✔
1226

1227
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1228
{
1,490✔
1229
    REALM_ASSERT(!m_finalized);
1,490✔
1230
    m_flx_last_seen_version = version;
1,490✔
1231
    m_flx_active_version = version;
1,490✔
1232
}
1,490✔
1233

1234
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1235
{
1,426✔
1236
    if (!has_flx_subscription_store()) {
1,426✔
1237
        return;
×
1238
    }
×
1239
    REALM_ASSERT(!m_finalized);
1,426✔
1240
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
1,426✔
1241
    REALM_ASSERT(new_version >= m_flx_active_version);
1,426✔
1242
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
1,426✔
1243

714✔
1244
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
1,426✔
1245

714✔
1246
    switch (batch_state) {
1,426✔
1247
        case DownloadBatchState::SteadyState:
✔
1248
            // Cannot be called with this value.
1249
            REALM_UNREACHABLE();
×
1250
        case DownloadBatchState::LastInBatch:
1,386✔
1251
            if (m_flx_active_version == new_version) {
1,386✔
1252
                return;
×
1253
            }
×
1254
            on_flx_sync_version_complete(new_version);
1,386✔
1255
            if (new_version == 0) {
1,386✔
1256
                new_state = SubscriptionSet::State::Complete;
624✔
1257
            }
624✔
1258
            else {
762✔
1259
                new_state = SubscriptionSet::State::AwaitingMark;
762✔
1260
                m_flx_pending_mark_version = new_version;
762✔
1261
            }
762✔
1262
            break;
1,386✔
1263
        case DownloadBatchState::MoreToCome:
714✔
1264
            if (m_flx_last_seen_version == new_version) {
40✔
1265
                return;
×
1266
            }
×
1267

20✔
1268
            m_flx_last_seen_version = new_version;
40✔
1269
            new_state = SubscriptionSet::State::Bootstrapping;
40✔
1270
            break;
40✔
1271
    }
1,426✔
1272

714✔
1273
    auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(new_version);
1,426✔
1274
    mut_subs.update_state(new_state);
1,426✔
1275
    mut_subs.commit();
1,426✔
1276
}
1,426✔
1277

1278
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
1279
{
16,730✔
1280
    REALM_ASSERT(!m_finalized);
16,730✔
1281
    return m_flx_subscription_store.get();
16,730✔
1282
}
16,730✔
1283

1284
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
1285
{
3,928✔
1286
    REALM_ASSERT(!m_finalized);
3,928✔
1287
    return m_flx_pending_bootstrap_store.get();
3,928✔
1288
}
3,928✔
1289

1290
MigrationStore* SessionWrapper::get_migration_store()
1291
{
55,700✔
1292
    REALM_ASSERT(!m_finalized);
55,700✔
1293
    return m_migration_store.get();
55,700✔
1294
}
55,700✔
1295

1296
inline void SessionWrapper::set_sync_transact_handler(util::UniqueFunction<SyncTransactCallback> handler)
1297
{
3,330✔
1298
    REALM_ASSERT(!m_initiated);
3,330✔
1299
    m_sync_transact_handler = std::move(handler); // Throws
3,330✔
1300
}
3,330✔
1301

1302

1303
inline void SessionWrapper::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
1304
{
3,394✔
1305
    REALM_ASSERT(!m_initiated);
3,394✔
1306
    m_progress_handler = std::move(handler);
3,394✔
1307
}
3,394✔
1308

1309

1310
inline void
1311
SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
1312
{
11,028✔
1313
    REALM_ASSERT(!m_initiated);
11,028✔
1314
    m_connection_state_change_listener = std::move(listener);
11,028✔
1315
}
11,028✔
1316

1317

1318
inline void SessionWrapper::initiate()
1319
{
9,782✔
1320
    REALM_ASSERT(!m_initiated);
9,782✔
1321
    ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port, m_user_id, m_sync_mode};
9,782✔
1322
    m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws
9,782✔
1323
    m_initiated = true;
9,782✔
1324
}
9,782✔
1325

1326

1327
void SessionWrapper::nonsync_transact_notify(version_type new_version)
1328
{
41,902✔
1329
    // Thread safety required
20,224✔
1330
    REALM_ASSERT(m_initiated);
41,902✔
1331

20,224✔
1332
    if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
41,902✔
1333
        return;
×
1334
    }
×
1335

20,224✔
1336
    util::bind_ptr<SessionWrapper> self{this};
41,902✔
1337
    m_client.post([self = std::move(self), new_version](Status status) {
41,902✔
1338
        if (status == ErrorCodes::OperationAborted)
41,902✔
1339
            return;
×
1340
        else if (!status.is_ok())
41,902✔
1341
            throw Exception(status);
×
1342

20,224✔
1343
        REALM_ASSERT(self->m_actualized);
41,902✔
1344
        if (REALM_UNLIKELY(!self->m_sess))
41,902✔
1345
            return; // Already finalized
20,224✔
1346
        SessionImpl& sess = *self->m_sess;
41,902✔
1347
        sess.recognize_sync_version(new_version); // Throws
41,902✔
1348
        self->report_progress();                  // Throws
41,902✔
1349
    });                                           // Throws
41,902✔
1350
}
41,902✔
1351

1352

1353
void SessionWrapper::cancel_reconnect_delay()
1354
{
12✔
1355
    // Thread safety required
6✔
1356
    REALM_ASSERT(m_initiated);
12✔
1357

6✔
1358
    if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
12✔
1359
        return;
×
1360
    }
×
1361

6✔
1362
    util::bind_ptr<SessionWrapper> self{this};
12✔
1363
    m_client.post([self = std::move(self)](Status status) {
12✔
1364
        if (status == ErrorCodes::OperationAborted)
12✔
1365
            return;
×
1366
        else if (!status.is_ok())
12✔
1367
            throw Exception(status);
×
1368

6✔
1369
        REALM_ASSERT(self->m_actualized);
12✔
1370
        if (REALM_UNLIKELY(!self->m_sess))
12✔
1371
            return; // Already finalized
6✔
1372
        SessionImpl& sess = *self->m_sess;
12✔
1373
        sess.cancel_resumption_delay(); // Throws
12✔
1374
        ClientImpl::Connection& conn = sess.get_connection();
12✔
1375
        conn.cancel_reconnect_delay(); // Throws
12✔
1376
    });                                // Throws
12✔
1377
}
12✔
1378

1379
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1380
                                    WaitOperCompletionHandler handler)
1381
{
4,394✔
1382
    REALM_ASSERT(upload_completion || download_completion);
4,394✔
1383
    REALM_ASSERT(m_initiated);
4,394✔
1384
    REALM_ASSERT(!m_finalized);
4,394✔
1385

2,110✔
1386
    util::bind_ptr<SessionWrapper> self{this};
4,394✔
1387
    m_client.post([self = std::move(self), handler = std::move(handler), upload_completion,
4,394✔
1388
                   download_completion](Status status) mutable {
4,394✔
1389
        if (status == ErrorCodes::OperationAborted)
4,394✔
1390
            return;
×
1391
        else if (!status.is_ok())
4,394✔
1392
            throw Exception(status);
×
1393

2,110✔
1394
        REALM_ASSERT(self->m_actualized);
4,394✔
1395
        if (REALM_UNLIKELY(!self->m_sess)) {
4,394✔
1396
            // Already finalized
38✔
1397
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
70✔
1398
            return;
70✔
1399
        }
70✔
1400
        if (upload_completion) {
4,324✔
1401
            if (download_completion) {
2,296✔
1402
                // Wait for upload and download completion
140✔
1403
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
278✔
1404
            }
278✔
1405
            else {
2,018✔
1406
                // Wait for upload completion only
918✔
1407
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
2,018✔
1408
            }
2,018✔
1409
        }
2,296✔
1410
        else {
2,028✔
1411
            // Wait for download completion only
1,014✔
1412
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
2,028✔
1413
        }
2,028✔
1414
        SessionImpl& sess = *self->m_sess;
4,324✔
1415
        if (upload_completion)
4,324✔
1416
            sess.request_upload_completion_notification(); // Throws
2,296✔
1417
        if (download_completion)
4,324✔
1418
            sess.request_download_completion_notification(); // Throws
2,306✔
1419
    });                                                      // Throws
4,324✔
1420
}
4,394✔
1421

1422

1423
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1424
{
12,984✔
1425
    // Thread safety required
6,492✔
1426
    REALM_ASSERT(m_initiated);
12,984✔
1427
    REALM_ASSERT(!m_finalized);
12,984✔
1428

6,492✔
1429
    std::int_fast64_t target_mark;
12,984✔
1430
    {
12,984✔
1431
        std::lock_guard lock{m_client.m_mutex};
12,984✔
1432
        target_mark = ++m_target_upload_mark;
12,984✔
1433
    }
12,984✔
1434

6,492✔
1435
    util::bind_ptr<SessionWrapper> self{this};
12,984✔
1436
    m_client.post([self = std::move(self), target_mark](Status status) {
12,984✔
1437
        if (status == ErrorCodes::OperationAborted)
12,984✔
1438
            return;
×
1439
        else if (!status.is_ok())
12,984✔
1440
            throw Exception(status);
×
1441

6,492✔
1442
        REALM_ASSERT(self->m_actualized);
12,984✔
1443
        // The session wrapper may already have been finalized. This can only
6,492✔
1444
        // happen if it was abandoned, but in that case, the call of
6,492✔
1445
        // wait_for_upload_complete_or_client_stopped() must have returned
6,492✔
1446
        // already.
6,492✔
1447
        if (REALM_UNLIKELY(!self->m_sess))
12,984✔
1448
            return;
6,502✔
1449
        if (target_mark > self->m_staged_upload_mark) {
12,972✔
1450
            self->m_staged_upload_mark = target_mark;
12,972✔
1451
            SessionImpl& sess = *self->m_sess;
12,972✔
1452
            sess.request_upload_completion_notification(); // Throws
12,972✔
1453
        }
12,972✔
1454
    }); // Throws
12,972✔
1455

6,492✔
1456
    bool completion_condition_was_satisfied;
12,984✔
1457
    {
12,984✔
1458
        std::unique_lock lock{m_client.m_mutex};
12,984✔
1459
        while (m_reached_upload_mark < target_mark && !m_client.m_stopped)
32,836✔
1460
            m_client.m_wait_or_client_stopped_cond.wait(lock);
19,852✔
1461
        completion_condition_was_satisfied = !m_client.m_stopped;
12,984✔
1462
    }
12,984✔
1463
    return completion_condition_was_satisfied;
12,984✔
1464
}
12,984✔
1465

1466

1467
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1468
{
10,112✔
1469
    // Thread safety required
5,058✔
1470
    REALM_ASSERT(m_initiated);
10,112✔
1471
    REALM_ASSERT(!m_finalized);
10,112✔
1472

5,058✔
1473
    std::int_fast64_t target_mark;
10,112✔
1474
    {
10,112✔
1475
        std::lock_guard lock{m_client.m_mutex};
10,112✔
1476
        target_mark = ++m_target_download_mark;
10,112✔
1477
    }
10,112✔
1478

5,058✔
1479
    util::bind_ptr<SessionWrapper> self{this};
10,112✔
1480
    m_client.post([self = std::move(self), target_mark](Status status) {
10,112✔
1481
        if (status == ErrorCodes::OperationAborted)
10,112✔
1482
            return;
×
1483
        else if (!status.is_ok())
10,112✔
1484
            throw Exception(status);
×
1485

5,058✔
1486
        REALM_ASSERT(self->m_actualized);
10,112✔
1487
        // The session wrapper may already have been finalized. This can only
5,058✔
1488
        // happen if it was abandoned, but in that case, the call of
5,058✔
1489
        // wait_for_download_complete_or_client_stopped() must have returned
5,058✔
1490
        // already.
5,058✔
1491
        if (REALM_UNLIKELY(!self->m_sess))
10,112✔
1492
            return;
5,088✔
1493
        if (target_mark > self->m_staged_download_mark) {
10,052✔
1494
            self->m_staged_download_mark = target_mark;
10,052✔
1495
            SessionImpl& sess = *self->m_sess;
10,052✔
1496
            sess.request_download_completion_notification(); // Throws
10,052✔
1497
        }
10,052✔
1498
    }); // Throws
10,052✔
1499

5,058✔
1500
    bool completion_condition_was_satisfied;
10,112✔
1501
    {
10,112✔
1502
        std::unique_lock lock{m_client.m_mutex};
10,112✔
1503
        while (m_reached_download_mark < target_mark && !m_client.m_stopped)
20,580✔
1504
            m_client.m_wait_or_client_stopped_cond.wait(lock);
10,468✔
1505
        completion_condition_was_satisfied = !m_client.m_stopped;
10,112✔
1506
    }
10,112✔
1507
    return completion_condition_was_satisfied;
10,112✔
1508
}
10,112✔
1509

1510

1511
void SessionWrapper::refresh(std::string signed_access_token)
1512
{
208✔
1513
    // Thread safety required
104✔
1514
    REALM_ASSERT(m_initiated);
208✔
1515
    REALM_ASSERT(!m_finalized);
208✔
1516

104✔
1517
    m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) {
208✔
1518
        if (status == ErrorCodes::OperationAborted)
208✔
1519
            return;
×
1520
        else if (!status.is_ok())
208✔
1521
            throw Exception(status);
×
1522

104✔
1523
        REALM_ASSERT(self->m_actualized);
208✔
1524
        if (REALM_UNLIKELY(!self->m_sess))
208✔
1525
            return; // Already finalized
104✔
1526
        self->m_signed_access_token = std::move(token);
208✔
1527
        SessionImpl& sess = *self->m_sess;
208✔
1528
        ClientImpl::Connection& conn = sess.get_connection();
208✔
1529
        // FIXME: This only makes sense when each session uses a separate connection.
104✔
1530
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
208✔
1531
        sess.cancel_resumption_delay();                                                          // Throws
208✔
1532
        conn.cancel_reconnect_delay();                                                           // Throws
208✔
1533
    });
208✔
1534
}
208✔
1535

1536

1537
inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
1538
{
10,934✔
1539
    if (wrapper->m_initiated) {
10,934✔
1540
        ClientImpl& client = wrapper->m_client;
9,782✔
1541
        client.register_abandoned_session_wrapper(std::move(wrapper));
9,782✔
1542
    }
9,782✔
1543
}
10,934✔
1544

1545

1546
// Must be called from event loop thread
1547
void SessionWrapper::actualize(ServerEndpoint endpoint)
1548
{
9,610✔
1549
    REALM_ASSERT(!m_actualized);
9,610✔
1550
    REALM_ASSERT(!m_sess);
9,610✔
1551
    // Cannot be actualized if it's already been finalized or force closed
4,628✔
1552
    REALM_ASSERT(!m_finalized);
9,610✔
1553
    REALM_ASSERT(!m_force_closed);
9,610✔
1554
    try {
9,610✔
1555
        m_db->claim_sync_agent();
9,610✔
1556
    }
9,610✔
1557
    catch (const MultipleSyncAgents&) {
4,630✔
1558
        finalize_before_actualization();
4✔
1559
        throw;
4✔
1560
    }
4✔
1561
    auto sync_mode = endpoint.server_mode;
9,606✔
1562

4,626✔
1563
    bool was_created = false;
9,606✔
1564
    ClientImpl::Connection& conn = m_client.get_connection(
9,606✔
1565
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
9,606✔
1566
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
9,606✔
1567
        was_created); // Throws
9,606✔
1568
    try {
9,606✔
1569
        // FIXME: This only makes sense when each session uses a separate connection.
4,626✔
1570
        conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
9,606✔
1571
        std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
9,606✔
1572
        if (sync_mode == SyncServerMode::FLX) {
9,606✔
1573
            m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
980✔
1574
        }
980✔
1575

4,626✔
1576
        sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
9,606✔
1577
        m_sess = sess.get();
9,606✔
1578
        conn.activate_session(std::move(sess)); // Throws
9,606✔
1579
    }
9,606✔
1580
    catch (...) {
4,626✔
1581
        if (was_created)
×
1582
            m_client.remove_connection(conn);
×
1583

1584
        finalize_before_actualization();
×
1585
        throw;
×
1586
    }
×
1587

4,626✔
1588
    m_actualized = true;
9,604✔
1589
    if (was_created)
9,604✔
1590
        conn.activate(); // Throws
2,446✔
1591

4,626✔
1592
    if (m_connection_state_change_listener) {
9,604✔
1593
        ConnectionState state = conn.get_state();
9,584✔
1594
        if (state != ConnectionState::disconnected) {
9,584✔
1595
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,012✔
1596
            if (state == ConnectionState::connected)
7,012✔
1597
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
6,716✔
1598
        }
7,012✔
1599
    }
9,584✔
1600

4,626✔
1601
    if (!m_client_reset_config)
9,604✔
1602
        report_progress(); // Throws
9,264✔
1603
}
9,604✔
1604

1605
void SessionWrapper::force_close()
1606
{
150✔
1607
    if (m_force_closed || m_finalized) {
150✔
1608
        return;
×
1609
    }
×
1610
    REALM_ASSERT(m_actualized);
150✔
1611
    REALM_ASSERT(m_sess);
150✔
1612
    m_force_closed = true;
150✔
1613

74✔
1614
    ClientImpl::Connection& conn = m_sess->get_connection();
150✔
1615
    conn.initiate_session_deactivation(m_sess); // Throws
150✔
1616

74✔
1617
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
74✔
1618
    m_flx_pending_bootstrap_store.reset();
150✔
1619
    // Clear the subscription and migration store refs since they are owned by SyncSession
74✔
1620
    m_flx_subscription_store.reset();
150✔
1621
    m_migration_store.reset();
150✔
1622
    m_sess = nullptr;
150✔
1623
    // Everything is being torn down, no need to report connection state anymore
74✔
1624
    m_connection_state_change_listener = {};
150✔
1625
}
150✔
1626

1627
// Must be called from event loop thread
1628
void SessionWrapper::finalize()
1629
{
9,614✔
1630
    REALM_ASSERT(m_actualized);
9,614✔
1631

4,630✔
1632
    // Already finalized?
4,630✔
1633
    if (m_finalized) {
9,614✔
1634
        return;
×
1635
    }
×
1636

4,630✔
1637
    m_finalized = true;
9,614✔
1638

4,630✔
1639
    if (!m_force_closed) {
9,614✔
1640
        REALM_ASSERT(m_sess);
9,456✔
1641
        ClientImpl::Connection& conn = m_sess->get_connection();
9,456✔
1642
        conn.initiate_session_deactivation(m_sess); // Throws
9,456✔
1643

4,552✔
1644
        // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
4,552✔
1645
        m_flx_pending_bootstrap_store.reset();
9,456✔
1646
        // Clear the subscription and migration store refs since they are owned by SyncSession
4,552✔
1647
        m_flx_subscription_store.reset();
9,456✔
1648
        m_migration_store.reset();
9,456✔
1649
        m_sess = nullptr;
9,456✔
1650
    }
9,456✔
1651

4,630✔
1652
    // The Realm file can be closed now, as no access to the Realm file is
4,630✔
1653
    // supposed to happen on behalf of a session after initiation of
4,630✔
1654
    // deactivation.
4,630✔
1655
    m_db->release_sync_agent();
9,614✔
1656
    m_db = nullptr;
9,614✔
1657

4,630✔
1658
    // All outstanding wait operations must be canceled
4,630✔
1659
    while (!m_upload_completion_handlers.empty()) {
9,982✔
1660
        auto handler = std::move(m_upload_completion_handlers.back());
368✔
1661
        m_upload_completion_handlers.pop_back();
368✔
1662
        handler(
368✔
1663
            {ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
368✔
1664
    }
368✔
1665
    while (!m_download_completion_handlers.empty()) {
9,778✔
1666
        auto handler = std::move(m_download_completion_handlers.back());
164✔
1667
        m_download_completion_handlers.pop_back();
164✔
1668
        handler(
164✔
1669
            {ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
164✔
1670
    }
164✔
1671
    while (!m_sync_completion_handlers.empty()) {
9,628✔
1672
        auto handler = std::move(m_sync_completion_handlers.back());
14✔
1673
        m_sync_completion_handlers.pop_back();
14✔
1674
        handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
14✔
1675
    }
14✔
1676
}
9,614✔
1677

1678

1679
// Must be called only when an unactualized session wrapper becomes abandoned.
1680
//
1681
// Called with a lock on `m_client.m_mutex`.
1682
inline void SessionWrapper::finalize_before_actualization() noexcept
1683
{
176✔
1684
    REALM_ASSERT(!m_sess);
176✔
1685
    m_actualized = true;
176✔
1686
    m_force_closed = true;
176✔
1687
}
176✔
1688

1689

1690
inline void SessionWrapper::report_sync_transact(VersionID old_version, VersionID new_version)
1691
{
22,352✔
1692
    REALM_ASSERT(!m_finalized);
22,352✔
1693
    if (m_sync_transact_handler)
22,352✔
1694
        m_sync_transact_handler(old_version, new_version); // Throws
3,300✔
1695
}
22,352✔
1696

1697

1698
inline void SessionWrapper::on_sync_progress()
1699
{
42,520✔
1700
    REALM_ASSERT(!m_finalized);
42,520✔
1701
    m_reliable_download_progress = true;
42,520✔
1702
    report_progress(); // Throws
42,520✔
1703
}
42,520✔
1704

1705

1706
void SessionWrapper::on_upload_completion()
1707
{
14,686✔
1708
    REALM_ASSERT(!m_finalized);
14,686✔
1709
    while (!m_upload_completion_handlers.empty()) {
16,420✔
1710
        auto handler = std::move(m_upload_completion_handlers.back());
1,734✔
1711
        m_upload_completion_handlers.pop_back();
1,734✔
1712
        handler(Status::OK()); // Throws
1,734✔
1713
    }
1,734✔
1714
    while (!m_sync_completion_handlers.empty()) {
14,866✔
1715
        auto handler = std::move(m_sync_completion_handlers.back());
180✔
1716
        m_download_completion_handlers.push_back(std::move(handler)); // Throws
180✔
1717
        m_sync_completion_handlers.pop_back();
180✔
1718
    }
180✔
1719
    std::lock_guard lock{m_client.m_mutex};
14,686✔
1720
    if (m_staged_upload_mark > m_reached_upload_mark) {
14,686✔
1721
        m_reached_upload_mark = m_staged_upload_mark;
12,952✔
1722
        m_client.m_wait_or_client_stopped_cond.notify_all();
12,952✔
1723
    }
12,952✔
1724
}
14,686✔
1725

1726

1727
void SessionWrapper::on_download_completion()
1728
{
15,442✔
1729
    while (!m_download_completion_handlers.empty()) {
17,486✔
1730
        auto handler = std::move(m_download_completion_handlers.back());
2,044✔
1731
        m_download_completion_handlers.pop_back();
2,044✔
1732
        handler(Status::OK()); // Throws
2,044✔
1733
    }
2,044✔
1734
    while (!m_sync_completion_handlers.empty()) {
15,526✔
1735
        auto handler = std::move(m_sync_completion_handlers.back());
84✔
1736
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
84✔
1737
        m_sync_completion_handlers.pop_back();
84✔
1738
    }
84✔
1739

7,612✔
1740
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
15,442✔
1741
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
652✔
1742
                             m_flx_pending_mark_version);
652✔
1743
        auto mutable_subs = m_flx_subscription_store->get_mutable_by_version(m_flx_pending_mark_version);
652✔
1744
        mutable_subs.update_state(SubscriptionSet::State::Complete);
652✔
1745
        mutable_subs.commit();
652✔
1746
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
652✔
1747
    }
652✔
1748

7,612✔
1749
    std::lock_guard lock{m_client.m_mutex};
15,442✔
1750
    if (m_staged_download_mark > m_reached_download_mark) {
15,442✔
1751
        m_reached_download_mark = m_staged_download_mark;
9,982✔
1752
        m_client.m_wait_or_client_stopped_cond.notify_all();
9,982✔
1753
    }
9,982✔
1754
}
15,442✔
1755

1756

1757
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1758
{
894✔
1759
    REALM_ASSERT(!m_finalized);
894✔
1760
    m_suspended = true;
894✔
1761
    if (m_connection_state_change_listener) {
894✔
1762
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
894✔
1763
    }
894✔
1764
}
894✔
1765

1766

1767
void SessionWrapper::on_resumed()
1768
{
370✔
1769
    REALM_ASSERT(!m_finalized);
370✔
1770
    m_suspended = false;
370✔
1771
    if (m_connection_state_change_listener) {
370✔
1772
        ClientImpl::Connection& conn = m_sess->get_connection();
370✔
1773
        if (conn.get_state() != ConnectionState::disconnected) {
370✔
1774
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
370✔
1775
            if (conn.get_state() == ConnectionState::connected)
370✔
1776
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
370✔
1777
        }
370✔
1778
    }
370✔
1779
}
370✔
1780

1781

1782
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1783
                                                 const util::Optional<SessionErrorInfo>& error_info)
1784
{
10,860✔
1785
    if (m_connection_state_change_listener) {
10,860✔
1786
        if (!m_suspended)
10,850✔
1787
            m_connection_state_change_listener(state, error_info); // Throws
10,850✔
1788
    }
10,850✔
1789
}
10,860✔
1790

1791

1792
void SessionWrapper::report_progress()
1793
{
93,684✔
1794
    REALM_ASSERT(!m_finalized);
93,684✔
1795
    REALM_ASSERT(m_sess);
93,684✔
1796

47,194✔
1797
    if (!m_progress_handler)
93,684✔
1798
        return;
73,584✔
1799

8,734✔
1800
    std::uint_fast64_t downloaded_bytes = 0;
20,100✔
1801
    std::uint_fast64_t downloadable_bytes = 0;
20,100✔
1802
    std::uint_fast64_t uploaded_bytes = 0;
20,100✔
1803
    std::uint_fast64_t uploadable_bytes = 0;
20,100✔
1804
    std::uint_fast64_t snapshot_version = 0;
20,100✔
1805
    ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes,
20,100✔
1806
                                             uploadable_bytes, snapshot_version);
20,100✔
1807

8,734✔
1808
    // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
8,734✔
1809
    // is only the remaining to download. This is confusing, so make them use
8,734✔
1810
    // the same units.
8,734✔
1811
    std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes;
20,100✔
1812

8,734✔
1813
    m_sess->logger.debug("Progress handler called, downloaded = %1, "
20,100✔
1814
                         "downloadable(total) = %2, uploaded = %3, "
20,100✔
1815
                         "uploadable = %4, reliable_download_progress = %5, "
20,100✔
1816
                         "snapshot version = %6",
20,100✔
1817
                         downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes,
20,100✔
1818
                         m_reliable_download_progress, snapshot_version);
20,100✔
1819

8,734✔
1820
    // FIXME: Why is this boolean status communicated to the application as
8,734✔
1821
    // a 64-bit integer? Also, the name `progress_version` is confusing.
8,734✔
1822
    std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0);
15,056✔
1823
    m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version,
20,100✔
1824
                       snapshot_version);
20,100✔
1825
}
20,100✔
1826

1827
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
1828
{
52✔
1829
    if (!m_sess) {
52✔
1830
        return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
×
1831
    }
×
1832

26✔
1833
    return m_sess->send_test_command(std::move(body));
52✔
1834
}
52✔
1835

1836
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1837
{
292✔
1838
    REALM_ASSERT(!m_finalized);
292✔
1839

146✔
1840
    auto pending_reset = [&] {
292✔
1841
        auto ft = m_db->start_frozen();
292✔
1842
        return _impl::client_reset::has_pending_reset(ft);
292✔
1843
    }();
292✔
1844
    REALM_ASSERT(pending_reset);
292✔
1845
    m_sess->logger.info("Tracking pending client reset of type \"%1\" from %2", pending_reset->type,
292✔
1846
                        pending_reset->time);
292✔
1847
    util::bind_ptr<SessionWrapper> self(this);
292✔
1848
    async_wait_for(true, true, [self = std::move(self), pending_reset = *pending_reset](Status status) {
292✔
1849
        if (status == ErrorCodes::OperationAborted) {
292✔
1850
            return;
152✔
1851
        }
152✔
1852
        auto& logger = self->m_sess->logger;
140✔
1853
        if (!status.is_ok()) {
140✔
1854
            logger.error("Error while tracking client reset acknowledgement: %1", status);
×
1855
            return;
×
1856
        }
×
1857

70✔
1858
        auto wt = self->m_db->start_write();
140✔
1859
        auto cur_pending_reset = _impl::client_reset::has_pending_reset(wt);
140✔
1860
        if (!cur_pending_reset) {
140✔
1861
            logger.debug(
×
1862
                "Was going to remove client reset tracker for type \"%1\" from %2, but it was already removed",
×
1863
                pending_reset.type, pending_reset.time);
×
1864
            return;
×
1865
        }
×
1866
        else if (cur_pending_reset->type != pending_reset.type || cur_pending_reset->time != pending_reset.time) {
140✔
1867
            logger.debug(
×
1868
                "Was going to remove client reset tracker for type \"%1\" from %2, but found type \"%3\" from %4.",
×
1869
                pending_reset.type, pending_reset.time, cur_pending_reset->type, cur_pending_reset->time);
×
1870
        }
×
1871
        else {
140✔
1872
            logger.debug("Client reset of type \"%1\" from %2 has been acknowledged by the server. "
140✔
1873
                         "Removing cycle detection tracker.",
140✔
1874
                         pending_reset.type, pending_reset.time);
140✔
1875
        }
140✔
1876
        _impl::client_reset::remove_pending_client_resets(wt);
140✔
1877
        wt->commit();
140✔
1878
    });
140✔
1879
}
292✔
1880

1881
std::string SessionWrapper::get_appservices_connection_id()
1882
{
72✔
1883
    auto pf = util::make_promise_future<std::string>();
72✔
1884
    REALM_ASSERT(m_initiated);
72✔
1885

36✔
1886
    util::bind_ptr<SessionWrapper> self(this);
72✔
1887
    get_client().post([self, promise = std::move(pf.promise)](Status status) mutable {
72✔
1888
        if (!status.is_ok()) {
72✔
1889
            promise.set_error(status);
×
1890
            return;
×
1891
        }
×
1892

36✔
1893
        if (!self->m_sess) {
72✔
1894
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
1895
            return;
×
1896
        }
×
1897

36✔
1898
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
72✔
1899
    });
72✔
1900

36✔
1901
    return pf.future.get();
72✔
1902
}
72✔
1903

1904
// ################ ClientImpl::Connection ################
1905

1906
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1907
                                   const std::string& authorization_header_name,
1908
                                   const std::map<std::string, std::string>& custom_http_headers,
1909
                                   bool verify_servers_ssl_certificate,
1910
                                   Optional<std::string> ssl_trust_certificate_path,
1911
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1912
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1913
    : logger_ptr{std::make_shared<util::PrefixLogger>(make_logger_prefix(ident), client.logger_ptr)} // Throws
1914
    , logger{*logger_ptr}
1915
    , m_client{client}
1916
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1917
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1918
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1919
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1920
    , m_reconnect_info{reconnect_info}
1921
    , m_session_history{}
1922
    , m_ident{ident}
1923
    , m_server_endpoint{std::move(endpoint)}
1924
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1925
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1926
{
2,448✔
1927
    m_on_idle = m_client.create_trigger([this](Status status) {
2,450✔
1928
        if (status == ErrorCodes::OperationAborted)
2,450✔
1929
            return;
×
1930
        else if (!status.is_ok())
2,450✔
1931
            throw Exception(status);
×
1932

1,154✔
1933
        REALM_ASSERT(m_activated);
2,450✔
1934
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,450✔
1935
            on_idle(); // Throws
2,448✔
1936
            // Connection object may be destroyed now.
1,154✔
1937
        }
2,448✔
1938
    });
2,450✔
1939
}
2,448✔
1940

1941
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
1942
{
12✔
1943
    return m_ident;
12✔
1944
}
12✔
1945

1946

1947
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
1948
{
2,448✔
1949
    return m_server_endpoint;
2,448✔
1950
}
2,448✔
1951

1952
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1953
                                                        const std::string& signed_access_token)
1954
{
9,814✔
1955
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
9,814✔
1956
    m_signed_access_token = signed_access_token;           // Throws (copy)
9,814✔
1957
}
9,814✔
1958

1959

1960
void ClientImpl::Connection::resume_active_sessions()
1961
{
1,848✔
1962
    auto handler = [=](ClientImpl::Session& sess) {
3,692✔
1963
        sess.cancel_resumption_delay(); // Throws
3,692✔
1964
    };
3,692✔
1965
    for_each_active_session(std::move(handler)); // Throws
1,848✔
1966
}
1,848✔
1967

1968
void ClientImpl::Connection::on_idle()
1969
{
2,448✔
1970
    logger.debug("Destroying connection object");
2,448✔
1971
    ClientImpl& client = get_client();
2,448✔
1972
    client.remove_connection(*this);
2,448✔
1973
    // NOTE: This connection object is now destroyed!
1,154✔
1974
}
2,448✔
1975

1976

1977
std::string ClientImpl::Connection::get_http_request_path() const
1978
{
3,352✔
1979
    using namespace std::string_view_literals;
3,352✔
1980
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
3,352✔
1981

1,676✔
1982
    std::string path;
3,352✔
1983
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,352✔
1984
    path += m_http_request_path_prefix;
3,352✔
1985
    path += param;
3,352✔
1986
    path += m_signed_access_token;
3,352✔
1987

1,676✔
1988
    return path;
3,352✔
1989
}
3,352✔
1990

1991

1992
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
1993
{
2,448✔
1994
    std::ostringstream out;
2,448✔
1995
    out.imbue(std::locale::classic());
2,448✔
1996
    out << "Connection[" << ident << "]: "; // Throws
2,448✔
1997
    return out.str();                       // Throws
2,448✔
1998
}
2,448✔
1999

2000

2001
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
2002
                                                            util::Optional<SessionErrorInfo> error_info)
2003
{
9,936✔
2004
    if (m_force_closed) {
9,936✔
2005
        return;
2,156✔
2006
    }
2,156✔
2007
    auto handler = [=](ClientImpl::Session& sess) {
10,786✔
2008
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
10,786✔
2009
        sess_2.on_connection_state_changed(state, error_info); // Throws
10,786✔
2010
    };
10,786✔
2011
    for_each_active_session(std::move(handler)); // Throws
7,780✔
2012
}
7,780✔
2013

2014

2015
Client::Client(Config config)
2016
    : m_impl{new ClientImpl{std::move(config)}} // Throws
2017
{
9,122✔
2018
}
9,122✔
2019

2020

2021
Client::Client(Client&& client) noexcept
2022
    : m_impl{std::move(client.m_impl)}
2023
{
×
2024
}
×
2025

2026

2027
Client::~Client() noexcept {}
9,122✔
2028

2029

2030
void Client::shutdown() noexcept
2031
{
9,200✔
2032
    m_impl->shutdown();
9,200✔
2033
}
9,200✔
2034

2035
void Client::shutdown_and_wait()
2036
{
760✔
2037
    m_impl->shutdown_and_wait();
760✔
2038
}
760✔
2039

2040
void Client::cancel_reconnect_delay()
2041
{
1,852✔
2042
    m_impl->cancel_reconnect_delay();
1,852✔
2043
}
1,852✔
2044

2045
void Client::voluntary_disconnect_all_connections()
2046
{
12✔
2047
    m_impl->voluntary_disconnect_all_connections();
12✔
2048
}
12✔
2049

2050
bool Client::wait_for_session_terminations_or_client_stopped()
2051
{
368✔
2052
    return m_impl.get()->wait_for_session_terminations_or_client_stopped();
368✔
2053
}
368✔
2054

2055

2056
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
2057
                                  port_type& port, std::string& path) const
2058
{
3,322✔
2059
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
3,322✔
2060
}
3,322✔
2061

2062

2063
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
2064
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
2065
{
10,934✔
2066
    util::bind_ptr<SessionWrapper> sess;
10,934✔
2067
    sess.reset(new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
10,934✔
2068
                                  std::move(config)}); // Throws
10,934✔
2069
    // The reference count passed back to the application is implicitly
5,294✔
2070
    // owned by a naked pointer. This is done to avoid exposing
5,294✔
2071
    // implementation details through the header file (that is, through the
5,294✔
2072
    // Session object).
5,294✔
2073
    m_impl = sess.release();
10,934✔
2074
}
10,934✔
2075

2076

2077
void Session::set_sync_transact_callback(util::UniqueFunction<SyncTransactCallback> handler)
2078
{
3,330✔
2079
    m_impl->set_sync_transact_handler(std::move(handler)); // Throws
3,330✔
2080
}
3,330✔
2081

2082

2083
void Session::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
2084
{
3,394✔
2085
    m_impl->set_progress_handler(std::move(handler)); // Throws
3,394✔
2086
}
3,394✔
2087

2088

2089
void Session::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
2090
{
11,030✔
2091
    m_impl->set_connection_state_change_listener(std::move(listener)); // Throws
11,030✔
2092
}
11,030✔
2093

2094

2095
void Session::bind()
2096
{
9,782✔
2097
    m_impl->initiate(); // Throws
9,782✔
2098
}
9,782✔
2099

2100

2101
void Session::nonsync_transact_notify(version_type new_version)
2102
{
41,900✔
2103
    m_impl->nonsync_transact_notify(new_version); // Throws
41,900✔
2104
}
41,900✔
2105

2106

2107
void Session::cancel_reconnect_delay()
2108
{
12✔
2109
    m_impl->cancel_reconnect_delay(); // Throws
12✔
2110
}
12✔
2111

2112

2113
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2114
{
4,102✔
2115
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
4,102✔
2116
}
4,102✔
2117

2118

2119
bool Session::wait_for_upload_complete_or_client_stopped()
2120
{
12,984✔
2121
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
12,984✔
2122
}
12,984✔
2123

2124

2125
bool Session::wait_for_download_complete_or_client_stopped()
2126
{
10,112✔
2127
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
10,112✔
2128
}
10,112✔
2129

2130

2131
void Session::refresh(const std::string& signed_access_token)
2132
{
208✔
2133
    m_impl->refresh(signed_access_token); // Throws
208✔
2134
}
208✔
2135

2136

2137
void Session::abandon() noexcept
2138
{
10,934✔
2139
    REALM_ASSERT(m_impl);
10,934✔
2140
    // Reabsorb the ownership assigned to the applications naked pointer by
5,294✔
2141
    // Session constructor
5,294✔
2142
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
10,934✔
2143
    SessionWrapper::abandon(std::move(wrapper));
10,934✔
2144
}
10,934✔
2145

2146
void Session::on_new_flx_sync_subscription(int64_t new_version)
2147
{
726✔
2148
    m_impl->on_new_flx_subscription_set(new_version);
726✔
2149
}
726✔
2150

2151
util::Future<std::string> Session::send_test_command(std::string body)
2152
{
52✔
2153
    return m_impl->send_test_command(std::move(body));
52✔
2154
}
52✔
2155

2156
std::string Session::get_appservices_connection_id()
2157
{
72✔
2158
    return m_impl->get_appservices_connection_id();
72✔
2159
}
72✔
2160

2161
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2162
{
×
2163
    switch (proxyType) {
×
2164
        case ProxyConfig::Type::HTTP:
×
2165
            return os << "HTTP";
×
2166
        case ProxyConfig::Type::HTTPS:
×
2167
            return os << "HTTPS";
×
2168
    }
×
2169
    REALM_TERMINATE("Invalid Proxy Type object.");
×
2170
}
×
2171

2172
} // namespace sync
2173
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc