• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_389

04 Jun 2024 02:21PM UTC coverage: 90.819% (-0.02%) from 90.843%
thomas.goyne_389

push

Evergreen

web-flow
RCORE-2063 Fix some client resets potentially failing with AutoClientResetFailed if a new client reset condition occurred before the first one completed (#7542)

* Updated PR 7542 with changes from master (and PR 7649)
* Ensure client reset failures get appropriate err info
* Updated changelog after release
* Updated changelog
* Removed unused SyncClientHookEvent entries

101700 of 180084 branches covered (56.47%)

60 of 67 new or added lines in 5 files covered. (89.55%)

266 existing lines in 18 files now uncovered.

214566 of 236256 relevant lines covered (90.82%)

5174800.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.74
/src/realm/sync/client.cpp
1
#include <realm/sync/client.hpp>
2

3
#include <realm/sync/config.hpp>
4
#include <realm/sync/noinst/client_impl_base.hpp>
5
#include <realm/sync/noinst/client_reset.hpp>
6
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
7
#include <realm/sync/noinst/pending_reset_store.hpp>
8
#include <realm/sync/protocol.hpp>
9
#include <realm/sync/subscriptions.hpp>
10
#include <realm/util/bind_ptr.hpp>
11

12
namespace realm::sync {
13
namespace {
14
using namespace realm::util;
15

16

17
// clang-format off
18
using SessionImpl                     = ClientImpl::Session;
19
using SyncTransactCallback            = Session::SyncTransactCallback;
20
using ProgressHandler                 = Session::ProgressHandler;
21
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
22
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
23
using port_type                       = Session::port_type;
24
using connection_ident_type           = std::int_fast64_t;
25
using ProxyConfig                     = SyncConfig::ProxyConfig;
26
// clang-format on
27

28
} // unnamed namespace
29

30

31
// Life cycle states of a session wrapper:
32
//
33
// The session wrapper begins life with an associated Client, but no underlying
34
// SessionImpl. On construction, it begins the actualization process by posting
35
// a job to the client's event loop. That job will set `m_sess` to a session impl
36
// and then set `m_actualized = true`. Once this happens `m_actualized` will
37
// never change again.
38
//
39
// When the external reference to the session (`sync::Session`, which in
40
// non-test code is always owned by a `SyncSession`) is destroyed, the wrapper
41
// begins finalization. If the wrapper has not yet been actualized this takes
42
// place immediately and `m_finalized = true` is set directly on the calling
43
// thread. If it has been actualized, a job is posted to the client's event loop
44
// which will tear down the session and then set `m_finalized = true`. Regardless
45
// of whether or not the session has been actualized, `m_abandoned = true` is
46
// immediately set when the external reference is released.
47
//
48
// When the associated Client is destroyed it calls force_close() on all
49
// actualized wrappers from its event loop. This causes the wrapper to tear down
50
// the session, but does not make it proceed to the finalized state. In normal
51
// usage the client will outlive all sessions, but in tests getting the teardown
52
// correct and race-free can be tricky so we permit either order.
53
//
54
// The wrapper will exist with `m_abandoned = true` and `m_finalized = false`
55
// only while waiting for finalization to happen. It will exist with
56
// `m_finalized = true` only while there are pending post handlers yet to be
57
// executed.
58
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
59
public:
60
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
61
                   Session::Config&&);
62
    ~SessionWrapper() noexcept;
63

64
    ClientReplication& get_replication() noexcept;
65
    ClientImpl& get_client() noexcept;
66

67
    bool has_flx_subscription_store() const;
68
    SubscriptionStore* get_flx_subscription_store();
69
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
70

71
    MigrationStore* get_migration_store();
72

73
    // Immediately initiate deactivation of the wrapped session. Sets m_closed
74
    // but *not* m_finalized.
75
    // Must be called from event loop thread.
76
    void force_close();
77

78
    // Can be called from any thread.
79
    void on_commit(version_type new_version) override;
80
    // Can be called from any thread.
81
    void cancel_reconnect_delay();
82

83
    // Can be called from any thread.
84
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
85
    // Can be called from any thread.
86
    bool wait_for_upload_complete_or_client_stopped();
87
    // Can be called from any thread.
88
    bool wait_for_download_complete_or_client_stopped();
89

90
    // Can be called from any thread.
91
    void refresh(std::string_view signed_access_token);
92

93
    // Can be called from any thread.
94
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
95

96
    // These are called from ClientImpl
97
    // Must be called from event loop thread.
98
    void actualize();
99
    void finalize();
100
    void finalize_before_actualization() noexcept;
101

102
    // Can be called from any thread.
103
    util::Future<std::string> send_test_command(std::string body);
104

105
    void handle_pending_client_reset_acknowledgement();
106

107
    void update_subscription_version_info();
108

109
    // Can be called from any thread.
110
    std::string get_appservices_connection_id();
111

112
    // Can be called from any thread, but inherently cannot be called
113
    // concurrently with calls to any of the other non-confined functions.
114
    bool mark_abandoned();
115

116
private:
117
    ClientImpl& m_client;
118
    DBRef m_db;
119
    Replication* m_replication;
120

121
    const ProtocolEnvelope m_protocol_envelope;
122
    const std::string m_server_address;
123
    const port_type m_server_port;
124
    const bool m_server_verified;
125
    const std::string m_user_id;
126
    const SyncServerMode m_sync_mode;
127
    const std::string m_authorization_header_name;
128
    const std::map<std::string, std::string> m_custom_http_headers;
129
    const bool m_verify_servers_ssl_certificate;
130
    const bool m_simulate_integration_error;
131
    const std::optional<std::string> m_ssl_trust_certificate_path;
132
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
133
    const size_t m_flx_bootstrap_batch_size_bytes;
134
    const std::string m_http_request_path_prefix;
135
    const std::string m_virt_path;
136
    const std::optional<ProxyConfig> m_proxy_config;
137

138
    // This one is different from null when, and only when the session wrapper
139
    // is in ClientImpl::m_abandoned_session_wrappers.
140
    SessionWrapper* m_next = nullptr;
141

142
    // These may only be accessed by the event loop thread.
143
    std::string m_signed_access_token;
144
    std::optional<ClientReset> m_client_reset_config;
145

146
    struct ReportedProgress {
147
        uint64_t snapshot = 0;
148
        uint64_t uploaded = 0;
149
        uint64_t uploadable = 0;
150
        uint64_t downloaded = 0;
151
        uint64_t downloadable = 0;
152
        uint64_t final_uploaded = 0;
153
        uint64_t final_downloaded = 0;
154
    } m_reported_progress;
155

23,318✔
156
    const util::UniqueFunction<ProgressHandler> m_progress_handler;
23,318✔
157
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
23,318✔
158

23,318✔
159
    const util::UniqueFunction<SyncClientHookAction(SyncClientHookData const&)> m_debug_hook;
160
    bool m_in_debug_hook = false;
161

162
    const SessionReason m_session_reason;
163

164
    const uint64_t m_schema_version;
165

166
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
167
    int64_t m_flx_active_version = 0;
168
    int64_t m_flx_last_seen_version = 0;
169
    int64_t m_flx_pending_mark_version = 0;
170
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
171

172
    std::shared_ptr<MigrationStore> m_migration_store;
173

174
    // Set to true when this session wrapper is actualized (i.e. the wrapped
175
    // session is created), or when the wrapper is finalized before actualization.
176
    // It is then never modified again.
177
    //
178
    // Actualization is scheduled during the construction of SessionWrapper, and
179
    // so a session specific post handler will always find that `m_actualized`
180
    // is true as the handler will always be run after the actualization job.
181
    // This holds even if the wrapper is finalized or closed before actualization.
182
    bool m_actualized = false;
183

184
    // Set to true when session deactivation is begun, either via force_close()
185
    // or finalize().
186
    bool m_closed = false;
187

188
    // Set to true in on_suspended() and then false in on_resumed(). Used to
189
    // suppress spurious connection state and error reporting while the session
190
    // is already in an error state.
191
    bool m_suspended = false;
192

193
    // Set when the session has been abandoned. After this point none of the
194
    // public API functions should be called again.
195
    bool m_abandoned = false;
196
    // Has the SessionWrapper been finalized?
197
    bool m_finalized = false;
198

199
    // Set to true when the first DOWNLOAD message is received to indicate that
200
    // the byte-level download progress parameters can be considered reasonably
201
    // reliable. Before that, a lot of time may have passed, so our record of
202
    // the download progress is likely completely out of date.
203
    bool m_reliable_download_progress = false;
204

205
    std::optional<double> m_download_estimate;
206
    std::optional<uint64_t> m_bootstrap_store_bytes;
207

208
    // Set to point to an activated session object during actualization of the
209
    // session wrapper. Set to null during finalization of the session
210
    // wrapper. Both modifications are guaranteed to be performed by the event
211
    // loop thread.
212
    //
213
    // If a session specific post handler, that is submitted after the
214
    // initiation of the session wrapper, sees that `m_sess` is null, it can
215
    // conclude that the session wrapper has either been force closed or has
216
    // been both abandoned and finalized.
217
    //
218
    // Must only be accessed from the event loop thread.
219
    SessionImpl* m_sess = nullptr;
220

221
    // These must only be accessed from the event loop thread.
222
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
223
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
224
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
225

226
    // `m_target_*load_mark` and `m_reached_*load_mark` are protected by
227
    // `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
228
    // event loop thread.
229
    std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
230
    std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
231
    std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
232

233
    void on_upload_progress(bool only_if_new_uploadable_data = false);
234
    void on_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes = {});
235
    void on_upload_completion();
236
    void on_download_completion();
237
    void on_suspended(const SessionErrorInfo& error_info);
238
    void on_resumed();
239
    void on_connection_state_changed(ConnectionState, const std::optional<SessionErrorInfo>&);
240
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
241
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
242
    void on_flx_sync_version_complete(int64_t version);
243

244
    void init_progress_handler();
245
    // only_if_new_uploadable_data can be true only if is_download is false
246
    void report_progress(bool is_download, bool only_if_new_uploadable_data = false);
247

248
    friend class SessionWrapperStack;
249
    friend class ClientImpl::Session;
250
};
251

252

253
// ################ SessionWrapperStack ################
254

255
inline bool SessionWrapperStack::empty() const noexcept
256
{
257
    return !m_back;
258
}
19,812✔
259

19,812✔
260

19,812✔
261
// Places `w` on top of the stack. The reference carried by the bind_ptr is
// released into the stack, which from then on owns it through the raw
// `m_back` / `m_next` chain. `w` must not already be linked into a stack.
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
{
    REALM_ASSERT(!w->m_next);
    SessionWrapper* const node = w.release();
    node->m_next = m_back;
    m_back = node;
}
20,136✔
267

20,136✔
268

20,136✔
269
// Detaches the top wrapper from the stack and hands its reference back to the
// caller. Yields a null bind_ptr when the stack is empty.
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
{
    SessionWrapper* const top = m_back;
    if (top != nullptr) {
        // Unlink the node and clear its link so it can be pushed again later.
        m_back = top->m_next;
        top->m_next = nullptr;
    }
    // Adopt (rather than bind) so the reference count is left untouched.
    return util::bind_ptr<SessionWrapper>{top, util::bind_ptr_base::adopt_tag{}};
}
20,056✔
278

59,546✔
279

59,546✔
280
inline void SessionWrapperStack::clear() noexcept
281
{
282
    while (m_back) {
283
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
19,812✔
284
        m_back = w->m_next;
19,812✔
285
    }
×
UNCOV
286
}
×
UNCOV
287

×
288

19,812✔
289
// Removes `w` from the stack if it is present and drops the reference the
// stack held on it.
//
// Returns true if `w` was found and unlinked, false if it was not in the stack
// (in which case nothing is modified).
inline bool SessionWrapperStack::erase(SessionWrapper* w) noexcept
{
    // `p` always points at the link (either `m_back` or some node's `m_next`)
    // that would have to be rewritten in order to unlink the current node.
    SessionWrapper** p = &m_back;
    while (*p && *p != w) {
        p = &(*p)->m_next;
    }
    if (!*p) {
        return false;
    }
    *p = w->m_next;
    // Clear the link just like pop() does, so that a wrapper which is no
    // longer in the stack never keeps a stale pointer into it. This has to
    // happen before the stack's reference is dropped below, since that may be
    // the last reference to `w`.
    w->m_next = nullptr;
    // Adopt the stack's reference into a temporary that releases it at once.
    util::bind_ptr<SessionWrapper>{w, util::bind_ptr_base::adopt_tag{}};
    return true;
}
68✔
302

68✔
303

10,100✔
304
// Releases the stack's reference to every wrapper it still holds by calling clear().
SessionWrapperStack::~SessionWrapperStack()
305
{
306
    clear();
307
}
19,812✔
308

19,812✔
309

19,812✔
310
// ################ ClientImpl ################
311

312
// Shuts the client down via shutdown_and_wait() and then asserts that it is
// stopped and that neither wrapper stack still holds a session wrapper.
ClientImpl::~ClientImpl()
313
{
314
    // Since no other thread is allowed to be accessing this client or any of
315
    // its subobjects at this time, no mutex locking is necessary.
9,918✔
316

317
    shutdown_and_wait();
318
    // Session wrappers are removed from m_unactualized_session_wrappers as they
319
    // are abandoned.
9,918✔
320
    REALM_ASSERT(m_stopped);
321
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
322
    REALM_ASSERT(m_abandoned_session_wrappers.empty());
9,918✔
323
}
9,918✔
324

9,918✔
325

9,918✔
326
void ClientImpl::cancel_reconnect_delay()
327
{
328
    // Thread safety required
329
    post([this] {
1,896✔
330
        for (auto& p : m_server_slots) {
331
            ServerSlot& slot = p.second;
1,896✔
332
            if (m_one_connection_per_session) {
1,896✔
333
                REALM_ASSERT(!slot.connection);
1,896✔
334
                for (const auto& p : slot.alt_connections) {
1,896✔
335
                    ClientImpl::Connection& conn = *p.second;
×
336
                    conn.resume_active_sessions(); // Throws
×
337
                    conn.cancel_reconnect_delay(); // Throws
×
338
                }
×
339
            }
×
UNCOV
340
            else {
×
UNCOV
341
                REALM_ASSERT(slot.alt_connections.empty());
×
342
                if (slot.connection) {
1,896✔
343
                    ClientImpl::Connection& conn = *slot.connection;
1,896✔
344
                    conn.resume_active_sessions(); // Throws
1,896✔
345
                    conn.cancel_reconnect_delay(); // Throws
1,892✔
346
                }
1,892✔
347
                else {
1,892✔
348
                    slot.reconnect_info.reset();
1,892✔
349
                }
4✔
350
            }
4✔
351
        }
4✔
352
    }); // Throws
1,896✔
353
}
1,896✔
354

1,896✔
355

1,896✔
356
void ClientImpl::voluntary_disconnect_all_connections()
357
{
358
    auto done_pf = util::make_promise_future<void>();
359
    post([this, promise = std::move(done_pf.promise)]() mutable {
12✔
360
        try {
12✔
361
            for (auto& p : m_server_slots) {
12✔
362
                ServerSlot& slot = p.second;
12✔
363
                if (m_one_connection_per_session) {
12✔
364
                    REALM_ASSERT(!slot.connection);
12✔
365
                    for (const auto& [_, conn] : slot.alt_connections) {
12✔
366
                        conn->voluntary_disconnect();
×
367
                    }
×
368
                }
×
UNCOV
369
                else {
×
UNCOV
370
                    REALM_ASSERT(slot.alt_connections.empty());
×
371
                    if (slot.connection) {
12✔
372
                        slot.connection->voluntary_disconnect();
12✔
373
                    }
12✔
374
                }
12✔
375
            }
12✔
376
        }
12✔
377
        catch (...) {
12✔
378
            promise.set_error(exception_to_status());
12✔
379
            return;
12✔
380
        }
×
UNCOV
381
        promise.emplace_value();
×
UNCOV
382
    });
×
383
    done_pf.future.get();
12✔
384
}
12✔
385

12✔
386

12✔
387
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
388
{
389
    // Thread safety required
390

9,512✔
391
    {
392
        util::CheckedLockGuard lock{m_mutex};
393
        m_sessions_terminated = false;
9,512✔
394
    }
9,512✔
395

9,512✔
396
    // The technique employed here relies on the fact that
9,512✔
397
    // actualize_and_finalize_session_wrappers() must get to execute at least
398
    // once before the post handler submitted below gets to execute, but still
399
    // at a time where all session wrappers, that are abandoned prior to the
400
    // execution of wait_for_session_terminations_or_client_stopped(), have been
401
    // added to `m_abandoned_session_wrappers`.
402
    //
403
    // To see that this is the case, consider a session wrapper that was
404
    // abandoned before wait_for_session_terminations_or_client_stopped() was
405
    // invoked. Then the session wrapper will have been added to
406
    // `m_abandoned_session_wrappers`, and an invocation of
407
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
408
    // guarantees mentioned in the documentation of Trigger then ensure
409
    // that at least one execution of actualize_and_finalize_session_wrappers()
410
    // will happen after the session wrapper has been added to
411
    // `m_abandoned_session_wrappers`, but before the post handler submitted
412
    // below gets to execute.
413
    post([this] {
414
        {
415
            util::CheckedLockGuard lock{m_mutex};
9,512✔
416
            m_sessions_terminated = true;
9,512✔
417
        }
9,512✔
418
        m_wait_or_client_stopped_cond.notify_all();
9,512✔
419
    }); // Throws
9,512✔
420

9,512✔
421
    bool completion_condition_was_satisfied;
9,512✔
422
    {
423
        util::CheckedUniqueLock lock{m_mutex};
9,512✔
424
        m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_mutex) {
9,512✔
425
            return m_sessions_terminated || m_stopped;
9,512✔
426
        });
19,006✔
427
        completion_condition_was_satisfied = !m_stopped;
19,006✔
428
    }
19,006✔
429
    return completion_condition_was_satisfied;
9,512✔
430
}
9,512✔
431

9,512✔
432

9,512✔
433
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
434
// Returns a future that is fulfilled once a job posted by this call has run.
// If the job is invoked with a non-OK status, the future carries that status
// as its error.
util::Future<void> ClientImpl::notify_session_terminated()
{
    auto pf = util::make_promise_future<void>();
    post([promise = std::move(pf.promise)](Status status) mutable {
        if (status.is_ok()) {
            promise.emplace_value();
        }
        else {
            // Includes operation_aborted
            promise.set_error(status);
        }
    });

    return std::move(pf.future);
}
449

56✔
450
// Posts a job that asserts it was invoked with an OK status and then calls
// drain_connections().
void ClientImpl::drain_connections_on_loop()
56✔
451
{
452
    post([this](Status status) {
453
        REALM_ASSERT(status.is_ok());
9,918✔
454
        drain_connections();
9,918✔
455
    });
9,906✔
456
}
9,906✔
457

9,906✔
458
void ClientImpl::shutdown_and_wait()
9,918✔
459
{
460
    shutdown();
461
    util::CheckedUniqueLock lock{m_drain_mutex};
10,686✔
462
    if (m_drained) {
10,686✔
463
        return;
10,686✔
464
    }
10,686✔
465

768✔
466
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
768✔
467
    m_drain_cv.wait(lock.native_handle(), [&]() REQUIRES(m_drain_mutex) {
468
        return m_num_connections == 0 && m_outstanding_posts == 0;
9,918✔
469
    });
15,824✔
470

15,824✔
471
    m_drained = true;
15,824✔
472
}
473

9,918✔
474
void ClientImpl::shutdown() noexcept
9,918✔
475
{
476
    {
477
        util::CheckedLockGuard lock{m_mutex};
20,686✔
478
        if (m_stopped)
20,686✔
479
            return;
20,686✔
480
        m_stopped = true;
20,686✔
481
    }
10,768✔
482
    m_wait_or_client_stopped_cond.notify_all();
9,918✔
483

9,918✔
UNCOV
484
    drain_connections_on_loop();
×
485
}
486

9,918✔
487

9,918✔
488
// Queues `wrapper` for actualization and fires the actualize-and-finalize
// trigger. If the client has already been stopped, the wrapper is finalized
// on the spot instead and nothing is queued.
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
{
    // Thread safety required.
    bool queued = false;
    {
        util::CheckedLockGuard lock{m_mutex};
        if (m_stopped) {
            // A stopped client cannot actualize the session, so finalize it
            // immediately.
            wrapper->finalize_before_actualization();
        }
        else {
            REALM_ASSERT(m_actualize_and_finalize);
            m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
            queued = true;
        }
    }
    if (queued) {
        // Fired only after the lock has been released.
        m_actualize_and_finalize->trigger();
    }
}
10,104✔
UNCOV
505

×
506

10,104✔
507
// Marks `wrapper` as abandoned and arranges for it to be finalized: directly
// on the calling thread if it is still waiting to be actualized, otherwise by
// queueing it and firing the actualize-and-finalize trigger.
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
{
    // Thread safety required.
    bool needs_trigger = false;
    {
        util::CheckedLockGuard lock{m_mutex};
        REALM_ASSERT(m_actualize_and_finalize);

        // mark_abandoned() returning true means there is nothing left to do;
        // the wrapper may already have been finalized if the client was
        // stopped at the time the wrapper was created.
        const bool nothing_left_to_do = wrapper->mark_abandoned();
        if (!nothing_left_to_do) {
            // A wrapper that is still in the unactualized queue can be
            // finalized right away. This ensures that we will generally not
            // actualize a session wrapper that has already been abandoned.
            if (m_unactualized_session_wrappers.erase(wrapper.get())) {
                wrapper->finalize_before_actualization();
            }
            else {
                m_abandoned_session_wrappers.push(std::move(wrapper));
                needs_trigger = true;
            }
        }
    }
    if (needs_trigger) {
        // Fired only after the lock has been released.
        m_actualize_and_finalize->trigger();
    }
}
10,032✔
UNCOV
530

×
531

10,032✔
532
// Must be called from the event loop thread.
533
void ClientImpl::actualize_and_finalize_session_wrappers()
534
{
535
    // We need to pop from the wrapper stacks while holding the lock to ensure
536
    // that all updates to `SessionWrapper:m_next` are thread-safe, but then
14,730✔
537
    // release the lock before finalizing or actualizing because those functions
538
    // invoke user callbacks which may try to access the client and reacquire
539
    // the lock.
540
    //
541
    // Finalization must always happen before actualization because we may be
542
    // finalizing and actualizing sessions for the same Realm file, and
543
    // actualizing first would result in overlapping sessions. Because we're
544
    // releasing the lock new sessions may come in as we're looping, so we need
545
    // a single loop that checks both fields.
546
    while (true) {
547
        bool finalize = true;
548
        bool stopped;
34,784✔
549
        util::bind_ptr<SessionWrapper> wrapper;
34,782✔
550
        {
34,782✔
551
            util::CheckedLockGuard lock{m_mutex};
34,782✔
552
            wrapper = m_abandoned_session_wrappers.pop();
34,782✔
553
            if (!wrapper) {
34,782✔
554
                wrapper = m_unactualized_session_wrappers.pop();
34,782✔
555
                finalize = false;
34,782✔
556
            }
24,764✔
557
            stopped = m_stopped;
24,764✔
558
        }
24,764✔
559
        if (!wrapper)
34,782✔
560
            break;
34,782✔
561
        if (finalize)
34,782✔
562
            wrapper->finalize(); // Throws
14,728✔
563
        else if (stopped)
20,054✔
564
            wrapper->finalize_before_actualization();
10,020✔
565
        else
10,034✔
566
            wrapper->actualize(); // Throws
4✔
567
    }
10,030✔
568
}
10,030✔
569

20,054✔
570

14,730✔
571
// Returns a connection to `endpoint`, creating the server slot and/or the
// connection when needed.
//
// An existing connection is reused only when connections are shared between
// sessions and no proxy is configured. `was_created` is set to true when a new
// connection is created and is left untouched when an existing one is reused.
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
                                                   const std::string& authorization_header_name,
                                                   const std::map<std::string, std::string>& custom_http_headers,
                                                   bool verify_servers_ssl_certificate,
                                                   Optional<std::string> ssl_trust_certificate_path,
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
{
    // Look up the slot for this endpoint, inserting a fresh one if absent.
    ServerSlot& server_slot =
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()))
            .first->second; // Throws

    // TODO: enable multiplexing with proxies
    const bool reuse_existing = server_slot.connection && !m_one_connection_per_session && !proxy_config;
    if (reuse_existing) {
        REALM_ASSERT(server_slot.alt_connections.empty());
        return *server_slot.connection;
    }

    // No reusable connection, so build a new one with the next identifier.
    REALM_ASSERT(!server_slot.connection);
    connection_ident_type ident = m_prev_connection_ident + 1;
    auto owned_conn = std::make_unique<ClientImpl::Connection>(
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
        std::move(proxy_config), server_slot.reconnect_info); // Throws
    ClientImpl::Connection& conn = *owned_conn;

    if (m_one_connection_per_session) {
        server_slot.alt_connections[ident] = std::move(owned_conn); // Throws
    }
    else {
        server_slot.connection = std::move(owned_conn);
    }

    // The identifier is committed only once the connection has been stored.
    m_prev_connection_ident = ident;
    was_created = true;
    {
        util::CheckedLockGuard lk(m_drain_mutex);
        ++m_num_connections;
    }
    return conn;
}
2,776✔
612

2,776✔
613

10,028✔
614
// Destroys `conn` by removing it from the server slot that owns it, and wakes
// the threads waiting on `m_drain_cv` once the connection count reaches zero.
// In shared-connection mode the connection's reconnect info is saved into the
// slot first. `conn` must not be used after this call.
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
{
    auto i = m_server_slots.find(conn.get_server_endpoint());
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
    ServerSlot& server_slot = i->second;

    if (m_one_connection_per_session) {
        REALM_ASSERT(!server_slot.connection);
        connection_ident_type ident = conn.get_ident();
        auto j = server_slot.alt_connections.find(ident);
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
        REALM_ASSERT(&*j->second == &conn);
        server_slot.alt_connections.erase(j);
    }
    else {
        REALM_ASSERT(server_slot.alt_connections.empty());
        REALM_ASSERT(&*server_slot.connection == &conn);
        // Save the reconnect state in the slot before the connection goes away.
        server_slot.reconnect_info = conn.get_reconnect_info();
        server_slot.connection.reset();
    }

    bool last_connection_gone;
    {
        util::CheckedLockGuard lk(m_drain_mutex);
        REALM_ASSERT(m_num_connections);
        last_connection_gone = --m_num_connections <= 0;
    }
    if (last_connection_gone) {
        // Notify outside the lock.
        m_drain_cv.notify_all();
    }
}
2,152✔
645

2,152✔
646

2,764✔
647
// ################ SessionImpl ################
648

649
void SessionImpl::force_close()
650
{
651
    // Allow force_close() if session is active or hasn't been activated yet.
652
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
102✔
653
        m_wrapper.force_close();
654
    }
102!
655
}
102✔
656

102✔
657
void SessionImpl::on_connection_state_changed(ConnectionState state,
102✔
658
                                              const std::optional<SessionErrorInfo>& error_info)
659
{
660
    // Only used to report errors back to the SyncSession while the Session is active
661
    if (m_state == SessionImpl::Active) {
11,840✔
662
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
663
    }
11,840✔
664
}
11,840✔
665

11,840✔
666

11,840✔
667
// Returns a reference to the wrapper's `m_virt_path`. Asserts that the session
// is in the Active or Unactivated state.
const std::string& SessionImpl::get_virt_path() const noexcept
668
{
669
    // Can only be called if the session is active or being activated
670
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
7,356✔
671
    return m_wrapper.m_virt_path;
672
}
7,356✔
673

7,356✔
674
// Returns the path reported by the wrapper's DB (`m_db->get_path()`). Asserts
// that the session is in the Active or Unactivated state.
const std::string& SessionImpl::get_realm_path() const noexcept
7,356✔
675
{
676
    // Can only be called if the session is active or being activated
677
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
10,324✔
678
    return m_wrapper.m_db->get_path();
679
}
10,324✔
680

10,324✔
681
// Returns a copy of the wrapper's `m_db` reference. Asserts that the session
// is in the Active or Unactivated state.
DBRef SessionImpl::get_db() const noexcept
10,324✔
682
{
683
    // Can only be called if the session is active or being activated
684
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
26,126✔
685
    return m_wrapper.m_db;
686
}
26,126✔
687

26,126✔
688
// Returns the replication object obtained from the wrapper's
// get_replication(). Asserts that the session is in the Active or Unactivated
// state.
ClientReplication& SessionImpl::get_repl() const noexcept
26,126✔
689
{
690
    // Can only be called if the session is active or being activated
691
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
119,536✔
692
    return m_wrapper.get_replication();
693
}
119,536✔
694

119,536✔
695
// Returns the history object of this session's replication. The state check
// is inherited from get_repl().
ClientHistory& SessionImpl::get_history() const noexcept
{
    ClientReplication& repl = get_repl();
    return repl.get_history();
}
117,592✔
699

117,592✔
700
// Returns a mutable reference to the wrapper's optional client reset config
// (`m_client_reset_config`). Asserts that the session is in the Active or
// Unactivated state.
std::optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
117,592✔
701
{
702
    // Can only be called if the session is active or being activated
703
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
17,360✔
704
    return m_wrapper.m_client_reset_config;
705
}
17,360✔
706

17,360✔
707
// Returns the wrapper's `m_session_reason`. Asserts that the session is in the
// Active or Unactivated state.
SessionReason SessionImpl::get_session_reason() noexcept
17,360✔
708
{
709
    // Can only be called if the session is active or being activated
710
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,468✔
711
    return m_wrapper.m_session_reason;
712
}
1,468!
713

1,468✔
714
// Returns the wrapper's `m_schema_version`. Asserts that the session is in the
// Active or Unactivated state.
uint64_t SessionImpl::get_schema_version() noexcept
1,468✔
715
{
716
    // Can only be called if the session is active or being activated
717
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,468✔
718
    return m_wrapper.m_schema_version;
719
}
1,468✔
720

1,468✔
721
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
1,468✔
722
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
723
{
724
    // Ignore the call if the session is not active
725
    if (m_state != State::Active) {
45,324✔
726
        return;
727
    }
45,324✔
UNCOV
728

×
UNCOV
729
    try {
×
730
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
731
        if (simulate_integration_error) {
45,324✔
732
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
45,324!
733
        }
45,324✔
UNCOV
734
        version_type client_version;
×
UNCOV
735
        if (REALM_LIKELY(!get_client().is_dry_run())) {
×
736
            VersionInfo version_info;
45,324✔
737
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
45,324✔
738
            client_version = version_info.realm_version;
45,324✔
739
        }
45,324✔
740
        else {
45,324✔
741
            // Fake it for "dry run" mode
45,324✔
UNCOV
742
            client_version = m_last_version_available + 1;
×
743
        }
UNCOV
744
        on_changesets_integrated(client_version, progress, !changesets.empty()); // Throws
×
UNCOV
745
    }
×
746
    catch (const IntegrationException& e) {
45,324✔
747
        on_integration_failure(e);
45,324✔
748
    }
45,324✔
749
}
24✔
750

24✔
751

45,324✔
752
void SessionImpl::on_upload_completion()
753
{
754
    // Ignore the call if the session is not active
755
    if (m_state == State::Active) {
14,798✔
756
        m_wrapper.on_upload_completion(); // Throws
757
    }
14,798✔
758
}
14,798✔
759

14,798✔
760

14,798✔
761
void SessionImpl::on_download_completion()
762
{
763
    // Ignore the call if the session is not active
764
    if (m_state == State::Active) {
16,110✔
765
        m_wrapper.on_download_completion(); // Throws
766
    }
16,110✔
767
}
16,110✔
768

16,110✔
769

16,110✔
770
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
771
{
772
    // Ignore the call if the session is not active
773
    if (m_state == State::Active) {
668✔
774
        m_wrapper.on_suspended(error_info); // Throws
775
    }
668✔
776
}
668✔
777

668✔
778

668✔
779
void SessionImpl::on_resumed()
780
{
781
    // Ignore the call if the session is not active
782
    if (m_state == State::Active) {
62✔
783
        m_wrapper.on_resumed(); // Throws
784
    }
62✔
785
}
62✔
786

62✔
787
void SessionImpl::handle_pending_client_reset_acknowledgement()
62✔
788
{
789
    // Ignore the call if the session is not active
790
    if (m_state == State::Active) {
10,324✔
791
        m_wrapper.handle_pending_client_reset_acknowledgement();
792
    }
10,324✔
793
}
10,324✔
794

10,324✔
795
void SessionImpl::update_subscription_version_info()
10,324✔
796
{
797
    // Ignore the call if the session is not active
798
    if (m_state == State::Active) {
296✔
799
        m_wrapper.update_subscription_version_info();
800
    }
296✔
801
}
296✔
802

296✔
803
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
296✔
804
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
805
{
806
    // Ignore the call if the session is not active
807
    if (m_state != State::Active) {
47,436✔
808
        return false;
809
    }
47,436✔
UNCOV
810

×
UNCOV
811
    if (is_steady_state_download_message(batch_state, query_version)) {
×
812
        return false;
813
    }
47,436✔
814

45,324✔
815
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
45,324✔
816
    std::optional<SyncProgress> maybe_progress;
817
    if (batch_state == DownloadBatchState::LastInBatch) {
2,112✔
818
        maybe_progress = progress;
2,112✔
819
    }
2,112✔
820

1,932✔
821
    bool new_batch = false;
1,932✔
822
    try {
823
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
2,112✔
824
    }
2,112✔
825
    catch (const LogicError& ex) {
2,112✔
826
        if (ex.code() == ErrorCodes::LimitExceeded) {
2,112✔
827
            IntegrationException ex(ErrorCodes::LimitExceeded,
2,112✔
828
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
829
                                    ProtocolError::bad_changeset_size);
×
830
            on_integration_failure(ex);
×
831
            return true;
×
832
        }
×
833
        throw;
×
834
    }
×
UNCOV
835

×
UNCOV
836
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
×
837
    // bootstrapping.
838
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
839
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
840
    }
2,116✔
841

48✔
842
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
48✔
843
                                       batch_state, received_changesets.size());
844
    if (hook_action == SyncClientHookAction::EarlyReturn) {
2,116✔
845
        return true;
2,116✔
846
    }
2,116✔
847
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
12✔
848

12✔
849
    if (batch_state == DownloadBatchState::MoreToCome) {
2,104✔
850
        notify_download_progress(bootstrap_store->pending_stats().pending_changeset_bytes);
851
        return true;
2,104✔
852
    }
180✔
853
    else {
180✔
854
        // FIXME (#7451) this variable is not needed in principle, and bootstrap store bytes could be passed just
180✔
855
        // through notify_download_progress, but since it is needed in report_progress, and it is also called on
856
        // upload progress for now until progress is reported separately. As soon as we understand here that there
1,924✔
857
        // are no more changesets for bootstrap store, and we want to process bootstrap, we don't need to notify
1,924✔
858
        // intermediate progress - so reset these bytes to not accidentally double report them.
1,924✔
859
        m_wrapper.m_bootstrap_store_bytes.reset();
1,924✔
860
    }
12✔
861

12✔
862
    try {
1,924✔
UNCOV
863
        process_pending_flx_bootstrap();
×
UNCOV
864
    }
×
865
    catch (const IntegrationException& e) {
866
        on_integration_failure(e);
1,924✔
867
    }
1,924✔
868
    catch (...) {
869
        on_integration_failure(IntegrationException(exception_to_status()));
870
    }
871

11,952✔
872
    return true;
873
}
11,952✔
874

8,534✔
875

8,534✔
876
void SessionImpl::process_pending_flx_bootstrap()
877
{
3,418✔
878
    // Ignore the call if not a flx session or session is not active
3,418✔
879
    if (!m_is_flx_sync_session || m_state != State::Active) {
3,418✔
880
        return;
1,474✔
881
    }
1,474✔
882
    // Should never be called if session is not active
883
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
1,944✔
884
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
1,944✔
885
    if (!bootstrap_store->has_pending()) {
1,944✔
886
        return;
1,944✔
887
    }
1,944✔
888

1,944✔
889
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,944✔
890
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,944✔
891
                "changeset size: %3)",
1,944✔
892
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,944✔
893
                pending_batch_stats.pending_changeset_bytes);
894
    auto& history = get_repl().get_history();
895
    VersionInfo new_version;
1,944✔
896
    SyncProgress progress;
4,028✔
897
    int64_t query_version = -1;
2,096✔
898
    size_t changesets_processed = 0;
2,096✔
899

2,096✔
900
    // Used to commit each batch after it was transformed.
8✔
901
    TransactionRef transact = get_db()->start_write();
902
    while (bootstrap_store->has_pending()) {
903
        auto start_time = std::chrono::steady_clock::now();
8✔
904
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
8✔
905
        if (!pending_batch.progress) {
8✔
906
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
907
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
908
            // bootstrap store requires a write transaction itself.
2,088✔
909
            transact->close();
2,088✔
910
            bootstrap_store->clear();
2,088✔
911
            return;
2,088✔
912
        }
2,088✔
913

2,088✔
914
        auto batch_state =
2,088✔
915
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
4✔
916
        uint64_t downloadable_bytes = 0;
4✔
917
        query_version = pending_batch.query_version;
918
        bool simulate_integration_error =
2,084✔
919
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
2,084✔
920
        if (simulate_integration_error) {
921
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
2,084✔
922
        }
2,084✔
923

2,084✔
924
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
2,072✔
925
                        batch_state, pending_batch.changesets.size());
2,072✔
926

2,072✔
927
        history.integrate_server_changesets(
2,084✔
928
            *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
2,084✔
929
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
2,084✔
930
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
931
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
2,084✔
932
            });
2,084✔
933
        progress = *pending_batch.progress;
2,084✔
934
        changesets_processed += pending_batch.changesets.size();
935
        auto duration = std::chrono::steady_clock::now() - start_time;
2,084✔
936

2,084✔
937
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
2,084✔
938
                                      batch_state, pending_batch.changesets.size());
2,084✔
939
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
2,084✔
940

2,084✔
941
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
942
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
1,932✔
943
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
1,932✔
944
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
945
                    pending_batch.remaining_changesets);
1,932✔
946
    }
1,932✔
947

1,932✔
948
    REALM_ASSERT_3(query_version, !=, -1);
949
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
1,932✔
950

1,932✔
951
    on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0);
952
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
953
                                  DownloadBatchState::LastInBatch, changesets_processed);
20✔
954
    // NoAction/EarlyReturn are both valid no-op actions to take here.
955
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
20✔
956
}
20✔
957

20✔
958
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
20✔
959
{
960
    // Ignore the call if the session is not active
961
    if (m_state == State::Active) {
1,968✔
962
        m_wrapper.on_flx_sync_error(version, err_msg);
963
    }
1,968✔
964
}
1,968✔
965

1,968✔
966
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
1,968✔
967
{
968
    // Ignore the call if the session is not active
969
    if (m_state == State::Active) {
18,144✔
970
        m_wrapper.on_flx_sync_progress(version, batch_state);
971
    }
18,144✔
972
}
18,144✔
973

18,144✔
974
// Returns the subscription store pointer provided by the session wrapper
// (m_wrapper.get_flx_subscription_store()). Asserts that the session is Active.
SubscriptionStore* SessionImpl::get_flx_subscription_store()
975
{
976
    // Should never be called if session is not active
67,344✔
977
    REALM_ASSERT_EX(m_state == State::Active, m_state);
978
    return m_wrapper.get_flx_subscription_store();
67,344✔
979
}
67,344✔
980

67,344✔
981
// Returns the migration store pointer provided by the session wrapper
// (m_wrapper.get_migration_store()). Asserts that the session is Active.
MigrationStore* SessionImpl::get_migration_store()
982
{
983
    // Should never be called if session is not active
300✔
984
    REALM_ASSERT_EX(m_state == State::Active, m_state);
985
    return m_wrapper.get_migration_store();
300✔
986
}
300✔
987

300✔
988
void SessionImpl::on_flx_sync_version_complete(int64_t version)
300✔
989
{
990
    // Ignore the call if the session is not active
991
    if (m_state == State::Active) {
2,500✔
992
        m_wrapper.on_flx_sync_version_complete(version);
993
    }
2,500✔
994
}
995

996
// Invokes the debug hook stored on the session wrapper with `data` and acts on
// the action it returns:
//   - SuspendWithRetryableError: feeds a non-fatal "hook requested error" with
//     a Transient server action into receive_error_message(), then reports
//     EarlyReturn.
//   - TriggerReconnect: voluntarily disconnects the connection, then reports
//     EarlyReturn.
//   - anything else is handed back to the caller unchanged.
// A nested invocation (the hook being entered while it is already running)
// yields NoAction without calling the hook again.
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
{
    // Should never be called if session is not active
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    // Make sure we don't call the debug hook recursively.
    if (m_wrapper.m_in_debug_hook) {
        return SyncClientHookAction::NoAction;
    }
    m_wrapper.m_in_debug_hook = true;
    // Clears the re-entrancy flag again on every exit path of this function.
    auto clear_in_hook_flag = util::make_scope_exit([&]() noexcept {
        m_wrapper.m_in_debug_hook = false;
    });

    auto action = m_wrapper.m_debug_hook(data);

    if (action == realm::SyncClientHookAction::SuspendWithRetryableError) {
        SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
        err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;

        auto err_processing_err = receive_error_message(err_info);
        REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
        return SyncClientHookAction::EarlyReturn;
    }

    if (action == realm::SyncClientHookAction::TriggerReconnect) {
        get_connection().voluntary_disconnect();
        return SyncClientHookAction::EarlyReturn;
    }

    return action;
}
120,120✔
1028

117,816✔
1029
// Packs the arguments into a SyncClientHookData and dispatches it to the
// debug hook. Yields NoAction without building anything when no hook is
// installed or the session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
                                                  int64_t query_version, DownloadBatchState batch_state,
                                                  size_t num_changesets)
{
    // No hook installed, or session not active: nothing to call.
    if (!m_wrapper.m_debug_hook || m_state != State::Active) {
        return SyncClientHookAction::NoAction;
    }

    SyncClientHookData hook_data;
    hook_data.query_version = query_version;
    hook_data.num_changesets = num_changesets;
    hook_data.progress = progress;
    hook_data.batch_state = batch_state;
    hook_data.event = event;

    return call_debug_hook(hook_data);
}
1,178✔
1049

200✔
UNCOV
1050
// Dispatches an error-related event to the debug hook. The hook data carries
// the session's current progress, a SteadyState batch state, zero changesets,
// query version 0 and a pointer to `error_info`. Yields NoAction when no hook
// is installed or the session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
{
    // No hook installed, or session not active: nothing to call.
    if (!m_wrapper.m_debug_hook || m_state != State::Active) {
        return SyncClientHookAction::NoAction;
    }

    SyncClientHookData hook_data;
    hook_data.error_info = &error_info;
    hook_data.query_version = 0;
    hook_data.num_changesets = 0;
    hook_data.progress = m_progress;
    hook_data.batch_state = DownloadBatchState::SteadyState;
    hook_data.event = event;

    return call_debug_hook(hook_data);
}
1069

1070
// Dispatches an event that carries no payload of its own: the session's
// current progress and last sent FLX query version are supplied, with a
// SteadyState batch state and zero changesets.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event)
{
    const DownloadBatchState batch_state = DownloadBatchState::SteadyState;
    const size_t num_changesets = 0;
    return call_debug_hook(event, m_progress, m_last_sent_flx_query_version, batch_state, num_changesets);
}
94,896✔
1074

45,320✔
1075
// Classifies a DOWNLOAD message. It counts as steady state when any of the
// following holds (checked in this order):
//   1. the batch state is SteadyState,
//   2. this is not a FLX sync session,
//   3. it is the last message of a batch for the currently active FLX query
//      version.
// Everything else needs bootstrap handling and yields false.
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
{
    // Should never be called if session is not active
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    return batch_state == DownloadBatchState::SteadyState || !m_is_flx_sync_session ||
           (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version);
}
296✔
1094

1095
void SessionImpl::init_progress_handler()
1096
{
45,846✔
1097
    if (m_state != State::Unactivated && m_state != State::Active)
45,846✔
1098
        return;
45,846✔
1099

1100
    m_wrapper.init_progress_handler();
1101
}
47,400✔
1102

47,400✔
UNCOV
1103
// Sets the wrapper's m_reliable_download_progress flag to true. Unlike the
// neighbouring notification functions, no session-state check is made here.
void SessionImpl::enable_progress_notifications()
×
1104
{
1105
    m_wrapper.m_reliable_download_progress = true;
47,400✔
1106
}
47,400✔
1107

1108
void SessionImpl::notify_upload_progress()
1109
{
60✔
1110
    if (m_state != State::Active)
60✔
1111
        return;
×
UNCOV
1112

×
1113
    m_wrapper.on_upload_progress();
1114
}
60✔
1115

60✔
1116
void SessionImpl::update_download_estimate(double download_estimate)
60✔
1117
{
4✔
1118
    if (m_state != State::Active)
4✔
1119
        return;
4✔
1120

56✔
UNCOV
1121
    m_wrapper.m_download_estimate = download_estimate;
×
UNCOV
1122
}
×
1123

56✔
1124
void SessionImpl::notify_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes)
60✔
1125
{
4✔
1126
    if (m_state != State::Active)
4✔
1127
        return;
1128

52✔
1129
    m_wrapper.on_download_progress(bootstrap_store_bytes); // Throws
52✔
1130
}
1131

52✔
UNCOV
1132
// Validates a JSON test command and queues it for sending.
//
// The returned future is completed with an error Status right away when:
//   - the session is not active (RuntimeError),
//   - `body` is not valid JSON (LogicError),
//   - there is no string-valued "command" field (LogicError),
//   - there is more than one field but no "args" field (LogicError).
// Otherwise the command is queued through get_client().post(). If the posted
// callback receives a non-OK status, that status is set as the error on the
// promise.
util::Future<std::string> SessionImpl::send_test_command(std::string body)
{
    if (m_state != State::Active) {
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
    }

    try {
        auto parsed = nlohmann::json::parse(body.begin(), body.end());

        auto command_field = parsed.find("command");
        const bool has_command_name = command_field != parsed.end() && command_field->is_string();
        if (!has_command_name) {
            return Status{ErrorCodes::LogicError,
                          "Must supply command name in \"command\" field of test command json object"};
        }

        if (parsed.size() > 1 && parsed.find("args") == parsed.end()) {
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
        }
    }
    catch (const nlohmann::json::parse_error& parse_failure) {
        return Status{ErrorCodes::LogicError,
                      util::format("Invalid json input to send_test_command: %1", parse_failure.what())};
    }

    auto promise_and_future = util::make_promise_future<std::string>();
    get_client().post([this, result_promise = std::move(promise_and_future.promise),
                       command_body = std::move(body)](Status status) mutable {
        if (status.is_ok()) {
            auto ident = ++m_last_pending_test_command_ident;
            m_pending_test_commands.push_back(
                PendingTestCommand{ident, std::move(command_body), std::move(result_promise)});
            ensure_enlisted_to_send();
            return;
        }

        // Includes operation_aborted
        result_promise.set_error(status);
    });

    return std::move(promise_and_future.future);
}
4,880✔
1167

4,880✔
1168
// ################ SessionWrapper ################
4,880✔
1169

4,880✔
1170
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
4,880✔
1171
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
4,880✔
1172
// the ClientImpl::Connection that owns the ClientImpl::Session.
4,880✔
1173
// Constructs a wrapper around `db` for use with `client`, copying or moving
// the relevant settings out of `config`.
//
// - The sync mode is FLX when a subscription store is supplied, PBS otherwise.
// - The session reason is ClientReset when a client reset config is present,
//   otherwise the reason given in `config`.
// - The body asserts that `db` is non-null and that its replication is a
//   ClientReplication, takes one reference on itself (bind_ptr()), and
//   registers itself with the client as an unactualized session wrapper.
//
// NOTE(review): m_replication is initialized from m_db->get_replication() in
// the initializer list, i.e. before REALM_ASSERT(m_db) in the body runs, so a
// null `db` would be dereferenced before the assertion can report it.
// NOTE(review): m_session_reason reads m_client_reset_config, and m_sync_mode
// reads flx_sub_store before it is moved into m_flx_subscription_store; both
// rely on the member declaration order in the class definition, which is not
// visible here - confirm against the header.
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
4,880✔
1174
                               std::shared_ptr<MigrationStore> migration_store, Session::Config&& config)
4,880✔
1175
    : m_client{client}
4,880✔
1176
    , m_db(std::move(db))
4,880✔
1177
    , m_replication(m_db->get_replication())
4,880✔
1178
    , m_protocol_envelope{config.protocol_envelope}
4,880✔
1179
    , m_server_address{std::move(config.server_address)}
10,104✔
1180
    , m_server_port{config.server_port}
10,104✔
1181
    , m_server_verified{config.server_verified}
10,104✔
1182
    , m_user_id(std::move(config.user_id))
10,104✔
1183
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
1184
    , m_authorization_header_name{config.authorization_header_name}
1185
    , m_custom_http_headers{std::move(config.custom_http_headers)}
1186
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
1187
    , m_simulate_integration_error{config.simulate_integration_error}
10,104✔
1188
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
10,104✔
1189
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
10,104✔
1190
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
1191
    , m_http_request_path_prefix{std::move(config.service_identifier)}
1192
    , m_virt_path{std::move(config.realm_identifier)}
10,092✔
1193
    , m_proxy_config{std::move(config.proxy_config)}
1194
    , m_signed_access_token{std::move(config.signed_user_token)}
1195
    , m_client_reset_config{std::move(config.client_reset_config)}
1196
    , m_progress_handler(std::move(config.progress_handler))
10,092✔
1197
    , m_connection_state_change_listener(std::move(config.connection_state_change_listener))
10,092✔
1198
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
10,092✔
1199
    , m_session_reason(m_client_reset_config ? SessionReason::ClientReset : config.session_reason)
10,092✔
1200
    , m_schema_version(config.schema_version)
10,092✔
1201
    , m_flx_subscription_store(std::move(flx_sub_store))
10,092✔
1202
    , m_migration_store(std::move(migration_store))
1203
{
1204
    REALM_ASSERT(m_db);
1205
    REALM_ASSERT(m_db->get_replication());
119,532✔
1206
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
119,532✔
1207

119,532✔
1208
    // SessionWrapper begins at +1 retain count because Client retains and
119,532✔
1209
    // releases it while performing async operations, and these need to not
1210
    // take it to 0 or it could be deleted before the caller can retain it.
1211
    bind_ptr();
UNCOV
1212
    m_client.register_unactualized_session_wrapper(this);
×
UNCOV
1213
}
×
UNCOV
1214

×
1215
// Destructor. Performs no cleanup of its own; it only asserts the expected
// end-of-life state: the wrapper has been actualized, abandoned, finalized and
// closed, and its DB reference has already been released (m_db is null).
SessionWrapper::~SessionWrapper() noexcept
1216
{
1217
    // We begin actualization in the constructor and do not delete the wrapper
1,968✔
1218
    // until both the Client is done with it and the Session has abandoned it,
1,968✔
1219
    // so at this point we must have actualized, finalized, and been abandoned.
1,968✔
1220
    REALM_ASSERT(m_actualized);
1221
    REALM_ASSERT(m_abandoned);
1222
    REALM_ASSERT(m_finalized);
20✔
1223
    REALM_ASSERT(m_closed);
20✔
1224
    REALM_ASSERT(!m_db);
20✔
1225
}
20✔
1226

1227

1228
// Returns the replication object cached in m_replication, downcast to
// ClientReplication. The cast is unchecked here; the constructor asserts via
// dynamic_cast that the DB's replication has that type. Asserts that m_db is
// still set.
inline ClientReplication& SessionWrapper::get_replication() noexcept
2,220✔
1229
{
2,220✔
1230
    REALM_ASSERT(m_db);
2,220✔
1231
    return static_cast<ClientReplication&>(*m_replication);
2,220✔
1232
}
2,220✔
1233

1234

1235
// Returns the ClientImpl this wrapper was constructed with.
inline ClientImpl& SessionWrapper::get_client() noexcept
1,968✔
1236
{
1,968✔
1237
    return m_client;
×
1238
}
×
1239

1,968✔
1240
bool SessionWrapper::has_flx_subscription_store() const
1,968✔
1241
{
1,968✔
1242
    return static_cast<bool>(m_flx_subscription_store);
1,968✔
1243
}
1244

1,968✔
1245
// Moves the subscription set with the given version into the Error state,
// recording `err_msg`, through the subscription store. Asserts that the
// wrapper has not been finalized. The store pointer is dereferenced without a
// null check here.
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1246
{
1,968✔
UNCOV
1247
    REALM_ASSERT(!m_finalized);
✔
1248
    get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
1249
}
1250

1,920✔
1251
// Records `version` as both the last seen and the active FLX query version.
// Asserts that the wrapper has not been finalized.
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1,920✔
UNCOV
1252
{
×
UNCOV
1253
    REALM_ASSERT(!m_finalized);
×
1254
    m_flx_last_seen_version = version;
1,920✔
1255
    m_flx_active_version = version;
1,920✔
1256
}
912✔
1257

912✔
1258
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1,008✔
1259
{
1,008✔
1260
    if (!has_flx_subscription_store()) {
1,008✔
1261
        return;
1,008✔
1262
    }
1,920✔
1263
    REALM_ASSERT(!m_finalized);
48✔
1264
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
48✔
UNCOV
1265
    REALM_ASSERT(new_version >= m_flx_active_version);
×
UNCOV
1266
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
×
1267

1268
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
48✔
1269

48✔
1270
    switch (batch_state) {
48✔
1271
        case DownloadBatchState::SteadyState:
1,968✔
1272
            // Cannot be called with this value.
1273
            REALM_UNREACHABLE();
1,968✔
1274
        case DownloadBatchState::LastInBatch:
1,968✔
1275
            if (m_flx_active_version == new_version) {
1276
                return;
1277
            }
20,132✔
1278
            on_flx_sync_version_complete(new_version);
20,132✔
1279
            if (new_version == 0) {
20,132✔
1280
                new_state = SubscriptionSet::State::Complete;
20,132✔
1281
            }
1282
            else {
1283
                new_state = SubscriptionSet::State::AwaitingMark;
5,534✔
1284
                m_flx_pending_mark_version = new_version;
5,534✔
1285
            }
5,534✔
1286
            break;
5,534✔
1287
        case DownloadBatchState::MoreToCome:
1288
            if (m_flx_last_seen_version == new_version) {
1289
                return;
67,342✔
1290
            }
67,342✔
1291

67,342✔
1292
            m_flx_last_seen_version = new_version;
67,342✔
1293
            new_state = SubscriptionSet::State::Bootstrapping;
1294
            break;
1295
    }
10,104✔
1296

10,104✔
1297
    get_flx_subscription_store()->update_state(new_version, new_state);
10,104✔
1298
}
10,104✔
1299

10,104✔
1300
// Returns a raw, non-owning pointer to the FLX subscription store held by
// this wrapper (whatever m_flx_subscription_store.get() yields). Asserts that
// the wrapper has not been finalized.
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
1301
{
1302
    REALM_ASSERT(!m_finalized);
1303
    return m_flx_subscription_store.get();
113,610✔
1304
}
1305

113,610✔
1306
// Returns a raw, non-owning pointer to the pending bootstrap store held by
// this wrapper (whatever m_flx_pending_bootstrap_store.get() yields). Asserts
// that the wrapper has not been finalized.
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
113,610✔
1307
{
113,610✔
1308
    REALM_ASSERT(!m_finalized);
880✔
1309
    return m_flx_pending_bootstrap_store.get();
112,730✔
1310
}
112,730✔
1311

112,730✔
1312
// Returns a raw, non-owning pointer to the migration store held by this
// wrapper (whatever m_migration_store.get() yields). Asserts that the wrapper
// has not been finalized.
MigrationStore* SessionWrapper::get_migration_store()
1313
{
113,610✔
1314
    REALM_ASSERT(!m_finalized);
1315
    return m_migration_store.get();
1316
}
1317

28✔
1318
// Flags the wrapper as abandoned and returns whether it has already been
// finalized. Asserts that it was not abandoned before, so it may be called at
// most once per wrapper.
inline bool SessionWrapper::mark_abandoned()
1319
{
1320
    REALM_ASSERT(!m_abandoned);
28✔
1321
    m_abandoned = true;
28✔
1322
    return m_finalized;
28✔
UNCOV
1323
}
×
UNCOV
1324

×
1325

1326
// Informs the session that a new version has been committed. The work is
// posted to the client; the posted task keeps this wrapper alive through a
// bind_ptr and does nothing if the session is already gone.
void SessionWrapper::on_commit(version_type new_version)
{
    // Thread safety required
    m_client.post([self = util::bind_ptr{this}, new_version] {
        REALM_ASSERT(self->m_actualized);
        // A missing session means the wrapper was already finalized.
        if (self->m_sess) {
            self->m_sess->recognize_sync_version(new_version);                  // Throws
            self->on_upload_progress(/* only_if_new_uploadable_data = */ true); // Throws
        }
    });
}
5,182✔
1338

5,182✔
1339

1340
void SessionWrapper::cancel_reconnect_delay()
5,182✔
1341
{
5,182✔
1342
    // Thread safety required
5,182✔
1343

5,182✔
1344
    m_client.post([self = util::bind_ptr{this}] {
1345
        REALM_ASSERT(self->m_actualized);
106✔
1346
        if (REALM_UNLIKELY(self->m_closed)) {
106✔
1347
            return;
106✔
1348
        }
5,076✔
1349

2,600✔
1350
        if (REALM_UNLIKELY(!self->m_sess))
1351
            return; // Already finalized
316✔
1352
        SessionImpl& sess = *self->m_sess;
316✔
1353
        sess.cancel_resumption_delay(); // Throws
2,284✔
1354
        ClientImpl::Connection& conn = sess.get_connection();
1355
        conn.cancel_reconnect_delay(); // Throws
2,284✔
1356
    });                                // Throws
2,284✔
1357
}
2,600✔
1358

2,476✔
1359
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1360
                                    WaitOperCompletionHandler handler)
2,476✔
1361
{
2,476✔
1362
    REALM_ASSERT(upload_completion || download_completion);
5,076✔
1363

5,076✔
1364
    m_client.post([self = util::bind_ptr{this}, handler = std::move(handler), upload_completion,
2,600✔
1365
                   download_completion]() mutable {
5,076✔
1366
        REALM_ASSERT(self->m_actualized);
2,792✔
1367
        if (REALM_UNLIKELY(!self->m_sess)) {
5,076✔
1368
            // Already finalized
5,182✔
1369
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
1370
            return;
1371
        }
1372
        if (upload_completion) {
12,900✔
1373
            if (download_completion) {
1374
                // Wait for upload and download completion
12,900✔
1375
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
1376
            }
12,900✔
1377
            else {
12,900✔
1378
                // Wait for upload completion only
12,900✔
1379
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
12,900✔
1380
            }
12,900✔
1381
        }
1382
        else {
12,900✔
1383
            // Wait for download completion only
12,900✔
1384
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
1385
        }
1386
        SessionImpl& sess = *self->m_sess;
1387
        if (upload_completion)
1388
            sess.request_upload_completion_notification(); // Throws
12,900✔
1389
        if (download_completion)
20✔
1390
            sess.request_download_completion_notification(); // Throws
12,880✔
1391
    });                                                      // Throws
12,880✔
1392
}
12,880✔
1393

12,880✔
1394

12,880✔
1395
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
12,880✔
1396
{
1397
    // Thread safety required
12,900✔
1398
    REALM_ASSERT(!m_abandoned);
12,900✔
1399

12,900✔
1400
    std::int_fast64_t target_mark;
32,724✔
1401
    {
32,724✔
1402
        util::CheckedLockGuard lock{m_client.m_mutex};
32,724✔
1403
        target_mark = ++m_target_upload_mark;
12,900✔
1404
    }
12,900✔
1405

12,900✔
1406
    m_client.post([self = util::bind_ptr{this}, target_mark] {
12,900✔
1407
        REALM_ASSERT(self->m_actualized);
1408
        // The session wrapper may already have been finalized. This can only
1409
        // happen if it was abandoned, but in that case, the call of
1410
        // wait_for_upload_complete_or_client_stopped() must have returned
9,988✔
1411
        // already.
1412
        if (REALM_UNLIKELY(!self->m_sess))
9,988✔
1413
            return;
1414
        if (target_mark > self->m_staged_upload_mark) {
9,988✔
1415
            self->m_staged_upload_mark = target_mark;
9,988✔
1416
            SessionImpl& sess = *self->m_sess;
9,988✔
1417
            sess.request_upload_completion_notification(); // Throws
9,988✔
1418
        }
9,988✔
1419
    }); // Throws
1420

9,988✔
1421
    bool completion_condition_was_satisfied;
9,988✔
1422
    {
1423
        util::CheckedUniqueLock lock{m_client.m_mutex};
1424
        m_client.m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_client.m_mutex) {
1425
            return m_reached_upload_mark >= target_mark || m_client.m_stopped;
1426
        });
9,988✔
1427
        completion_condition_was_satisfied = !m_client.m_stopped;
60✔
1428
    }
9,928✔
1429
    return completion_condition_was_satisfied;
9,926✔
1430
}
9,926✔
1431

9,926✔
1432

9,926✔
1433
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
9,928✔
1434
{
1435
    // Thread safety required
9,988✔
1436
    REALM_ASSERT(!m_abandoned);
9,988✔
1437

9,988✔
1438
    std::int_fast64_t target_mark;
20,306✔
1439
    {
20,306✔
1440
        util::CheckedLockGuard lock{m_client.m_mutex};
20,306✔
1441
        target_mark = ++m_target_download_mark;
9,988✔
1442
    }
9,988✔
1443

9,988✔
1444
    m_client.post([self = util::bind_ptr{this}, target_mark] {
9,988✔
1445
        REALM_ASSERT(self->m_actualized);
1446
        // The session wrapper may already have been finalized. This can only
1447
        // happen if it was abandoned, but in that case, the call of
1448
        // wait_for_download_complete_or_client_stopped() must have returned
216✔
1449
        // already.
1450
        if (REALM_UNLIKELY(!self->m_sess))
216✔
1451
            return;
1452
        if (target_mark > self->m_staged_download_mark) {
216✔
1453
            self->m_staged_download_mark = target_mark;
216✔
1454
            SessionImpl& sess = *self->m_sess;
216✔
1455
            sess.request_download_completion_notification(); // Throws
4✔
1456
        }
212✔
1457
    }); // Throws
212✔
1458

212✔
1459
    bool completion_condition_was_satisfied;
1460
    {
212✔
1461
        util::CheckedUniqueLock lock{m_client.m_mutex};
212✔
1462
        m_client.m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_client.m_mutex) {
212✔
1463
            return m_reached_download_mark >= target_mark || m_client.m_stopped;
212✔
1464
        });
216✔
1465
        completion_condition_was_satisfied = !m_client.m_stopped;
1466
    }
1467
    return completion_condition_was_satisfied;
1468
}
10,104✔
1469

10,104✔
1470

10,104✔
1471
void SessionWrapper::refresh(std::string_view signed_access_token)
10,104✔
1472
{
1473
    // Thread safety required
1474
    REALM_ASSERT(!m_abandoned);
1475

1476
    m_client.post([self = util::bind_ptr{this}, token = std::string(signed_access_token)] {
10,032✔
1477
        REALM_ASSERT(self->m_actualized);
1478
        if (REALM_UNLIKELY(!self->m_sess))
10,032✔
1479
            return; // Already finalized
10,032✔
1480
        self->m_signed_access_token = std::move(token);
1481
        SessionImpl& sess = *self->m_sess;
1482
        ClientImpl::Connection& conn = sess.get_connection();
10,032✔
1483
        // FIXME: This only makes sense when each session uses a separate connection.
10,032✔
1484
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
1485
        sess.cancel_resumption_delay();                                                          // Throws
10,032✔
1486
        conn.cancel_reconnect_delay();                                                           // Throws
1487
    });
10,032✔
1488
}
4✔
1489

4✔
1490

1491
// Hands ownership of `wrapper` over to its client, which registers it as an
// abandoned session wrapper.
void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
{
    // Take the client reference first; `wrapper` is moved-from afterwards
    auto& owning_client = wrapper->m_client;
    owning_client.register_abandoned_session_wrapper(std::move(wrapper));
}
×
1496

1497

10,032✔
1498
// Must be called from event loop thread
10,032✔
1499
void SessionWrapper::actualize()
10,032✔
1500
{
10,032✔
1501
    // actualize() can only ever be called once
10,032✔
1502
    REALM_ASSERT(!m_actualized);
10,032✔
1503
    REALM_ASSERT(!m_sess);
10,032✔
1504
    // The client should have removed this wrapper from those pending
10,032✔
UNCOV
1505
    // actualization if it called force_close() or finalize_before_actualize()
×
UNCOV
1506
    REALM_ASSERT(!m_finalized);
×
UNCOV
1507
    REALM_ASSERT(!m_closed);
×
1508

1509
    m_actualized = true;
1510

10,032✔
1511
    ScopeExitFail close_on_error([&]() noexcept {
10,032✔
1512
        m_closed = true;
10,032✔
1513
    });
1,494✔
1514

1,494✔
1515
    m_db->claim_sync_agent();
1516
    m_db->add_commit_listener(this);
10,032✔
1517
    ScopeExitFail remove_commit_listener([&]() noexcept {
10,032✔
1518
        m_db->remove_commit_listener(this);
10,032✔
1519
    });
×
UNCOV
1520

×
1521
    ServerEndpoint endpoint{m_protocol_envelope, m_server_address, m_server_port,
10,032✔
1522
                            m_user_id,           m_sync_mode,      m_server_verified};
1523
    bool was_created = false;
1524
    ClientImpl::Connection& conn = m_client.get_connection(
1525
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
10,032✔
1526
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
1527
        was_created); // Throws
10,032✔
1528
    ScopeExitFail remove_connection([&]() noexcept {
2,776✔
1529
        if (was_created)
1530
            m_client.remove_connection(conn);
10,032✔
1531
    });
10,018✔
1532

10,018✔
1533
    // FIXME: This only makes sense when each session uses a separate connection.
7,096✔
1534
    conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
7,096✔
1535
    std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
6,942✔
1536
    if (m_sync_mode == SyncServerMode::FLX) {
7,096✔
1537
        m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
10,018✔
1538
    }
1539

10,032✔
1540
    sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
10,032✔
1541
    m_sess = sess.get();
9,652✔
1542
    ScopeExitFail clear_sess([&]() noexcept {
10,032✔
1543
        m_sess = nullptr;
1544
    });
1545
    conn.activate_session(std::move(sess)); // Throws
10,120✔
1546

10,120✔
1547
    // Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
106✔
1548
    // session cannot change the state of the bootstrap store at the same time.
106✔
1549
    update_subscription_version_info();
10,014✔
1550

10,014✔
1551
    if (was_created)
10,014✔
1552
        conn.activate(); // Throws
1553

10,014✔
1554
    if (m_connection_state_change_listener) {
10,014✔
1555
        ConnectionState state = conn.get_state();
1556
        if (state != ConnectionState::disconnected) {
1557
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
1558
            if (state == ConnectionState::connected)
10,014✔
1559
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
1560
        }
1561
    }
10,014✔
1562

1563
    if (!m_client_reset_config)
10,014✔
1564
        on_upload_progress(/* only_if_new_uploadable_data = */ true); // Throws
10,014✔
1565
}
10,014✔
1566

1567
void SessionWrapper::force_close()
10,014✔
1568
{
10,014✔
1569
    if (m_closed) {
1570
        return;
1571
    }
1572
    REALM_ASSERT(m_actualized);
1573
    REALM_ASSERT(m_sess);
1574
    m_closed = true;
1575

1576
    ClientImpl::Connection& conn = m_sess->get_connection();
10,018✔
1577
    conn.initiate_session_deactivation(m_sess); // Throws
10,018✔
1578

10,018✔
1579
    // We need to keep the DB open until finalization, but we no longer want to
10,018✔
1580
    // know when commits are made
1581
    m_db->remove_commit_listener(this);
10,018✔
1582

1583
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
10,018✔
1584
    m_flx_pending_bootstrap_store.reset();
1585
    // Clear the subscription and migration store refs since they are owned by SyncSession
1586
    m_flx_subscription_store.reset();
1587
    m_migration_store.reset();
1588
    m_sess = nullptr;
10,018✔
1589
    // Everything is being torn down, no need to report connection state anymore
10,018✔
1590
    m_connection_state_change_listener = {};
1591
}
1592

10,452✔
1593
// Must be called from event loop thread
434✔
1594
//
434✔
1595
// `m_client.m_mutex` is not held while this is called, but it is guaranteed to
434✔
1596
// have been acquired at some point in between the final read or write ever made
434✔
1597
// from a different thread and when this is called.
434✔
1598
void SessionWrapper::finalize()
10,246✔
1599
{
228✔
1600
    REALM_ASSERT(m_actualized);
228✔
1601
    REALM_ASSERT(m_abandoned);
228✔
1602
    REALM_ASSERT(!m_finalized);
228✔
1603

228✔
1604
    force_close();
10,036✔
1605

18✔
1606
    m_finalized = true;
18✔
1607

18✔
1608
    // The Realm file can be closed now, as no access to the Realm file is
18✔
1609
    // supposed to happen on behalf of a session after initiation of
10,018✔
1610
    // deactivation.
1611
    m_db->release_sync_agent();
1612
    m_db = nullptr;
1613

1614
    // All outstanding wait operations must be canceled
1615
    while (!m_upload_completion_handlers.empty()) {
1616
        auto handler = std::move(m_upload_completion_handlers.back());
72✔
1617
        m_upload_completion_handlers.pop_back();
72✔
1618
        handler(
72✔
1619
            {ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
72✔
1620
    }
72✔
1621
    while (!m_download_completion_handlers.empty()) {
72✔
1622
        auto handler = std::move(m_download_completion_handlers.back());
72✔
1623
        m_download_completion_handlers.pop_back();
72✔
1624
        handler(
72✔
1625
            {ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
72✔
1626
    }
1627
    while (!m_sync_completion_handlers.empty()) {
1628
        auto handler = std::move(m_sync_completion_handlers.back());
14,798✔
1629
        m_sync_completion_handlers.pop_back();
14,798✔
1630
        handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
16,746✔
1631
    }
1,948✔
1632
}
1,948✔
1633

1,948✔
1634

1,948✔
1635
// Must be called only when an unactualized session wrapper becomes abandoned.
14,998✔
1636
//
200✔
1637
// Called with a lock on `m_client.m_mutex`.
200✔
1638
inline void SessionWrapper::finalize_before_actualization() noexcept
200✔
1639
{
200✔
1640
    REALM_ASSERT(!m_finalized);
14,798✔
1641
    REALM_ASSERT(!m_sess);
14,798✔
1642
    m_actualized = true;
12,866✔
1643
    m_finalized = true;
12,866✔
1644
    m_closed = true;
12,866✔
1645
    m_db->remove_commit_listener(this);
14,798✔
1646
    m_db->release_sync_agent();
1647
    m_db = nullptr;
1648
}
1649

16,110✔
1650
inline void SessionWrapper::on_upload_progress(bool only_if_new_uploadable_data)
18,546✔
1651
{
2,436✔
1652
    REALM_ASSERT(!m_finalized);
2,436✔
1653
    report_progress(/* is_download = */ false, only_if_new_uploadable_data); // Throws
2,436✔
1654
}
2,436✔
1655

16,208✔
1656
inline void SessionWrapper::on_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes)
98✔
1657
{
98✔
1658
    REALM_ASSERT(!m_finalized);
98✔
1659
    m_bootstrap_store_bytes = bootstrap_store_bytes;
98✔
1660
    report_progress(/* is_download = */ true); // Throws
1661
}
16,110✔
1662

882✔
1663

882✔
1664
void SessionWrapper::on_upload_completion()
882✔
1665
{
882✔
1666
    REALM_ASSERT(!m_finalized);
882✔
1667
    while (!m_upload_completion_handlers.empty()) {
1668
        auto handler = std::move(m_upload_completion_handlers.back());
16,110✔
1669
        m_upload_completion_handlers.pop_back();
16,110✔
1670
        handler(Status::OK()); // Throws
9,858✔
1671
    }
9,858✔
1672
    while (!m_sync_completion_handlers.empty()) {
9,858✔
1673
        auto handler = std::move(m_sync_completion_handlers.back());
16,110✔
1674
        m_download_completion_handlers.push_back(std::move(handler)); // Throws
1675
        m_sync_completion_handlers.pop_back();
1676
    }
1677
    util::CheckedLockGuard lock{m_client.m_mutex};
668✔
1678
    if (m_staged_upload_mark > m_reached_upload_mark) {
668✔
1679
        m_reached_upload_mark = m_staged_upload_mark;
668✔
1680
        m_client.m_wait_or_client_stopped_cond.notify_all();
668✔
1681
    }
668✔
1682
}
668✔
1683

668✔
1684

1685
void SessionWrapper::on_download_completion()
1686
{
1687
    while (!m_download_completion_handlers.empty()) {
62✔
1688
        auto handler = std::move(m_download_completion_handlers.back());
62✔
1689
        m_download_completion_handlers.pop_back();
62✔
1690
        handler(Status::OK()); // Throws
62✔
1691
    }
62✔
1692
    while (!m_sync_completion_handlers.empty()) {
62✔
1693
        auto handler = std::move(m_sync_completion_handlers.back());
52✔
1694
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
62✔
1695
        m_sync_completion_handlers.pop_back();
46✔
1696
    }
62✔
1697

62✔
1698
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
1699
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
1700
                             m_flx_pending_mark_version);
1701
        m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
1702
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
11,842✔
1703
    }
11,842✔
1704

11,802✔
1705
    util::CheckedLockGuard lock{m_client.m_mutex};
11,802✔
1706
    if (m_staged_download_mark > m_reached_download_mark) {
11,842✔
1707
        m_reached_download_mark = m_staged_download_mark;
1708
        m_client.m_wait_or_client_stopped_cond.notify_all();
1709
    }
10,322✔
1710
}
10,322✔
1711

10,322✔
1712

10,322✔
1713
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
10,322✔
1714
{
1715
    REALM_ASSERT(!m_finalized);
1716
    m_suspended = true;
169,780✔
1717
    if (m_connection_state_change_listener) {
169,780✔
1718
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
169,780✔
1719
    }
1720
}
169,780✔
1721

116,472✔
1722

1723
void SessionWrapper::on_resumed()
1724
{
53,308✔
1725
    REALM_ASSERT(!m_finalized);
27,454✔
1726
    m_suspended = false;
1727
    if (m_connection_state_change_listener) {
25,854✔
1728
        ClientImpl::Connection& conn = m_sess->get_connection();
25,854✔
1729
        if (conn.get_state() != ConnectionState::disconnected) {
25,854✔
1730
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
1731
            if (conn.get_state() == ConnectionState::connected)
25,854✔
1732
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
20,408✔
1733
        }
20,408✔
1734
    }
1735
}
1736

1737

1738
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1739
                                                 const std::optional<SessionErrorInfo>& error_info)
1740
{
1741
    if (m_connection_state_change_listener && !m_suspended) {
1742
        m_connection_state_change_listener(state, error_info); // Throws
1743
    }
20,408✔
1744
}
20,408✔
1745

10,472✔
1746
void SessionWrapper::init_progress_handler()
20,408✔
1747
{
20,408✔
1748
    uint64_t unused = 0;
1749
    ClientHistory::get_upload_download_bytes(m_db.get(), m_reported_progress.final_downloaded, unused,
25,854✔
1750
                                             m_reported_progress.final_uploaded, unused, unused);
25,854✔
1751
}
25,854✔
1752

10,392✔
1753
void SessionWrapper::report_progress(bool is_download, bool only_if_new_uploadable_data)
1754
{
25,854✔
1755
    REALM_ASSERT(!m_finalized);
25,854✔
1756
    REALM_ASSERT(m_sess);
25,854✔
1757
    REALM_ASSERT(!(only_if_new_uploadable_data && is_download));
12,076✔
1758

708✔
1759
    if (!m_progress_handler)
12,076✔
1760
        return;
12,076✔
1761

1762
    // Ignore progress messages from before we first receive a DOWNLOAD message
1763
    if (!m_reliable_download_progress)
1764
        return;
1765

12,076✔
1766
    ReportedProgress p = m_reported_progress;
12,076!
UNCOV
1767
    ClientHistory::get_upload_download_bytes(m_db.get(), p.downloaded, p.downloadable, p.uploaded, p.uploadable,
×
1768
                                             p.snapshot);
12,076✔
1769

13,778✔
1770
    // If this progress notification was triggered by a commit being made we
1771
    // only want to send it if the uploadable bytes has actually increased,
1772
    // and not if it was an empty commit.
1773
    if (only_if_new_uploadable_data && m_reported_progress.uploadable == p.uploadable)
13,778✔
1774
        return;
13,778✔
1775

10,016✔
1776
    // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
13,778✔
1777
    // is only the remaining to download. This is confusing, so make them use
1778
    // the same units.
25,854✔
1779
    p.downloadable += p.downloaded;
15,832✔
1780

25,854✔
1781
    bool is_completed = false;
15,456✔
1782
    if (is_download) {
1783
        if (m_download_estimate)
25,854✔
1784
            is_completed = *m_download_estimate >= 1.0;
18,538✔
1785
        else
1786
            is_completed = p.downloaded == p.downloadable;
7,316✔
1787
    }
1788
    else {
7,316✔
1789
        is_completed = p.uploaded == p.uploadable;
14,144✔
1790
    }
14,144✔
1791

1792
    auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) {
14,144✔
1793
        REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable);
14,144✔
1794
        REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable);
14,144✔
1795

7,072✔
1796
        // The effect of this calculation is that if new bytes are added for download/upload,
7,072✔
1797
        // the progress estimate doesn't go back to zero, but it goes back to some non-zero percentage.
7,072✔
1798
        // This calculation allows a clean progression from 0 to 1.0 even if the new data is added for the sync
7,072✔
1799
        // before progress has reached 1.0.
7,072✔
1800
        // Then once it is at 1.0 the next batch of changes will restart the estimate at 0.
7,072✔
1801
        // Example for upload progress reported:
1802
        // 0 -> 1.0 -> new data added -> 0.0 -> 0.1 ...sync... -> 0.4 -> new data added -> 0.3 ...sync.. -> 1.0
7,316✔
1803

7,316✔
1804
        double progress_estimate = 1.0;
7,316✔
1805
        if (final_transferred < transferable && transferred < transferable)
1806
            progress_estimate = (transferred - final_transferred) / double(transferable - final_transferred);
1807
        return progress_estimate;
60✔
1808
    };
60✔
UNCOV
1809

×
UNCOV
1810
    double upload_estimate = 1.0, download_estimate = 1.0;
×
1811

1812
    // calculate estimate for both download/upload since the progress is reported all at once
60✔
1813
    if (!is_completed || is_download)
60✔
1814
        upload_estimate = calculate_progress(p.uploaded, p.uploadable, p.final_uploaded);
1815

1816
    // download estimate only known for flx
10,324✔
1817
    if (m_download_estimate) {
10,324✔
1818
        download_estimate = *m_download_estimate;
1819

10,324✔
1820
        // ... bootstrap store bytes should be null after initial sync when every changeset integrated immediately
10,324✔
1821
        if (m_bootstrap_store_bytes)
9,998✔
1822
            p.downloaded += *m_bootstrap_store_bytes;
9,998✔
1823

1824
        // FIXME for flx with download estimate these bytes are not known
326✔
1825
        // provide some sensible value for non-streaming version of object-store callbacks
1826
        // until these field are completely removed from the api after pbs deprecation
1827
        p.downloadable = p.downloaded;
326✔
1828
        if (0.01 <= download_estimate && download_estimate <= 0.99)
326✔
1829
            if (p.downloaded > p.final_downloaded)
326✔
1830
                p.downloadable =
150✔
1831
                    p.final_downloaded + uint64_t((p.downloaded - p.final_downloaded) / download_estimate);
150✔
1832
    }
176✔
1833
    else {
176✔
UNCOV
1834
        if (!is_completed || !is_download)
×
UNCOV
1835
            download_estimate = calculate_progress(p.downloaded, p.downloadable, p.final_downloaded);
×
UNCOV
1836
    }
×
UNCOV
1837

×
1838
    if (is_completed) {
1839
        if (is_download)
176✔
1840
            p.final_downloaded = p.downloaded;
1841
        else
176✔
1842
            p.final_uploaded = p.uploaded;
176✔
1843
    }
176✔
1844

4✔
1845
    m_reported_progress = p;
4✔
1846

4✔
1847
    if (m_sess->logger.would_log(Logger::Level::debug)) {
172✔
1848
        auto to_str = [](double d) {
168✔
1849
            std::ostringstream ss;
168✔
1850
            // progress estimate string in the DOWNLOAD message isn't expected to have more than 4 digits of precision
4✔
1851
            ss << std::fixed << std::setprecision(4) << d;
4✔
1852
            return ss.str();
4✔
1853
        };
172✔
1854
        m_sess->logger.debug(
172✔
1855
            "Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
172✔
1856
            "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
326✔
1857
            p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
1858
            to_str(upload_estimate), p.snapshot, m_flx_active_version);
1859
    }
10,324✔
1860

10,324✔
1861
    m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate,
8,718✔
1862
                       upload_estimate, m_flx_last_seen_version);
1,606✔
1863
}
1,606✔
1864

1,606✔
1865
// Forwards a test command body to the underlying session and returns the
// session's future. When there is no session, the returned future is built from
// a RuntimeError status instead.
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
{
    if (m_sess) {
        return m_sess->send_test_command(std::move(body));
    }
    return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
}
72✔
UNCOV
1873

×
UNCOV
1874
void SessionWrapper::handle_pending_client_reset_acknowledgement()
×
UNCOV
1875
{
×
1876
    REALM_ASSERT(!m_finalized);
1877

72✔
UNCOV
1878
    auto has_pending_reset = PendingResetStore::has_pending_reset(m_db->start_frozen());
×
UNCOV
1879
    if (!has_pending_reset) {
×
UNCOV
1880
        return; // nothing to do
×
1881
    }
1882

72✔
1883
    m_sess->logger.info(util::LogCategory::reset, "Tracking %1", *has_pending_reset);
72✔
1884

1885
    // Now that the client reset merge is complete, wait for the changes to synchronize with the server
72✔
1886
    async_wait_for(
72✔
1887
        true, true, [self = util::bind_ptr(this), pending_reset = std::move(*has_pending_reset)](Status status) {
1888
            if (status == ErrorCodes::OperationAborted) {
1889
                return;
1890
            }
1891
            auto& logger = self->m_sess->logger;
1892
            if (!status.is_ok()) {
1893
                logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1",
1894
                             status);
1895
                return;
1896
            }
1897

1,322✔
1898
            logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset);
1,322✔
1899

1,322✔
1900
            auto tr = self->m_db->start_write();
1,322✔
1901
            auto cur_pending_reset = PendingResetStore::has_pending_reset(tr);
1,322✔
1902
            if (!cur_pending_reset) {
1,322✔
1903
                logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed.");
1,322✔
1904
                return;
1,322✔
1905
            }
1,322✔
1906
            if (*cur_pending_reset == pending_reset) {
1,322✔
1907
                logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker.");
1,322✔
1908
            }
1,322✔
1909
            else {
1,322✔
1910
                logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset);
2,776✔
1911
            }
2,776✔
1912
            PendingResetStore::clear_pending_reset(tr);
2,772✔
UNCOV
1913
            tr->commit();
×
1914
        });
2,772✔
UNCOV
1915
}
×
1916

1917
void SessionWrapper::update_subscription_version_info()
2,772✔
1918
{
2,774✔
1919
    if (!m_flx_subscription_store)
2,764✔
1920
        return;
1921
    auto versions_info = m_flx_subscription_store->get_version_info();
2,764✔
1922
    m_flx_active_version = versions_info.active;
2,772✔
1923
    m_flx_pending_mark_version = versions_info.pending_mark;
2,776✔
1924
}
1925

1926
std::string SessionWrapper::get_appservices_connection_id()
12✔
1927
{
12✔
1928
    auto pf = util::make_promise_future<std::string>();
12✔
1929

1930
    m_client.post([self = util::bind_ptr{this}, promise = std::move(pf.promise)](Status status) mutable {
1931
        if (!status.is_ok()) {
1932
            promise.set_error(status);
2,764✔
1933
            return;
2,764✔
1934
        }
2,764✔
1935

1936
        if (!self->m_sess) {
1937
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
1938
            return;
10,240✔
1939
        }
10,240✔
1940

10,240✔
1941
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
10,240✔
1942
    });
1943

1944
    return pf.future.get();
1945
}
1,892✔
1946

3,780✔
1947
// ################ ClientImpl::Connection ################
3,780✔
1948

3,780✔
1949
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1,892✔
1950
                                   const std::string& authorization_header_name,
1,892✔
1951
                                   const std::map<std::string, std::string>& custom_http_headers,
1952
                                   bool verify_servers_ssl_certificate,
1953
                                   Optional<std::string> ssl_trust_certificate_path,
2,764✔
1954
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
2,764✔
1955
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
2,764✔
1956
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::session, make_logger_prefix(ident),
2,764✔
1957
                                                      client.logger_ptr)} // Throws
1958
    , logger{*logger_ptr}
2,764✔
1959
    , m_client{client}
1960
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1961
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1962
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
3,744✔
1963
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
3,744✔
1964
    , m_reconnect_info{reconnect_info}
3,744✔
1965
    , m_ident{ident}
1966
    , m_server_endpoint{std::move(endpoint)}
3,744✔
1967
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
3,744✔
1968
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
3,744✔
1969
{
3,744✔
1970
    m_on_idle = m_client.create_trigger([this](Status status) {
3,744✔
1971
        if (status == ErrorCodes::OperationAborted)
1972
            return;
3,744✔
1973
        else if (!status.is_ok())
3,744✔
1974
            throw Exception(status);
1975

1976
        REALM_ASSERT(m_activated);
1977
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,774✔
1978
            on_idle(); // Throws
2,774✔
1979
            // Connection object may be destroyed now.
2,774✔
1980
        }
1981
    });
1982
}
1983

1984
// Returns the identifier this connection was constructed with.
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
{
    return this->m_ident;
}
2,404✔
1988

11,686✔
1989

11,686✔
1990
// Returns a reference to the server endpoint stored in this connection.
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
{
    return this->m_server_endpoint;
}
8,614✔
1994

1995
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1996
                                                        const std::string& signed_access_token)
1997
{
4,890✔
1998
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
9,918✔
1999
    m_signed_access_token = signed_access_token;           // Throws (copy)
9,918✔
2000
}
2001

2002

2003
void ClientImpl::Connection::resume_active_sessions()
UNCOV
2004
{
×
UNCOV
2005
    auto handler = [=](ClientImpl::Session& sess) {
×
2006
        sess.cancel_resumption_delay(); // Throws
2007
    };
2008
    for_each_active_session(std::move(handler)); // Throws
9,918✔
2009
}
2010

2011
void ClientImpl::Connection::on_idle()
2012
{
10,000✔
2013
    logger.debug(util::LogCategory::session, "Destroying connection object");
10,000✔
2014
    ClientImpl& client = get_client();
10,000✔
2015
    client.remove_connection(*this);
2016
    // NOTE: This connection object is now destroyed!
2017
}
768✔
2018

768✔
2019

768✔
2020
std::string ClientImpl::Connection::get_http_request_path() const
2021
{
2022
    using namespace std::string_view_literals;
1,896✔
2023
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
1,896✔
2024

1,896✔
2025
    std::string path;
2026
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
2027
    path += m_http_request_path_prefix;
12✔
2028
    path += param;
12✔
2029
    path += m_signed_access_token;
12✔
2030

2031
    return path;
2032
}
9,512✔
2033

9,512✔
2034

9,512✔
2035
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
2036
{
2037
    return util::format("Connection[%1] ", ident);
56✔
2038
}
56✔
2039

56✔
2040

2041
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
2042
                                                            std::optional<SessionErrorInfo> error_info)
2043
{
4,016✔
2044
    if (m_force_closed) {
4,016✔
2045
        return;
4,016✔
2046
    }
2047
    auto handler = [=](ClientImpl::Session& sess) {
2048
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
2049
        sess_2.on_connection_state_changed(state, error_info); // Throws
2050
    };
10,104✔
2051
    for_each_active_session(std::move(handler)); // Throws
10,104✔
2052
}
10,104✔
2053

10,104✔
2054

2055
Client::Client(Config config)
2056
    : m_impl{new ClientImpl{std::move(config)}} // Throws
2057
{
17,682✔
2058
}
17,682✔
2059

17,682✔
2060

2061
Client::Client(Client&& client) noexcept
2062
    : m_impl{std::move(client.m_impl)}
2063
{
28✔
2064
}
28✔
2065

28✔
2066

2067
// The destructor has no custom logic; it is still defined in this translation
// unit rather than being implicitly generated wherever Client is used.
Client::~Client() noexcept = default;
2068

2069

4,856✔
2070
void Client::shutdown() noexcept
4,856✔
2071
{
4,856✔
2072
    m_impl->shutdown();
2073
}
2074

2075
void Client::shutdown_and_wait()
12,900✔
2076
{
12,900✔
2077
    m_impl->shutdown_and_wait();
12,900✔
2078
}
2079

2080
void Client::cancel_reconnect_delay()
2081
{
9,988✔
2082
    m_impl->cancel_reconnect_delay();
9,988✔
2083
}
9,988✔
2084

2085
void Client::voluntary_disconnect_all_connections()
2086
{
2087
    m_impl->voluntary_disconnect_all_connections();
216✔
2088
}
216✔
2089

216✔
2090
bool Client::wait_for_session_terminations_or_client_stopped()
2091
{
2092
    return m_impl->wait_for_session_terminations_or_client_stopped();
2093
}
10,104✔
2094

10,104✔
2095
// Forwards to ClientImpl::notify_session_terminated() and returns the future
// it produces.
util::Future<void> Client::notify_session_terminated()
{
    return m_impl->notify_session_terminated();
}
10,104✔
2099

10,104✔
2100
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
2101
                                  port_type& port, std::string& path) const
2102
{
60✔
2103
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
60✔
2104
}
60✔
2105

2106

2107
// Creates the SessionWrapper backing this session and stores it in `m_impl`
// as a plain pointer. The wrapper is built from the client's implementation
// object; the remaining arguments are moved into it.
//
// The pointer is later adopted into a util::bind_ptr by Session::abandon().
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
{
    m_impl = new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
                                std::move(config)}; // Throws
}
×
UNCOV
2113

×
UNCOV
2114

×
UNCOV
2115
// Reports `new_version` to the session wrapper by calling its on_commit().
void Session::nonsync_transact_notify(version_type new_version)
{
    m_impl->on_commit(new_version); // Throws
}
×
2119

UNCOV
2120

×
2121
void Session::cancel_reconnect_delay()
2122
{
2123
    m_impl->cancel_reconnect_delay(); // Throws
2124
}
2125

2126

2127
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2128
{
2129
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
2130
}
2131

2132

2133
bool Session::wait_for_upload_complete_or_client_stopped()
2134
{
2135
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
2136
}
2137

2138

2139
bool Session::wait_for_download_complete_or_client_stopped()
2140
{
2141
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
2142
}
2143

2144

2145
void Session::refresh(std::string_view signed_access_token)
2146
{
2147
    m_impl->refresh(signed_access_token); // Throws
2148
}
2149

2150

2151
void Session::abandon() noexcept
2152
{
2153
    REALM_ASSERT(m_impl);
2154
    // Reabsorb the ownership assigned to the applications naked pointer by
2155
    // Session constructor
2156
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
2157
    SessionWrapper::abandon(std::move(wrapper));
2158
}
2159

2160
// Forwards to send_test_command() on the session wrapper, moving `body` into
// the call, and returns the resulting future.
util::Future<std::string> Session::send_test_command(std::string body)
{
    return m_impl->send_test_command(std::move(body));
}
2164

2165
std::string Session::get_appservices_connection_id()
2166
{
2167
    return m_impl->get_appservices_connection_id();
2168
}
2169

2170
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2171
{
2172
    switch (proxyType) {
2173
        case ProxyConfig::Type::HTTP:
2174
            return os << "HTTP";
2175
        case ProxyConfig::Type::HTTPS:
2176
            return os << "HTTPS";
2177
    }
2178
    REALM_TERMINATE("Invalid Proxy Type object.");
2179
}
2180

2181
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc