• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2357

29 May 2024 11:05PM UTC coverage: 90.843% (+0.04%) from 90.801%
2357

push

Evergreen

web-flow
Merge pull request #7609 from realm/tg/session-lifecycle

RCORE-2092 Simplify the SessionWrapper lifecycle a bit

101578 of 179868 branches covered (56.47%)

506 of 544 new or added lines in 14 files covered. (93.01%)

42 existing lines in 13 files now uncovered.

214462 of 236080 relevant lines covered (90.84%)

5704793.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.39
/src/realm/sync/client.cpp
1
#include <realm/sync/client.hpp>
2

3
#include <realm/sync/config.hpp>
4
#include <realm/sync/noinst/client_impl_base.hpp>
5
#include <realm/sync/noinst/client_reset.hpp>
6
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
7
#include <realm/sync/protocol.hpp>
8
#include <realm/sync/subscriptions.hpp>
9
#include <realm/util/bind_ptr.hpp>
10

11
namespace realm::sync {
12
namespace {
13
using namespace realm::util;
14

15

16
// clang-format off
17
using SessionImpl                     = ClientImpl::Session;
18
using SyncTransactCallback            = Session::SyncTransactCallback;
19
using ProgressHandler                 = Session::ProgressHandler;
20
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
21
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
22
using port_type                       = Session::port_type;
23
using connection_ident_type           = std::int_fast64_t;
24
using ProxyConfig                     = SyncConfig::ProxyConfig;
25
// clang-format on
26

27
} // unnamed namespace
28

29

30
// Life cycle states of a session wrapper:
31
//
32
// The session wrapper begins life with an associated Client, but no underlying
33
// SessionImpl. On construction, it begins the actualization process by posting
34
// a job to the client's event loop. That job will set `m_sess` to a session impl
35
// and then set `m_actualized = true`. Once this happens `m_actualized` will
36
// never change again.
37
//
38
// When the external reference to the session (`sync::Session`, which in
39
// non-test code is always owned by a `SyncSession`) is destroyed, the wrapper
40
// begins finalization. If the wrapper has not yet been actualized this takes
41
// place immediately and `m_finalized = true` is set directly on the calling
42
// thread. If it has been actualized, a job is posted to the client's event loop
43
// which will tear down the session and then set `m_finalized = true`. Regardless
44
// of whether or not the session has been actualized, `m_abandoned = true` is
45
// immediately set when the external reference is released.
46
//
47
// When the associated Client is destroyed it calls force_close() on all
48
// actualized wrappers from its event loop. This causes the wrapper to tear down
49
// the session, but not not make it proceed to the finalized state. In normal
50
// usage the client will outlive all sessions, but in tests getting the teardown
51
// correct and race-free can be tricky so we permit either order.
52
//
53
// The wrapper will exist with `m_abandoned = true` and `m_finalized = false`
54
// only while waiting for finalization to happen. It will exist with
55
// `m_finalized = true` only while there are pending post handlers yet to be
56
// executed.
57
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
58
public:
59
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
60
                   Session::Config&&);
61
    ~SessionWrapper() noexcept;
62

63
    ClientReplication& get_replication() noexcept;
64
    ClientImpl& get_client() noexcept;
65

66
    bool has_flx_subscription_store() const;
67
    SubscriptionStore* get_flx_subscription_store();
68
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
69

70
    MigrationStore* get_migration_store();
71

72
    // Immediately initiate deactivation of the wrapped session. Sets m_closed
73
    // but *not* m_finalized.
74
    // Must be called from event loop thread.
75
    void force_close();
76

77
    // Can be called from any thread.
78
    void on_commit(version_type new_version) override;
79
    // Can be called from any thread.
80
    void cancel_reconnect_delay();
81

82
    // Can be called from any thread.
83
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
84
    // Can be called from any thread.
85
    bool wait_for_upload_complete_or_client_stopped();
86
    // Can be called from any thread.
87
    bool wait_for_download_complete_or_client_stopped();
88

89
    // Can be called from any thread.
90
    void refresh(std::string_view signed_access_token);
91

92
    // Can be called from any thread.
93
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
94

95
    // These are called from ClientImpl
96
    // Must be called from event loop thread.
97
    void actualize();
98
    void finalize();
99
    void finalize_before_actualization() noexcept;
100

101
    // Can be called from any thread.
102
    util::Future<std::string> send_test_command(std::string body);
103

104
    void handle_pending_client_reset_acknowledgement();
105

106
    void update_subscription_version_info();
107

108
    // Can be called from any thread.
109
    std::string get_appservices_connection_id();
110

111
    // Can be called from any thread, but inherently cannot be called
112
    // concurrently with calls to any of the other non-confined functions.
113
    bool mark_abandoned();
114

115
private:
116
    ClientImpl& m_client;
117
    DBRef m_db;
118
    Replication* m_replication;
119

120
    const ProtocolEnvelope m_protocol_envelope;
121
    const std::string m_server_address;
122
    const port_type m_server_port;
123
    const bool m_server_verified;
124
    const std::string m_user_id;
125
    const SyncServerMode m_sync_mode;
126
    const std::string m_authorization_header_name;
127
    const std::map<std::string, std::string> m_custom_http_headers;
128
    const bool m_verify_servers_ssl_certificate;
129
    const bool m_simulate_integration_error;
130
    const std::optional<std::string> m_ssl_trust_certificate_path;
131
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
132
    const size_t m_flx_bootstrap_batch_size_bytes;
133
    const std::string m_http_request_path_prefix;
134
    const std::string m_virt_path;
135
    const std::optional<ProxyConfig> m_proxy_config;
136

137
    // This one is different from null when, and only when the session wrapper
138
    // is in ClientImpl::m_abandoned_session_wrappers.
139
    SessionWrapper* m_next = nullptr;
140

141
    // These may only be accessed by the event loop thread.
142
    std::string m_signed_access_token;
143
    std::optional<ClientReset> m_client_reset_config;
144

145
    struct ReportedProgress {
146
        uint64_t snapshot = 0;
147
        uint64_t uploaded = 0;
148
        uint64_t uploadable = 0;
149
        uint64_t downloaded = 0;
150
        uint64_t downloadable = 0;
151
        uint64_t final_uploaded = 0;
152
        uint64_t final_downloaded = 0;
153
    } m_reported_progress;
154

155
    const util::UniqueFunction<ProgressHandler> m_progress_handler;
156
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
157

158
    const util::UniqueFunction<SyncClientHookAction(SyncClientHookData const&)> m_debug_hook;
159
    bool m_in_debug_hook = false;
160

161
    const SessionReason m_session_reason;
162

163
    const uint64_t m_schema_version;
164

165
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
166
    int64_t m_flx_active_version = 0;
167
    int64_t m_flx_last_seen_version = 0;
168
    int64_t m_flx_pending_mark_version = 0;
169
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
170

171
    std::shared_ptr<MigrationStore> m_migration_store;
172

173
    // Set to true when this session wrapper is actualized (i.e. the wrapped
174
    // session is created), or when the wrapper is finalized before actualization.
175
    // It is then never modified again.
176
    //
177
    // Actualization is scheduled during the construction of SessionWrapper, and
178
    // so a session specific post handler will always find that `m_actualized`
179
    // is true as the handler will always be run after the actualization job.
180
    // This holds even if the wrapper is finalized or closed before actualization.
181
    bool m_actualized = false;
182

183
    // Set to true when session deactivation is begun, either via force_close()
184
    // or finalize().
185
    bool m_closed = false;
186

187
    // Set to true in on_suspended() and then false in on_resumed(). Used to
188
    // suppress spurious connection state and error reporting while the session
189
    // is already in an error state.
190
    bool m_suspended = false;
191

192
    // Set when the session has been abandoned. After this point none of the
193
    // public API functions should be called again.
194
    bool m_abandoned = false;
195
    // Has the SessionWrapper been finalized?
196
    bool m_finalized = false;
197

198
    // Set to true when the first DOWNLOAD message is received to indicate that
199
    // the byte-level download progress parameters can be considered reasonable
200
    // reliable. Before that, a lot of time may have passed, so our record of
201
    // the download progress is likely completely out of date.
202
    bool m_reliable_download_progress = false;
203

204
    std::optional<double> m_download_estimate;
205
    std::optional<uint64_t> m_bootstrap_store_bytes;
206

207
    // Set to point to an activated session object during actualization of the
208
    // session wrapper. Set to null during finalization of the session
209
    // wrapper. Both modifications are guaranteed to be performed by the event
210
    // loop thread.
211
    //
212
    // If a session specific post handler, that is submitted after the
213
    // initiation of the session wrapper, sees that `m_sess` is null, it can
214
    // conclude that the session wrapper has either been force closed or has
215
    // been both abandoned and finalized.
216
    //
217
    // Must only be accessed from the event loop thread.
218
    SessionImpl* m_sess = nullptr;
219

220
    // These must only be accessed from the event loop thread.
221
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
222
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
223
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
224

225
    // `m_target_*load_mark` and `m_reached_*load_mark` are protected by
226
    // `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
227
    // event loop thread.
228
    std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
229
    std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
230
    std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
231

232
    void on_upload_progress(bool only_if_new_uploadable_data = false);
233
    void on_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes = {});
234
    void on_upload_completion();
235
    void on_download_completion();
236
    void on_suspended(const SessionErrorInfo& error_info);
237
    void on_resumed();
238
    void on_connection_state_changed(ConnectionState, const std::optional<SessionErrorInfo>&);
239
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
240
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
241
    void on_flx_sync_version_complete(int64_t version);
242

243
    void init_progress_handler();
244
    // only_if_new_uploadable_data can be true only if is_download is false
245
    void report_progress(bool is_download, bool only_if_new_uploadable_data = false);
246

247
    friend class SessionWrapperStack;
248
    friend class ClientImpl::Session;
249
};
250

251

252
// ################ SessionWrapperStack ################
253

254
inline bool SessionWrapperStack::empty() const noexcept
255
{
19,812✔
256
    return !m_back;
19,812✔
257
}
19,812✔
258

259

260
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
261
{
20,136✔
262
    REALM_ASSERT(!w->m_next);
20,136✔
263
    w->m_next = m_back;
20,136✔
264
    m_back = w.release();
20,136✔
265
}
20,136✔
266

267

268
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
269
{
58,338✔
270
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
58,338✔
271
    if (m_back) {
58,338✔
272
        m_back = m_back->m_next;
20,054✔
273
        w->m_next = nullptr;
20,054✔
274
    }
20,054✔
275
    return w;
58,338✔
276
}
58,338✔
277

278

279
inline void SessionWrapperStack::clear() noexcept
280
{
19,812✔
281
    while (m_back) {
19,812✔
282
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
283
        m_back = w->m_next;
×
284
    }
×
285
}
19,812✔
286

287

288
inline bool SessionWrapperStack::erase(SessionWrapper* w) noexcept
289
{
10,100✔
290
    SessionWrapper** p = &m_back;
10,100✔
291
    while (*p && *p != w) {
10,278✔
292
        p = &(*p)->m_next;
178✔
293
    }
178✔
294
    if (!*p) {
10,100✔
295
        return false;
10,032✔
296
    }
10,032✔
297
    *p = w->m_next;
68✔
298
    util::bind_ptr<SessionWrapper>{w, util::bind_ptr_base::adopt_tag{}};
68✔
299
    return true;
68✔
300
}
10,100✔
301

302

303
SessionWrapperStack::~SessionWrapperStack()
304
{
19,812✔
305
    clear();
19,812✔
306
}
19,812✔
307

308

309
// ################ ClientImpl ################
310

311
ClientImpl::~ClientImpl()
312
{
9,918✔
313
    // Since no other thread is allowed to be accessing this client or any of
314
    // its subobjects at this time, no mutex locking is necessary.
315

316
    shutdown_and_wait();
9,918✔
317
    // Session wrappers are removed from m_unactualized_session_wrappers as they
318
    // are abandoned.
319
    REALM_ASSERT(m_stopped);
9,918✔
320
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
9,918✔
321
    REALM_ASSERT(m_abandoned_session_wrappers.empty());
9,918✔
322
}
9,918✔
323

324

325
void ClientImpl::cancel_reconnect_delay()
326
{
1,956✔
327
    // Thread safety required
328
    post([this] {
1,956✔
329
        for (auto& p : m_server_slots) {
1,956✔
330
            ServerSlot& slot = p.second;
1,956✔
331
            if (m_one_connection_per_session) {
1,956✔
332
                REALM_ASSERT(!slot.connection);
×
333
                for (const auto& p : slot.alt_connections) {
×
334
                    ClientImpl::Connection& conn = *p.second;
×
335
                    conn.resume_active_sessions(); // Throws
×
336
                    conn.cancel_reconnect_delay(); // Throws
×
337
                }
×
338
            }
×
339
            else {
1,956✔
340
                REALM_ASSERT(slot.alt_connections.empty());
1,956✔
341
                if (slot.connection) {
1,956✔
342
                    ClientImpl::Connection& conn = *slot.connection;
1,952✔
343
                    conn.resume_active_sessions(); // Throws
1,952✔
344
                    conn.cancel_reconnect_delay(); // Throws
1,952✔
345
                }
1,952✔
346
                else {
4✔
347
                    slot.reconnect_info.reset();
4✔
348
                }
4✔
349
            }
1,956✔
350
        }
1,956✔
351
    }); // Throws
1,956✔
352
}
1,956✔
353

354

355
void ClientImpl::voluntary_disconnect_all_connections()
356
{
12✔
357
    auto done_pf = util::make_promise_future<void>();
12✔
358
    post([this, promise = std::move(done_pf.promise)]() mutable {
12✔
359
        try {
12✔
360
            for (auto& p : m_server_slots) {
12✔
361
                ServerSlot& slot = p.second;
12✔
362
                if (m_one_connection_per_session) {
12✔
363
                    REALM_ASSERT(!slot.connection);
×
NEW
364
                    for (const auto& [_, conn] : slot.alt_connections) {
×
NEW
365
                        conn->voluntary_disconnect();
×
366
                    }
×
367
                }
×
368
                else {
12✔
369
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
370
                    if (slot.connection) {
12✔
371
                        slot.connection->voluntary_disconnect();
12✔
372
                    }
12✔
373
                }
12✔
374
            }
12✔
375
        }
12✔
376
        catch (...) {
12✔
377
            promise.set_error(exception_to_status());
×
378
            return;
×
379
        }
×
380
        promise.emplace_value();
12✔
381
    });
12✔
382
    done_pf.future.get();
12✔
383
}
12✔
384

385

386
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
387
{
9,488✔
388
    // Thread safety required
389

390
    {
9,488✔
391
        util::CheckedLockGuard lock{m_mutex};
9,488✔
392
        m_sessions_terminated = false;
9,488✔
393
    }
9,488✔
394

395
    // The technique employed here relies on the fact that
396
    // actualize_and_finalize_session_wrappers() must get to execute at least
397
    // once before the post handler submitted below gets to execute, but still
398
    // at a time where all session wrappers, that are abandoned prior to the
399
    // execution of wait_for_session_terminations_or_client_stopped(), have been
400
    // added to `m_abandoned_session_wrappers`.
401
    //
402
    // To see that this is the case, consider a session wrapper that was
403
    // abandoned before wait_for_session_terminations_or_client_stopped() was
404
    // invoked. Then the session wrapper will have been added to
405
    // `m_abandoned_session_wrappers`, and an invocation of
406
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
407
    // guarantees mentioned in the documentation of Trigger then ensure
408
    // that at least one execution of actualize_and_finalize_session_wrappers()
409
    // will happen after the session wrapper has been added to
410
    // `m_abandoned_session_wrappers`, but before the post handler submitted
411
    // below gets to execute.
412
    post([this] {
9,488✔
413
        {
9,488✔
414
            util::CheckedLockGuard lock{m_mutex};
9,488✔
415
            m_sessions_terminated = true;
9,488✔
416
        }
9,488✔
417
        m_wait_or_client_stopped_cond.notify_all();
9,488✔
418
    }); // Throws
9,488✔
419

420
    bool completion_condition_was_satisfied;
9,488✔
421
    {
9,488✔
422
        util::CheckedUniqueLock lock{m_mutex};
9,488✔
423
        m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_mutex) {
18,944✔
424
            return m_sessions_terminated || m_stopped;
18,944✔
425
        });
18,944✔
426
        completion_condition_was_satisfied = !m_stopped;
9,488✔
427
    }
9,488✔
428
    return completion_condition_was_satisfied;
9,488✔
429
}
9,488✔
430

431

432
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
433
util::Future<void> ClientImpl::notify_session_terminated()
434
{
56✔
435
    auto pf = util::make_promise_future<void>();
56✔
436
    post([promise = std::move(pf.promise)](Status status) mutable {
56✔
437
        // Includes operation_aborted
438
        if (!status.is_ok()) {
56✔
439
            promise.set_error(status);
×
440
            return;
×
441
        }
×
442

443
        promise.emplace_value();
56✔
444
    });
56✔
445

446
    return std::move(pf.future);
56✔
447
}
56✔
448

449
void ClientImpl::drain_connections_on_loop()
450
{
9,918✔
451
    post([this](Status status) {
9,918✔
452
        REALM_ASSERT(status.is_ok());
9,906✔
453
        drain_connections();
9,906✔
454
    });
9,906✔
455
}
9,918✔
456

457
void ClientImpl::shutdown_and_wait()
458
{
10,686✔
459
    shutdown();
10,686✔
460
    util::CheckedUniqueLock lock{m_drain_mutex};
10,686✔
461
    if (m_drained) {
10,686✔
462
        return;
768✔
463
    }
768✔
464

465
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
9,918✔
466
    m_drain_cv.wait(lock.native_handle(), [&]() REQUIRES(m_drain_mutex) {
15,856✔
467
        return m_num_connections == 0 && m_outstanding_posts == 0;
15,856✔
468
    });
15,856✔
469

470
    m_drained = true;
9,918✔
471
}
9,918✔
472

473
void ClientImpl::shutdown() noexcept
474
{
20,686✔
475
    {
20,686✔
476
        util::CheckedLockGuard lock{m_mutex};
20,686✔
477
        if (m_stopped)
20,686✔
478
            return;
10,768✔
479
        m_stopped = true;
9,918✔
480
    }
9,918✔
NEW
481
    m_wait_or_client_stopped_cond.notify_all();
×
482

483
    drain_connections_on_loop();
9,918✔
484
}
9,918✔
485

486

487
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
488
{
10,104✔
489
    // Thread safety required.
490
    {
10,104✔
491
        util::CheckedLockGuard lock{m_mutex};
10,104✔
492
        // We can't actualize the session if we've already been stopped, so
493
        // just finalize it immediately.
494
        if (m_stopped) {
10,104✔
NEW
495
            wrapper->finalize_before_actualization();
×
NEW
496
            return;
×
NEW
497
        }
×
498

499
        REALM_ASSERT(m_actualize_and_finalize);
10,104✔
500
        m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
10,104✔
501
    }
10,104✔
UNCOV
502
    m_actualize_and_finalize->trigger();
×
503
}
10,104✔
504

505

506
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
507
{
10,104✔
508
    // Thread safety required.
509
    {
10,104✔
510
        util::CheckedLockGuard lock{m_mutex};
10,104✔
511
        REALM_ASSERT(m_actualize_and_finalize);
10,104✔
512
        // The wrapper may have already been finalized before being abandoned
513
        // if we were stopped when it was created.
514
        if (wrapper->mark_abandoned())
10,104✔
515
            return;
4✔
516

517
        // If the session wrapper has not yet been actualized (on the event loop
518
        // thread), it can be immediately finalized. This ensures that we will
519
        // generally not actualize a session wrapper that has already been
520
        // abandoned.
521
        if (m_unactualized_session_wrappers.erase(wrapper.get())) {
10,100✔
522
            wrapper->finalize_before_actualization();
68✔
523
            return;
68✔
524
        }
68✔
525
        m_abandoned_session_wrappers.push(std::move(wrapper));
10,032✔
526
    }
10,032✔
527
    m_actualize_and_finalize->trigger();
×
528
}
10,032✔
529

530

531
// Must be called from the event loop thread.
532
void ClientImpl::actualize_and_finalize_session_wrappers()
533
{
14,130✔
534
    // We need to pop from the wrapper stacks while holding the lock to ensure
535
    // that all updates to `SessionWrapper:m_next` are thread-safe, but then
536
    // release the lock before finalizing or actualizing because those functions
537
    // invoke user callbacks which may try to access the client and reacquire
538
    // the lock.
539
    //
540
    // Finalization must always happen before actualization because we may be
541
    // finalizing and actualizing sessions for the same Realm file, and
542
    // actualizing first would result in overlapping sessions. Because we're
543
    // releasing the lock new sessions may come in as we're looping, so we need
544
    // a single loop that checks both fields.
545
    while (true) {
34,184✔
546
        bool finalize = true;
34,180✔
547
        bool stopped;
34,180✔
548
        util::bind_ptr<SessionWrapper> wrapper;
34,180✔
549
        {
34,180✔
550
            util::CheckedLockGuard lock{m_mutex};
34,180✔
551
            wrapper = m_abandoned_session_wrappers.pop();
34,180✔
552
            if (!wrapper) {
34,180✔
553
                wrapper = m_unactualized_session_wrappers.pop();
24,158✔
554
                finalize = false;
24,158✔
555
            }
24,158✔
556
            stopped = m_stopped;
34,180✔
557
        }
34,180✔
558
        if (!wrapper)
34,180✔
559
            break;
14,126✔
560
        if (finalize)
20,054✔
561
            wrapper->finalize(); // Throws
10,020✔
562
        else if (stopped)
10,034✔
563
            wrapper->finalize_before_actualization();
4✔
564
        else
10,030✔
565
            wrapper->actualize(); // Throws
10,030✔
566
    }
20,054✔
567
}
14,130✔
568

569

570
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
571
                                                   const std::string& authorization_header_name,
572
                                                   const std::map<std::string, std::string>& custom_http_headers,
573
                                                   bool verify_servers_ssl_certificate,
574
                                                   Optional<std::string> ssl_trust_certificate_path,
575
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
576
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
577
{
10,024✔
578
    auto&& [server_slot_it, inserted] =
10,024✔
579
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
10,024✔
580
    ServerSlot& server_slot = server_slot_it->second; // Throws
10,024✔
581

582
    // TODO: enable multiplexing with proxies
583
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
10,024✔
584
        // Use preexisting connection
585
        REALM_ASSERT(server_slot.alt_connections.empty());
7,246✔
586
        return *server_slot.connection;
7,246✔
587
    }
7,246✔
588

589
    // Create a new connection
590
    REALM_ASSERT(!server_slot.connection);
2,778✔
591
    connection_ident_type ident = m_prev_connection_ident + 1;
2,778✔
592
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
2,778✔
593
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
2,778✔
594
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
2,778✔
595
        std::move(proxy_config), server_slot.reconnect_info); // Throws
2,778✔
596
    ClientImpl::Connection& conn = *conn_2;
2,778✔
597
    if (!m_one_connection_per_session) {
2,778✔
598
        server_slot.connection = std::move(conn_2);
2,768✔
599
    }
2,768✔
600
    else {
10✔
601
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
10✔
602
    }
10✔
603
    m_prev_connection_ident = ident;
2,778✔
604
    was_created = true;
2,778✔
605
    {
2,778✔
606
        util::CheckedLockGuard lk(m_drain_mutex);
2,778✔
607
        ++m_num_connections;
2,778✔
608
    }
2,778✔
609
    return conn;
2,778✔
610
}
10,024✔
611

612

613
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
614
{
2,770✔
615
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
2,770✔
616
    auto i = m_server_slots.find(endpoint);
2,770✔
617
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
2,770✔
618
    ServerSlot& server_slot = i->second;
2,770✔
619
    if (!m_one_connection_per_session) {
2,770✔
620
        REALM_ASSERT(server_slot.alt_connections.empty());
2,758✔
621
        REALM_ASSERT(&*server_slot.connection == &conn);
2,758✔
622
        server_slot.reconnect_info = conn.get_reconnect_info();
2,758✔
623
        server_slot.connection.reset();
2,758✔
624
    }
2,758✔
625
    else {
12✔
626
        REALM_ASSERT(!server_slot.connection);
12✔
627
        connection_ident_type ident = conn.get_ident();
12✔
628
        auto j = server_slot.alt_connections.find(ident);
12✔
629
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
12✔
630
        REALM_ASSERT(&*j->second == &conn);
12✔
631
        server_slot.alt_connections.erase(j);
12✔
632
    }
12✔
633

634
    bool notify;
2,770✔
635
    {
2,770✔
636
        util::CheckedLockGuard lk(m_drain_mutex);
2,770✔
637
        REALM_ASSERT(m_num_connections);
2,770✔
638
        notify = --m_num_connections <= 0;
2,770✔
639
    }
2,770✔
640
    if (notify) {
2,770✔
641
        m_drain_cv.notify_all();
2,160✔
642
    }
2,160✔
643
}
2,770✔
644

645

646
// ################ SessionImpl ################
647

648
void SessionImpl::force_close()
649
{
102✔
650
    // Allow force_close() if session is active or hasn't been activated yet.
651
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
102!
652
        m_wrapper.force_close();
102✔
653
    }
102✔
654
}
102✔
655

656
void SessionImpl::on_connection_state_changed(ConnectionState state,
657
                                              const std::optional<SessionErrorInfo>& error_info)
658
{
11,998✔
659
    // Only used to report errors back to the SyncSession while the Session is active
660
    if (m_state == SessionImpl::Active) {
11,998✔
661
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
11,998✔
662
    }
11,998✔
663
}
11,998✔
664

665

666
const std::string& SessionImpl::get_virt_path() const noexcept
667
{
6,952✔
668
    // Can only be called if the session is active or being activated
669
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
6,952✔
670
    return m_wrapper.m_virt_path;
6,952✔
671
}
6,952✔
672

673
const std::string& SessionImpl::get_realm_path() const noexcept
674
{
10,324✔
675
    // Can only be called if the session is active or being activated
676
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
10,324✔
677
    return m_wrapper.m_db->get_path();
10,324✔
678
}
10,324✔
679

680
DBRef SessionImpl::get_db() const noexcept
681
{
24,892✔
682
    // Can only be called if the session is active or being activated
683
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
24,892✔
684
    return m_wrapper.m_db;
24,892✔
685
}
24,892✔
686

687
ClientReplication& SessionImpl::get_repl() const noexcept
688
{
118,638✔
689
    // Can only be called if the session is active or being activated
690
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
118,638✔
691
    return m_wrapper.get_replication();
118,638✔
692
}
118,638✔
693

694
ClientHistory& SessionImpl::get_history() const noexcept
695
{
116,692✔
696
    return get_repl().get_history();
116,692✔
697
}
116,692✔
698

699
std::optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
700
{
13,704✔
701
    // Can only be called if the session is active or being activated
702
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
13,704✔
703
    return m_wrapper.m_client_reset_config;
13,704✔
704
}
13,704✔
705

706
SessionReason SessionImpl::get_session_reason() noexcept
707
{
1,466✔
708
    // Can only be called if the session is active or being activated
709
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,466✔
710
    return m_wrapper.m_session_reason;
1,466✔
711
}
1,466✔
712

713
uint64_t SessionImpl::get_schema_version() noexcept
714
{
1,466✔
715
    // Can only be called if the session is active or being activated
716
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,466✔
717
    return m_wrapper.m_schema_version;
1,466✔
718
}
1,466✔
719

720
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
721
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
722
{
44,056✔
723
    // Ignore the call if the session is not active
724
    if (m_state != State::Active) {
44,056✔
725
        return;
×
726
    }
×
727

728
    try {
44,056✔
729
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
44,056!
730
        if (simulate_integration_error) {
44,056✔
731
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
732
        }
×
733
        version_type client_version;
44,056✔
734
        if (REALM_LIKELY(!get_client().is_dry_run())) {
44,056✔
735
            VersionInfo version_info;
44,054✔
736
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
44,054✔
737
            client_version = version_info.realm_version;
44,054✔
738
        }
44,054✔
739
        else {
2✔
740
            // Fake it for "dry run" mode
741
            client_version = m_last_version_available + 1;
2✔
742
        }
2✔
743
        on_changesets_integrated(client_version, progress, !changesets.empty()); // Throws
44,056✔
744
    }
44,056✔
745
    catch (const IntegrationException& e) {
44,056✔
746
        on_integration_failure(e);
24✔
747
    }
24✔
748
}
44,056✔
749

750

751
void SessionImpl::on_upload_completion()
752
{
14,794✔
753
    // Ignore the call if the session is not active
754
    if (m_state == State::Active) {
14,794✔
755
        m_wrapper.on_upload_completion(); // Throws
14,794✔
756
    }
14,794✔
757
}
14,794✔
758

759

760
void SessionImpl::on_download_completion()
761
{
16,106✔
762
    // Ignore the call if the session is not active
763
    if (m_state == State::Active) {
16,108✔
764
        m_wrapper.on_download_completion(); // Throws
16,108✔
765
    }
16,108✔
766
}
16,106✔
767

768

769
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
770
{
668✔
771
    // Ignore the call if the session is not active
772
    if (m_state == State::Active) {
668✔
773
        m_wrapper.on_suspended(error_info); // Throws
668✔
774
    }
668✔
775
}
668✔
776

777

778
void SessionImpl::on_resumed()
779
{
60✔
780
    // Ignore the call if the session is not active
781
    if (m_state == State::Active) {
60✔
782
        m_wrapper.on_resumed(); // Throws
60✔
783
    }
60✔
784
}
60✔
785

786
void SessionImpl::handle_pending_client_reset_acknowledgement()
787
{
314✔
788
    // Ignore the call if the session is not active
789
    if (m_state == State::Active) {
314✔
790
        m_wrapper.handle_pending_client_reset_acknowledgement();
314✔
791
    }
314✔
792
}
314✔
793

794
void SessionImpl::update_subscription_version_info()
795
{
296✔
796
    // Ignore the call if the session is not active
797
    if (m_state == State::Active) {
296✔
798
        m_wrapper.update_subscription_version_info();
296✔
799
    }
296✔
800
}
296✔
801

802
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
803
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
804
{
46,190✔
805
    // Ignore the call if the session is not active
806
    if (m_state != State::Active) {
46,190✔
807
        return false;
×
808
    }
×
809

810
    if (is_steady_state_download_message(batch_state, query_version)) {
46,190✔
811
        return false;
44,056✔
812
    }
44,056✔
813

814
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,134✔
815
    std::optional<SyncProgress> maybe_progress;
2,134✔
816
    if (batch_state == DownloadBatchState::LastInBatch) {
2,134✔
817
        maybe_progress = progress;
1,934✔
818
    }
1,934✔
819

820
    bool new_batch = false;
2,134✔
821
    try {
2,134✔
822
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
2,134✔
823
    }
2,134✔
824
    catch (const LogicError& ex) {
2,134✔
825
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
826
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
827
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
828
                                    ProtocolError::bad_changeset_size);
×
829
            on_integration_failure(ex);
×
830
            return true;
×
831
        }
×
832
        throw;
×
833
    }
×
834

835
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
836
    // bootstrapping.
837
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
2,134✔
838
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
48✔
839
    }
48✔
840

841
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
2,134✔
842
                                       batch_state, received_changesets.size());
2,134✔
843
    if (hook_action == SyncClientHookAction::EarlyReturn) {
2,134✔
844
        return true;
12✔
845
    }
12✔
846
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
2,122✔
847

848
    if (batch_state == DownloadBatchState::MoreToCome) {
2,122✔
849
        notify_download_progress(bootstrap_store->pending_stats().pending_changeset_bytes);
196✔
850
        return true;
196✔
851
    }
196✔
852
    else {
1,926✔
853
        // FIXME (#7451) this variable is not needed in principle, and bootstrap store bytes could be passed just
854
        // through notify_download_progress, but since it is needed in report_progress, and it is also called on
855
        // upload progress for now until progress is reported separately. As soon as we understand here that there
856
        // are no more changesets for bootstrap store, and we want to process bootstrap, we don't need to notify
857
        // intermediate progress - so reset these bytes to not accidentally double report them.
858
        m_wrapper.m_bootstrap_store_bytes.reset();
1,926✔
859
    }
1,926✔
860

861
    try {
1,926✔
862
        process_pending_flx_bootstrap();
1,926✔
863
    }
1,926✔
864
    catch (const IntegrationException& e) {
1,926✔
865
        on_integration_failure(e);
12✔
866
    }
12✔
867
    catch (...) {
1,926✔
868
        on_integration_failure(IntegrationException(exception_to_status()));
×
869
    }
×
870

871
    return true;
1,926✔
872
}
1,926✔
873

874

875
void SessionImpl::process_pending_flx_bootstrap()
876
{
11,954✔
877
    // Ignore the call if not a flx session or session is not active
878
    if (!m_is_flx_sync_session || m_state != State::Active) {
11,954✔
879
        return;
8,534✔
880
    }
8,534✔
881
    // Should never be called if session is not active
882
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
3,420✔
883
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
3,420✔
884
    if (!bootstrap_store->has_pending()) {
3,420✔
885
        return;
1,474✔
886
    }
1,474✔
887

888
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,946✔
889
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,946✔
890
                "changeset size: %3)",
1,946✔
891
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,946✔
892
                pending_batch_stats.pending_changeset_bytes);
1,946✔
893
    auto& history = get_repl().get_history();
1,946✔
894
    VersionInfo new_version;
1,946✔
895
    SyncProgress progress;
1,946✔
896
    int64_t query_version = -1;
1,946✔
897
    size_t changesets_processed = 0;
1,946✔
898

899
    // Used to commit each batch after it was transformed.
900
    TransactionRef transact = get_db()->start_write();
1,946✔
901
    while (bootstrap_store->has_pending()) {
4,052✔
902
        auto start_time = std::chrono::steady_clock::now();
2,118✔
903
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
2,118✔
904
        if (!pending_batch.progress) {
2,118✔
905
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
906
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
907
            // bootstrap store requires a write transaction itself.
908
            transact->close();
8✔
909
            bootstrap_store->clear();
8✔
910
            return;
8✔
911
        }
8✔
912

913
        auto batch_state =
2,110✔
914
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
2,110✔
915
        uint64_t downloadable_bytes = 0;
2,110✔
916
        query_version = pending_batch.query_version;
2,110✔
917
        bool simulate_integration_error =
2,110✔
918
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
2,110✔
919
        if (simulate_integration_error) {
2,110✔
920
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
4✔
921
        }
4✔
922

923
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
2,106✔
924
                        batch_state, pending_batch.changesets.size());
2,106✔
925

926
        history.integrate_server_changesets(
2,106✔
927
            *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
2,106✔
928
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
2,106✔
929
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
2,094✔
930
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
2,094✔
931
            });
2,094✔
932
        progress = *pending_batch.progress;
2,106✔
933
        changesets_processed += pending_batch.changesets.size();
2,106✔
934
        auto duration = std::chrono::steady_clock::now() - start_time;
2,106✔
935

936
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
2,106✔
937
                                      batch_state, pending_batch.changesets.size());
2,106✔
938
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
2,106✔
939

940
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
2,106✔
941
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
2,106✔
942
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
2,106✔
943
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
2,106✔
944
                    pending_batch.remaining_changesets);
2,106✔
945
    }
2,106✔
946

947
    REALM_ASSERT_3(query_version, !=, -1);
1,934✔
948
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
1,934✔
949

950
    on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0);
1,934✔
951
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
1,934✔
952
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,934✔
953
    // NoAction/EarlyReturn are both valid no-op actions to take here.
954
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,934✔
955
}
1,934✔
956

957
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
958
{
20✔
959
    // Ignore the call if the session is not active
960
    if (m_state == State::Active) {
20✔
961
        m_wrapper.on_flx_sync_error(version, err_msg);
20✔
962
    }
20✔
963
}
20✔
964

965
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
966
{
1,970✔
967
    // Ignore the call if the session is not active
968
    if (m_state == State::Active) {
1,970✔
969
        m_wrapper.on_flx_sync_progress(version, batch_state);
1,970✔
970
    }
1,970✔
971
}
1,970✔
972

973
SubscriptionStore* SessionImpl::get_flx_subscription_store()
974
{
18,132✔
975
    // Should never be called if session is not active
976
    REALM_ASSERT_EX(m_state == State::Active, m_state);
18,132✔
977
    return m_wrapper.get_flx_subscription_store();
18,132✔
978
}
18,132✔
979

980
MigrationStore* SessionImpl::get_migration_store()
981
{
66,028✔
982
    // Should never be called if session is not active
983
    REALM_ASSERT_EX(m_state == State::Active, m_state);
66,028✔
984
    return m_wrapper.get_migration_store();
66,028✔
985
}
66,028✔
986

987
void SessionImpl::on_flx_sync_version_complete(int64_t version)
988
{
300✔
989
    // Ignore the call if the session is not active
990
    if (m_state == State::Active) {
300✔
991
        m_wrapper.on_flx_sync_version_complete(version);
300✔
992
    }
300✔
993
}
300✔
994

995
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
996
{
2,160✔
997
    // Should never be called if session is not active
998
    REALM_ASSERT_EX(m_state == State::Active, m_state);
2,160✔
999

1000
    // Make sure we don't call the debug hook recursively.
1001
    if (m_wrapper.m_in_debug_hook) {
2,160✔
1002
        return SyncClientHookAction::NoAction;
24✔
1003
    }
24✔
1004
    m_wrapper.m_in_debug_hook = true;
2,136✔
1005
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
2,136✔
1006
        m_wrapper.m_in_debug_hook = false;
2,136✔
1007
    });
2,136✔
1008

1009
    auto action = m_wrapper.m_debug_hook(data);
2,136✔
1010
    switch (action) {
2,136✔
1011
        case realm::SyncClientHookAction::SuspendWithRetryableError: {
12✔
1012
            SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
12✔
1013
            err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
12✔
1014

1015
            auto err_processing_err = receive_error_message(err_info);
12✔
1016
            REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
12✔
1017
            return SyncClientHookAction::EarlyReturn;
12✔
1018
        }
×
1019
        case realm::SyncClientHookAction::TriggerReconnect: {
24✔
1020
            get_connection().voluntary_disconnect();
24✔
1021
            return SyncClientHookAction::EarlyReturn;
24✔
1022
        }
×
1023
        default:
2,092✔
1024
            return action;
2,092✔
1025
    }
2,136✔
1026
}
2,136✔
1027

1028
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
1029
                                                  int64_t query_version, DownloadBatchState batch_state,
1030
                                                  size_t num_changesets)
1031
{
116,964✔
1032
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
116,964✔
1033
        return SyncClientHookAction::NoAction;
114,972✔
1034
    }
114,972✔
1035
    if (REALM_UNLIKELY(m_state != State::Active)) {
1,992✔
1036
        return SyncClientHookAction::NoAction;
×
1037
    }
×
1038

1039
    SyncClientHookData data;
1,992✔
1040
    data.event = event;
1,992✔
1041
    data.batch_state = batch_state;
1,992✔
1042
    data.progress = progress;
1,992✔
1043
    data.num_changesets = num_changesets;
1,992✔
1044
    data.query_version = query_version;
1,992✔
1045

1046
    return call_debug_hook(data);
1,992✔
1047
}
1,992✔
1048

1049
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
1050
{
1,378✔
1051
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
1,378✔
1052
        return SyncClientHookAction::NoAction;
1,210✔
1053
    }
1,210✔
1054
    if (REALM_UNLIKELY(m_state != State::Active)) {
168✔
1055
        return SyncClientHookAction::NoAction;
×
1056
    }
×
1057

1058
    SyncClientHookData data;
168✔
1059
    data.event = event;
168✔
1060
    data.batch_state = DownloadBatchState::SteadyState;
168✔
1061
    data.progress = m_progress;
168✔
1062
    data.num_changesets = 0;
168✔
1063
    data.query_version = 0;
168✔
1064
    data.error_info = &error_info;
168✔
1065

1066
    return call_debug_hook(data);
168✔
1067
}
168✔
1068

1069
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
1070
{
92,392✔
1071
    // Should never be called if session is not active
1072
    REALM_ASSERT_EX(m_state == State::Active, m_state);
92,392✔
1073
    if (batch_state == DownloadBatchState::SteadyState) {
92,392✔
1074
        return true;
44,056✔
1075
    }
44,056✔
1076

1077
    if (!m_is_flx_sync_session) {
48,336✔
1078
        return true;
42,760✔
1079
    }
42,760✔
1080

1081
    // If this is a steady state DOWNLOAD, no need for special handling.
1082
    if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) {
5,576✔
1083
        return true;
1,298✔
1084
    }
1,298✔
1085

1086
    return false;
4,278✔
1087
}
5,576✔
1088

1089
void SessionImpl::init_progress_handler()
1090
{
10,324✔
1091
    if (m_state != State::Unactivated && m_state != State::Active)
10,324✔
1092
        return;
×
1093

1094
    m_wrapper.init_progress_handler();
10,324✔
1095
}
10,324✔
1096

1097
void SessionImpl::enable_progress_notifications()
1098
{
44,606✔
1099
    m_wrapper.m_reliable_download_progress = true;
44,606✔
1100
}
44,606✔
1101

1102
void SessionImpl::notify_upload_progress()
1103
{
33,652✔
1104
    if (m_state != State::Active)
33,652✔
1105
        return;
×
1106

1107
    m_wrapper.on_upload_progress();
33,652✔
1108
}
33,652✔
1109

1110
void SessionImpl::update_download_estimate(double download_estimate)
1111
{
3,432✔
1112
    if (m_state != State::Active)
3,432✔
1113
        return;
×
1114

1115
    m_wrapper.m_download_estimate = download_estimate;
3,432✔
1116
}
3,432✔
1117

1118
void SessionImpl::notify_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes)
1119
{
26,534✔
1120
    if (m_state != State::Active)
26,534✔
1121
        return;
×
1122

1123
    m_wrapper.on_download_progress(bootstrap_store_bytes); // Throws
26,534✔
1124
}
26,534✔
1125

1126
util::Future<std::string> SessionImpl::send_test_command(std::string body)
1127
{
60✔
1128
    if (m_state != State::Active) {
60✔
1129
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
×
1130
    }
×
1131

1132
    try {
60✔
1133
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
60✔
1134
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
60✔
1135
            return Status{ErrorCodes::LogicError,
4✔
1136
                          "Must supply command name in \"command\" field of test command json object"};
4✔
1137
        }
4✔
1138
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
56✔
1139
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
×
1140
        }
×
1141
    }
56✔
1142
    catch (const nlohmann::json::parse_error& e) {
60✔
1143
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
4✔
1144
    }
4✔
1145

1146
    auto pf = util::make_promise_future<std::string>();
52✔
1147
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
52✔
1148
        // Includes operation_aborted
1149
        if (!status.is_ok()) {
52✔
1150
            promise.set_error(status);
×
1151
            return;
×
1152
        }
×
1153

1154
        auto id = ++m_last_pending_test_command_ident;
52✔
1155
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
52✔
1156
        ensure_enlisted_to_send();
52✔
1157
    });
52✔
1158

1159
    return std::move(pf.future);
52✔
1160
}
60✔
1161

1162
// ################ SessionWrapper ################
1163

1164
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1165
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
1166
// the ClientImpl::Connection that owns the ClientImpl::Session.
1167
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1168
                               std::shared_ptr<MigrationStore> migration_store, Session::Config&& config)
1169
    : m_client{client}
4,880✔
1170
    , m_db(std::move(db))
4,880✔
1171
    , m_replication(m_db->get_replication())
4,880✔
1172
    , m_protocol_envelope{config.protocol_envelope}
4,880✔
1173
    , m_server_address{std::move(config.server_address)}
4,880✔
1174
    , m_server_port{config.server_port}
4,880✔
1175
    , m_server_verified{config.server_verified}
4,880✔
1176
    , m_user_id(std::move(config.user_id))
4,880✔
1177
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
4,880✔
1178
    , m_authorization_header_name{config.authorization_header_name}
4,880✔
1179
    , m_custom_http_headers{std::move(config.custom_http_headers)}
4,880✔
1180
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
4,880✔
1181
    , m_simulate_integration_error{config.simulate_integration_error}
4,880✔
1182
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
4,880✔
1183
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
4,880✔
1184
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
4,880✔
1185
    , m_http_request_path_prefix{std::move(config.service_identifier)}
4,880✔
1186
    , m_virt_path{std::move(config.realm_identifier)}
4,880✔
1187
    , m_proxy_config{std::move(config.proxy_config)}
4,880✔
1188
    , m_signed_access_token{std::move(config.signed_user_token)}
4,880✔
1189
    , m_client_reset_config{std::move(config.client_reset_config)}
4,880✔
1190
    , m_progress_handler(std::move(config.progress_handler))
4,880✔
1191
    , m_connection_state_change_listener(std::move(config.connection_state_change_listener))
4,880✔
1192
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
4,880✔
1193
    , m_session_reason(m_client_reset_config ? SessionReason::ClientReset : config.session_reason)
4,880✔
1194
    , m_schema_version(config.schema_version)
4,880✔
1195
    , m_flx_subscription_store(std::move(flx_sub_store))
4,880✔
1196
    , m_migration_store(std::move(migration_store))
4,880✔
1197
{
10,104✔
1198
    REALM_ASSERT(m_db);
10,104✔
1199
    REALM_ASSERT(m_db->get_replication());
10,104✔
1200
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
10,104✔
1201

1202
    // SessionWrapper begins at +1 retain count because Client retains and
1203
    // releases it while performing async operations, and these need to not
1204
    // take it to 0 or it could be deleted before the caller can retain it.
1205
    bind_ptr();
10,104✔
1206
    m_client.register_unactualized_session_wrapper(this);
10,104✔
1207
}
10,104✔
1208

1209
SessionWrapper::~SessionWrapper() noexcept
1210
{
10,092✔
1211
    // We begin actualization in the constructor and do not delete the wrapper
1212
    // until both the Client is done with it and the Session has abandoned it,
1213
    // so at this point we must have actualized, finalized, and been abandoned.
1214
    REALM_ASSERT(m_actualized);
10,092✔
1215
    REALM_ASSERT(m_abandoned);
10,092✔
1216
    REALM_ASSERT(m_finalized);
10,092✔
1217
    REALM_ASSERT(m_closed);
10,092✔
1218
    REALM_ASSERT(!m_db);
10,092✔
1219
}
10,092✔
1220

1221

1222
inline ClientReplication& SessionWrapper::get_replication() noexcept
1223
{
118,634✔
1224
    REALM_ASSERT(m_db);
118,634✔
1225
    return static_cast<ClientReplication&>(*m_replication);
118,634✔
1226
}
118,634✔
1227

1228

1229
inline ClientImpl& SessionWrapper::get_client() noexcept
UNCOV
1230
{
×
UNCOV
1231
    return m_client;
×
UNCOV
1232
}
×
1233

1234
bool SessionWrapper::has_flx_subscription_store() const
1235
{
1,970✔
1236
    return static_cast<bool>(m_flx_subscription_store);
1,970✔
1237
}
1,970✔
1238

1239
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1240
{
20✔
1241
    REALM_ASSERT(!m_finalized);
20✔
1242
    get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
20✔
1243
}
20✔
1244

1245
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1246
{
2,222✔
1247
    REALM_ASSERT(!m_finalized);
2,222✔
1248
    m_flx_last_seen_version = version;
2,222✔
1249
    m_flx_active_version = version;
2,222✔
1250
}
2,222✔
1251

1252
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1253
{
1,970✔
1254
    if (!has_flx_subscription_store()) {
1,970✔
1255
        return;
×
1256
    }
×
1257
    REALM_ASSERT(!m_finalized);
1,970✔
1258
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
1,970✔
1259
    REALM_ASSERT(new_version >= m_flx_active_version);
1,970✔
1260
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
1,970✔
1261

1262
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
1,970✔
1263

1264
    switch (batch_state) {
1,970✔
1265
        case DownloadBatchState::SteadyState:
✔
1266
            // Cannot be called with this value.
1267
            REALM_UNREACHABLE();
1268
        case DownloadBatchState::LastInBatch:
1,922✔
1269
            if (m_flx_active_version == new_version) {
1,922✔
1270
                return;
×
1271
            }
×
1272
            on_flx_sync_version_complete(new_version);
1,922✔
1273
            if (new_version == 0) {
1,922✔
1274
                new_state = SubscriptionSet::State::Complete;
912✔
1275
            }
912✔
1276
            else {
1,010✔
1277
                new_state = SubscriptionSet::State::AwaitingMark;
1,010✔
1278
                m_flx_pending_mark_version = new_version;
1,010✔
1279
            }
1,010✔
1280
            break;
1,922✔
1281
        case DownloadBatchState::MoreToCome:
48✔
1282
            if (m_flx_last_seen_version == new_version) {
48✔
1283
                return;
×
1284
            }
×
1285

1286
            m_flx_last_seen_version = new_version;
48✔
1287
            new_state = SubscriptionSet::State::Bootstrapping;
48✔
1288
            break;
48✔
1289
    }
1,970✔
1290

1291
    get_flx_subscription_store()->update_state(new_version, new_state);
1,970✔
1292
}
1,970✔
1293

1294
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
1295
{
20,122✔
1296
    REALM_ASSERT(!m_finalized);
20,122✔
1297
    return m_flx_subscription_store.get();
20,122✔
1298
}
20,122✔
1299

1300
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
1301
{
5,554✔
1302
    REALM_ASSERT(!m_finalized);
5,554✔
1303
    return m_flx_pending_bootstrap_store.get();
5,554✔
1304
}
5,554✔
1305

1306
MigrationStore* SessionWrapper::get_migration_store()
1307
{
66,028✔
1308
    REALM_ASSERT(!m_finalized);
66,028✔
1309
    return m_migration_store.get();
66,028✔
1310
}
66,028✔
1311

1312
inline bool SessionWrapper::mark_abandoned()
1313
{
10,104✔
1314
    REALM_ASSERT(!m_abandoned);
10,104✔
1315
    m_abandoned = true;
10,104✔
1316
    return m_finalized;
10,104✔
1317
}
10,104✔
1318

1319

1320
void SessionWrapper::on_commit(version_type new_version)
1321
{
112,366✔
1322
    // Thread safety required
1323
    m_client.post([self = util::bind_ptr{this}, new_version] {
112,366✔
1324
        REALM_ASSERT(self->m_actualized);
112,366✔
1325
        if (REALM_UNLIKELY(!self->m_sess))
112,366✔
1326
            return; // Already finalized
946✔
1327
        SessionImpl& sess = *self->m_sess;
111,420✔
1328
        sess.recognize_sync_version(new_version);                           // Throws
111,420✔
1329
        self->on_upload_progress(/* only_if_new_uploadable_data = */ true); // Throws
111,420✔
1330
    });
111,420✔
1331
}
112,366✔
1332

1333

1334
void SessionWrapper::cancel_reconnect_delay()
1335
{
20✔
1336
    // Thread safety required
1337

1338
    m_client.post([self = util::bind_ptr{this}] {
20✔
1339
        REALM_ASSERT(self->m_actualized);
20✔
1340
        if (REALM_UNLIKELY(self->m_closed)) {
20✔
1341
            return;
×
1342
        }
×
1343

1344
        if (REALM_UNLIKELY(!self->m_sess))
20✔
1345
            return; // Already finalized
×
1346
        SessionImpl& sess = *self->m_sess;
20✔
1347
        sess.cancel_resumption_delay(); // Throws
20✔
1348
        ClientImpl::Connection& conn = sess.get_connection();
20✔
1349
        conn.cancel_reconnect_delay(); // Throws
20✔
1350
    });                                // Throws
20✔
1351
}
20✔
1352

1353
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1354
                                    WaitOperCompletionHandler handler)
1355
{
5,170✔
1356
    REALM_ASSERT(upload_completion || download_completion);
5,170✔
1357

1358
    m_client.post([self = util::bind_ptr{this}, handler = std::move(handler), upload_completion,
5,170✔
1359
                   download_completion]() mutable {
5,170✔
1360
        REALM_ASSERT(self->m_actualized);
5,170✔
1361
        if (REALM_UNLIKELY(!self->m_sess)) {
5,170✔
1362
            // Already finalized
1363
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
104✔
1364
            return;
104✔
1365
        }
104✔
1366
        if (upload_completion) {
5,066✔
1367
            if (download_completion) {
2,594✔
1368
                // Wait for upload and download completion
1369
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
306✔
1370
            }
306✔
1371
            else {
2,288✔
1372
                // Wait for upload completion only
1373
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
2,288✔
1374
            }
2,288✔
1375
        }
2,594✔
1376
        else {
2,472✔
1377
            // Wait for download completion only
1378
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
2,472✔
1379
        }
2,472✔
1380
        SessionImpl& sess = *self->m_sess;
5,066✔
1381
        if (upload_completion)
5,066✔
1382
            sess.request_upload_completion_notification(); // Throws
2,594✔
1383
        if (download_completion)
5,066✔
1384
            sess.request_download_completion_notification(); // Throws
2,778✔
1385
    });                                                      // Throws
5,066✔
1386
}
5,170✔
1387

1388

1389
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1390
{
12,900✔
1391
    // Thread safety required
1392
    REALM_ASSERT(!m_abandoned);
12,900✔
1393

1394
    std::int_fast64_t target_mark;
12,900✔
1395
    {
12,900✔
1396
        util::CheckedLockGuard lock{m_client.m_mutex};
12,900✔
1397
        target_mark = ++m_target_upload_mark;
12,900✔
1398
    }
12,900✔
1399

1400
    m_client.post([self = util::bind_ptr{this}, target_mark] {
12,900✔
1401
        REALM_ASSERT(self->m_actualized);
12,900✔
1402
        // The session wrapper may already have been finalized. This can only
1403
        // happen if it was abandoned, but in that case, the call of
1404
        // wait_for_upload_complete_or_client_stopped() must have returned
1405
        // already.
1406
        if (REALM_UNLIKELY(!self->m_sess))
12,900✔
1407
            return;
22✔
1408
        if (target_mark > self->m_staged_upload_mark) {
12,878✔
1409
            self->m_staged_upload_mark = target_mark;
12,878✔
1410
            SessionImpl& sess = *self->m_sess;
12,878✔
1411
            sess.request_upload_completion_notification(); // Throws
12,878✔
1412
        }
12,878✔
1413
    }); // Throws
12,878✔
1414

1415
    bool completion_condition_was_satisfied;
12,900✔
1416
    {
12,900✔
1417
        util::CheckedUniqueLock lock{m_client.m_mutex};
12,900✔
1418
        m_client.m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_client.m_mutex) {
32,584✔
1419
            return m_reached_upload_mark >= target_mark || m_client.m_stopped;
32,584✔
1420
        });
32,584✔
1421
        completion_condition_was_satisfied = !m_client.m_stopped;
12,900✔
1422
    }
12,900✔
1423
    return completion_condition_was_satisfied;
12,900✔
1424
}
12,900✔
1425

1426

1427
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1428
{
9,988✔
1429
    // Thread safety required
1430
    REALM_ASSERT(!m_abandoned);
9,988✔
1431

1432
    std::int_fast64_t target_mark;
9,988✔
1433
    {
9,988✔
1434
        util::CheckedLockGuard lock{m_client.m_mutex};
9,988✔
1435
        target_mark = ++m_target_download_mark;
9,988✔
1436
    }
9,988✔
1437

1438
    m_client.post([self = util::bind_ptr{this}, target_mark] {
9,988✔
1439
        REALM_ASSERT(self->m_actualized);
9,988✔
1440
        // The session wrapper may already have been finalized. This can only
1441
        // happen if it was abandoned, but in that case, the call of
1442
        // wait_for_download_complete_or_client_stopped() must have returned
1443
        // already.
1444
        if (REALM_UNLIKELY(!self->m_sess))
9,988✔
1445
            return;
60✔
1446
        if (target_mark > self->m_staged_download_mark) {
9,928✔
1447
            self->m_staged_download_mark = target_mark;
9,928✔
1448
            SessionImpl& sess = *self->m_sess;
9,928✔
1449
            sess.request_download_completion_notification(); // Throws
9,928✔
1450
        }
9,928✔
1451
    }); // Throws
9,928✔
1452

1453
    bool completion_condition_was_satisfied;
9,988✔
1454
    {
9,988✔
1455
        util::CheckedUniqueLock lock{m_client.m_mutex};
9,988✔
1456
        m_client.m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_client.m_mutex) {
20,270✔
1457
            return m_reached_download_mark >= target_mark || m_client.m_stopped;
20,270✔
1458
        });
20,270✔
1459
        completion_condition_was_satisfied = !m_client.m_stopped;
9,988✔
1460
    }
9,988✔
1461
    return completion_condition_was_satisfied;
9,988✔
1462
}
9,988✔
1463

1464

1465
void SessionWrapper::refresh(std::string_view signed_access_token)
1466
{
216✔
1467
    // Thread safety required
1468
    REALM_ASSERT(!m_abandoned);
216✔
1469

1470
    m_client.post([self = util::bind_ptr{this}, token = std::string(signed_access_token)] {
216✔
1471
        REALM_ASSERT(self->m_actualized);
216✔
1472
        if (REALM_UNLIKELY(!self->m_sess))
216✔
1473
            return; // Already finalized
4✔
1474
        self->m_signed_access_token = std::move(token);
212✔
1475
        SessionImpl& sess = *self->m_sess;
212✔
1476
        ClientImpl::Connection& conn = sess.get_connection();
212✔
1477
        // FIXME: This only makes sense when each session uses a separate connection.
1478
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
212✔
1479
        sess.cancel_resumption_delay();                                                          // Throws
212✔
1480
        conn.cancel_reconnect_delay();                                                           // Throws
212✔
1481
    });
212✔
1482
}
216✔
1483

1484

1485
void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
1486
{
10,104✔
1487
    ClientImpl& client = wrapper->m_client;
10,104✔
1488
    client.register_abandoned_session_wrapper(std::move(wrapper));
10,104✔
1489
}
10,104✔
1490

1491

1492
// Must be called from event loop thread
1493
void SessionWrapper::actualize()
1494
{
10,032✔
1495
    // actualize() can only ever be called once
1496
    REALM_ASSERT(!m_actualized);
10,032✔
1497
    REALM_ASSERT(!m_sess);
10,032✔
1498
    // The client should have removed this wrapper from those pending
1499
    // actualization if it called force_close() or finalize_before_actualize()
1500
    REALM_ASSERT(!m_finalized);
10,032✔
1501
    REALM_ASSERT(!m_closed);
10,032✔
1502

1503
    m_actualized = true;
10,032✔
1504

1505
    ScopeExitFail close_on_error([&]() noexcept {
10,032✔
1506
        m_closed = true;
4✔
1507
    });
4✔
1508

1509
    m_db->claim_sync_agent();
10,032✔
1510
    m_db->add_commit_listener(this);
10,032✔
1511
    ScopeExitFail remove_commit_listener([&]() noexcept {
10,032✔
NEW
1512
        m_db->remove_commit_listener(this);
×
NEW
1513
    });
×
1514

1515
    ServerEndpoint endpoint{m_protocol_envelope, m_server_address, m_server_port,
10,032✔
1516
                            m_user_id,           m_sync_mode,      m_server_verified};
10,032✔
1517
    bool was_created = false;
10,032✔
1518
    ClientImpl::Connection& conn = m_client.get_connection(
10,032✔
1519
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
10,032✔
1520
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
10,032✔
1521
        was_created); // Throws
10,032✔
1522
    ScopeExitFail remove_connection([&]() noexcept {
10,032✔
UNCOV
1523
        if (was_created)
×
1524
            m_client.remove_connection(conn);
×
NEW
1525
    });
×
1526

1527
    // FIXME: This only makes sense when each session uses a separate connection.
1528
    conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
10,032✔
1529
    std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
10,032✔
1530
    if (m_sync_mode == SyncServerMode::FLX) {
10,032✔
1531
        m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
1,494✔
1532
    }
1,494✔
1533

1534
    sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
10,032✔
1535
    m_sess = sess.get();
10,032✔
1536
    ScopeExitFail clear_sess([&]() noexcept {
10,032✔
NEW
1537
        m_sess = nullptr;
×
NEW
1538
    });
×
1539
    conn.activate_session(std::move(sess)); // Throws
10,032✔
1540

1541
    // Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
1542
    // session cannot change the state of the bootstrap store at the same time.
1543
    update_subscription_version_info();
10,032✔
1544

1545
    if (was_created)
10,032✔
1546
        conn.activate(); // Throws
2,782✔
1547

1548
    if (m_connection_state_change_listener) {
10,032✔
1549
        ConnectionState state = conn.get_state();
10,020✔
1550
        if (state != ConnectionState::disconnected) {
10,020✔
1551
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,096✔
1552
            if (state == ConnectionState::connected)
7,096✔
1553
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
6,360✔
1554
        }
7,096✔
1555
    }
10,020✔
1556

1557
    if (!m_client_reset_config)
10,032✔
1558
        on_upload_progress(/* only_if_new_uploadable_data = */ true); // Throws
9,652✔
1559
}
10,032✔
1560

1561
void SessionWrapper::force_close()
1562
{
10,122✔
1563
    if (m_closed) {
10,122✔
1564
        return;
106✔
1565
    }
106✔
1566
    REALM_ASSERT(m_actualized);
10,016✔
1567
    REALM_ASSERT(m_sess);
10,016✔
1568
    m_closed = true;
10,016✔
1569

1570
    ClientImpl::Connection& conn = m_sess->get_connection();
10,016✔
1571
    conn.initiate_session_deactivation(m_sess); // Throws
10,016✔
1572

1573
    // We need to keep the DB open until finalization, but we no longer want to
1574
    // know when commits are made
1575
    m_db->remove_commit_listener(this);
10,016✔
1576

1577
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
1578
    m_flx_pending_bootstrap_store.reset();
10,016✔
1579
    // Clear the subscription and migration store refs since they are owned by SyncSession
1580
    m_flx_subscription_store.reset();
10,016✔
1581
    m_migration_store.reset();
10,016✔
1582
    m_sess = nullptr;
10,016✔
1583
    // Everything is being torn down, no need to report connection state anymore
1584
    m_connection_state_change_listener = {};
10,016✔
1585
}
10,016✔
1586

1587
// Must be called from event loop thread
1588
//
1589
// `m_client.m_mutex` is not held while this is called, but it is guaranteed to
1590
// have been acquired at some point in between the final read or write ever made
1591
// from a different thread and when this is called.
1592
void SessionWrapper::finalize()
1593
{
10,020✔
1594
    REALM_ASSERT(m_actualized);
10,020✔
1595
    REALM_ASSERT(m_abandoned);
10,020✔
1596
    REALM_ASSERT(!m_finalized);
10,020✔
1597

1598
    force_close();
10,020✔
1599

1600
    m_finalized = true;
10,020✔
1601

1602
    // The Realm file can be closed now, as no access to the Realm file is
1603
    // supposed to happen on behalf of a session after initiation of
1604
    // deactivation.
1605
    m_db->release_sync_agent();
10,020✔
1606
    m_db = nullptr;
10,020✔
1607

1608
    // All outstanding wait operations must be canceled
1609
    while (!m_upload_completion_handlers.empty()) {
10,458✔
1610
        auto handler = std::move(m_upload_completion_handlers.back());
438✔
1611
        m_upload_completion_handlers.pop_back();
438✔
1612
        handler(
438✔
1613
            {ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
438✔
1614
    }
438✔
1615
    while (!m_download_completion_handlers.empty()) {
10,240✔
1616
        auto handler = std::move(m_download_completion_handlers.back());
220✔
1617
        m_download_completion_handlers.pop_back();
220✔
1618
        handler(
220✔
1619
            {ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
220✔
1620
    }
220✔
1621
    while (!m_sync_completion_handlers.empty()) {
10,032✔
1622
        auto handler = std::move(m_sync_completion_handlers.back());
12✔
1623
        m_sync_completion_handlers.pop_back();
12✔
1624
        handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
12✔
1625
    }
12✔
1626
}
10,020✔
1627

1628

1629
// Must be called only when an unactualized session wrapper becomes abandoned.
1630
//
1631
// Called with a lock on `m_client.m_mutex`.
1632
inline void SessionWrapper::finalize_before_actualization() noexcept
1633
{
72✔
1634
    REALM_ASSERT(!m_finalized);
72✔
1635
    REALM_ASSERT(!m_sess);
72✔
1636
    m_actualized = true;
72✔
1637
    m_finalized = true;
72✔
1638
    m_closed = true;
72✔
1639
    m_db->remove_commit_listener(this);
72✔
1640
    m_db->release_sync_agent();
72✔
1641
    m_db = nullptr;
72✔
1642
}
72✔
1643

1644
inline void SessionWrapper::on_upload_progress(bool only_if_new_uploadable_data)
1645
{
154,724✔
1646
    REALM_ASSERT(!m_finalized);
154,724✔
1647
    report_progress(/* is_download = */ false, only_if_new_uploadable_data); // Throws
154,724✔
1648
}
154,724✔
1649

1650
inline void SessionWrapper::on_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes)
1651
{
26,534✔
1652
    REALM_ASSERT(!m_finalized);
26,534✔
1653
    m_bootstrap_store_bytes = bootstrap_store_bytes;
26,534✔
1654
    report_progress(/* is_download = */ true); // Throws
26,534✔
1655
}
26,534✔
1656

1657

1658
void SessionWrapper::on_upload_completion()
1659
{
14,794✔
1660
    REALM_ASSERT(!m_finalized);
14,794✔
1661
    while (!m_upload_completion_handlers.empty()) {
16,742✔
1662
        auto handler = std::move(m_upload_completion_handlers.back());
1,948✔
1663
        m_upload_completion_handlers.pop_back();
1,948✔
1664
        handler(Status::OK()); // Throws
1,948✔
1665
    }
1,948✔
1666
    while (!m_sync_completion_handlers.empty()) {
14,990✔
1667
        auto handler = std::move(m_sync_completion_handlers.back());
196✔
1668
        m_download_completion_handlers.push_back(std::move(handler)); // Throws
196✔
1669
        m_sync_completion_handlers.pop_back();
196✔
1670
    }
196✔
1671
    util::CheckedLockGuard lock{m_client.m_mutex};
14,794✔
1672
    if (m_staged_upload_mark > m_reached_upload_mark) {
14,794✔
1673
        m_reached_upload_mark = m_staged_upload_mark;
12,860✔
1674
        m_client.m_wait_or_client_stopped_cond.notify_all();
12,860✔
1675
    }
12,860✔
1676
}
14,794✔
1677

1678

1679
void SessionWrapper::on_download_completion()
1680
{
16,108✔
1681
    while (!m_download_completion_handlers.empty()) {
18,544✔
1682
        auto handler = std::move(m_download_completion_handlers.back());
2,436✔
1683
        m_download_completion_handlers.pop_back();
2,436✔
1684
        handler(Status::OK()); // Throws
2,436✔
1685
    }
2,436✔
1686
    while (!m_sync_completion_handlers.empty()) {
16,206✔
1687
        auto handler = std::move(m_sync_completion_handlers.back());
98✔
1688
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
98✔
1689
        m_sync_completion_handlers.pop_back();
98✔
1690
    }
98✔
1691

1692
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
16,108✔
1693
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
876✔
1694
                             m_flx_pending_mark_version);
876✔
1695
        m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
876✔
1696
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
876✔
1697
    }
876✔
1698

1699
    util::CheckedLockGuard lock{m_client.m_mutex};
16,108✔
1700
    if (m_staged_download_mark > m_reached_download_mark) {
16,108✔
1701
        m_reached_download_mark = m_staged_download_mark;
9,858✔
1702
        m_client.m_wait_or_client_stopped_cond.notify_all();
9,858✔
1703
    }
9,858✔
1704
}
16,108✔
1705

1706

1707
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1708
{
668✔
1709
    REALM_ASSERT(!m_finalized);
668✔
1710
    m_suspended = true;
668✔
1711
    if (m_connection_state_change_listener) {
668✔
1712
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
668✔
1713
    }
668✔
1714
}
668✔
1715

1716

1717
void SessionWrapper::on_resumed()
1718
{
60✔
1719
    REALM_ASSERT(!m_finalized);
60✔
1720
    m_suspended = false;
60✔
1721
    if (m_connection_state_change_listener) {
60✔
1722
        ClientImpl::Connection& conn = m_sess->get_connection();
60✔
1723
        if (conn.get_state() != ConnectionState::disconnected) {
60✔
1724
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
52✔
1725
            if (conn.get_state() == ConnectionState::connected)
52✔
1726
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
48✔
1727
        }
52✔
1728
    }
60✔
1729
}
60✔
1730

1731

1732
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1733
                                                 const std::optional<SessionErrorInfo>& error_info)
1734
{
12,002✔
1735
    if (m_connection_state_change_listener && !m_suspended) {
12,002✔
1736
        m_connection_state_change_listener(state, error_info); // Throws
11,968✔
1737
    }
11,968✔
1738
}
12,002✔
1739

1740
void SessionWrapper::init_progress_handler()
1741
{
10,324✔
1742
    uint64_t unused = 0;
10,324✔
1743
    ClientHistory::get_upload_download_bytes(m_db.get(), m_reported_progress.final_downloaded, unused,
10,324✔
1744
                                             m_reported_progress.final_uploaded, unused, unused);
10,324✔
1745
}
10,324✔
1746

1747
void SessionWrapper::report_progress(bool is_download, bool only_if_new_uploadable_data)
1748
{
181,258✔
1749
    REALM_ASSERT(!m_finalized);
181,258✔
1750
    REALM_ASSERT(m_sess);
181,258✔
1751
    REALM_ASSERT(!(only_if_new_uploadable_data && is_download));
181,258✔
1752

1753
    if (!m_progress_handler)
181,258✔
1754
        return;
126,794✔
1755

1756
    // Ignore progress messages from before we first receive a DOWNLOAD message
1757
    if (!m_reliable_download_progress)
54,464✔
1758
        return;
27,436✔
1759

1760
    ReportedProgress p = m_reported_progress;
27,028✔
1761
    ClientHistory::get_upload_download_bytes(m_db.get(), p.downloaded, p.downloadable, p.uploaded, p.uploadable,
27,028✔
1762
                                             p.snapshot);
27,028✔
1763

1764
    // If this progress notification was triggered by a commit being made we
1765
    // only want to send it if the uploadable bytes has actually increased,
1766
    // and not if it was an empty commit.
1767
    if (only_if_new_uploadable_data && m_reported_progress.uploadable == p.uploadable)
27,028✔
1768
        return;
18,334✔
1769

1770
    // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
1771
    // is only the remaining to download. This is confusing, so make them use
1772
    // the same units.
1773
    p.downloadable += p.downloaded;
8,694✔
1774

1775
    bool is_completed = false;
8,694✔
1776
    if (is_download) {
8,694✔
1777
        if (m_download_estimate)
3,378✔
1778
            is_completed = *m_download_estimate >= 1.0;
1,304✔
1779
        else
2,074✔
1780
            is_completed = p.downloaded == p.downloadable;
2,074✔
1781
    }
3,378✔
1782
    else {
5,316✔
1783
        is_completed = p.uploaded == p.uploadable;
5,316✔
1784
    }
5,316✔
1785

1786
    auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) {
9,510✔
1787
        REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable);
9,510✔
1788
        REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable);
9,510✔
1789

1790
        // The effect of this calculation is that if new bytes are added for download/upload,
1791
        // the progress estimate doesn't go back to zero, but it goes back to some non-zero percentage.
1792
        // This calculation allows a clean progression from 0 to 1.0 even if the new data is added for the sync
1793
        // before progress has reached 1.0.
1794
        // Then once it is at 1.0 the next batch of changes will restart the estimate at 0.
1795
        // Example for upload progress reported:
1796
        // 0 -> 1.0 -> new data added -> 0.0 -> 0.1 ...sync... -> 0.4 -> new data added -> 0.3 ...sync.. -> 1.0
1797

1798
        double progress_estimate = 1.0;
9,510✔
1799
        if (final_transferred < transferable && transferred < transferable)
9,510✔
1800
            progress_estimate = (transferred - final_transferred) / double(transferable - final_transferred);
3,964✔
1801
        return progress_estimate;
9,510✔
1802
    };
9,510✔
1803

1804
    double upload_estimate = 1.0, download_estimate = 1.0;
8,694✔
1805

1806
    // calculate estimate for both download/upload since the progress is reported all at once
1807
    if (!is_completed || is_download)
8,694✔
1808
        upload_estimate = calculate_progress(p.uploaded, p.uploadable, p.final_uploaded);
5,838✔
1809

1810
    // download estimate only known for flx
1811
    if (m_download_estimate) {
8,694✔
1812
        download_estimate = *m_download_estimate;
2,978✔
1813

1814
        // ... bootstrap store bytes should be null after initial sync when every changeset integrated immediately
1815
        if (m_bootstrap_store_bytes)
2,978✔
1816
            p.downloaded += *m_bootstrap_store_bytes;
196✔
1817

1818
        // FIXME for flx with download estimate these bytes are not known
1819
        // provide some sensible value for non-streaming version of object-store callbacks
1820
        // until these field are completely removed from the api after pbs deprecation
1821
        p.downloadable = p.downloaded;
2,978✔
1822
        if (0.01 <= download_estimate && download_estimate <= 0.99)
2,978✔
1823
            if (p.downloaded > p.final_downloaded)
160✔
1824
                p.downloadable =
160✔
1825
                    p.final_downloaded + uint64_t((p.downloaded - p.final_downloaded) / download_estimate);
160✔
1826
    }
2,978✔
1827
    else {
5,716✔
1828
        if (!is_completed || !is_download)
5,716✔
1829
            download_estimate = calculate_progress(p.downloaded, p.downloadable, p.final_downloaded);
3,672✔
1830
    }
5,716✔
1831

1832
    if (is_completed) {
8,694✔
1833
        if (is_download)
6,004✔
1834
            p.final_downloaded = p.downloaded;
3,150✔
1835
        else
2,854✔
1836
            p.final_uploaded = p.uploaded;
2,854✔
1837
    }
6,004✔
1838

1839
    m_reported_progress = p;
8,694✔
1840

1841
    if (m_sess->logger.would_log(Logger::Level::debug)) {
8,694✔
1842
        auto to_str = [](double d) {
16,860✔
1843
            std::ostringstream ss;
16,860✔
1844
            // progress estimate string in the DOWNLOAD message isn't expected to have more than 4 digits of precision
1845
            ss << std::fixed << std::setprecision(4) << d;
16,860✔
1846
            return ss.str();
16,860✔
1847
        };
16,860✔
1848
        m_sess->logger.debug(
8,430✔
1849
            "Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
8,430✔
1850
            "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
8,430✔
1851
            p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
8,430✔
1852
            to_str(upload_estimate), p.snapshot, m_flx_active_version);
8,430✔
1853
    }
8,430✔
1854

1855
    m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate,
8,694✔
1856
                       upload_estimate, m_flx_last_seen_version);
8,694✔
1857
}
8,694✔
1858

1859
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
1860
{
60✔
1861
    if (!m_sess) {
60✔
1862
        return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
×
1863
    }
×
1864

1865
    return m_sess->send_test_command(std::move(body));
60✔
1866
}
60✔
1867

1868
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1869
{
314✔
1870
    REALM_ASSERT(!m_finalized);
314✔
1871

1872
    auto pending_reset = _impl::client_reset::has_pending_reset(*m_db->start_frozen());
314✔
1873
    REALM_ASSERT(pending_reset);
314✔
1874
    m_sess->logger.info("Tracking pending client reset of type \"%1\" from %2", pending_reset->type,
314✔
1875
                        pending_reset->time);
314✔
1876
    async_wait_for(true, true, [self = util::bind_ptr(this), pending_reset = *pending_reset](Status status) {
314✔
1877
        if (status == ErrorCodes::OperationAborted) {
314✔
1878
            return;
138✔
1879
        }
138✔
1880
        auto& logger = self->m_sess->logger;
176✔
1881
        if (!status.is_ok()) {
176✔
1882
            logger.error("Error while tracking client reset acknowledgement: %1", status);
×
1883
            return;
×
1884
        }
×
1885

1886
        auto wt = self->m_db->start_write();
176✔
1887
        auto cur_pending_reset = _impl::client_reset::has_pending_reset(*wt);
176✔
1888
        if (!cur_pending_reset) {
176✔
1889
            logger.debug(
×
1890
                "Was going to remove client reset tracker for type \"%1\" from %2, but it was already removed",
×
1891
                pending_reset.type, pending_reset.time);
×
1892
            return;
×
1893
        }
×
1894
        else if (cur_pending_reset->type != pending_reset.type || cur_pending_reset->time != pending_reset.time) {
176✔
1895
            logger.debug(
×
1896
                "Was going to remove client reset tracker for type \"%1\" from %2, but found type \"%3\" from %4.",
×
1897
                pending_reset.type, pending_reset.time, cur_pending_reset->type, cur_pending_reset->time);
×
1898
        }
×
1899
        else {
176✔
1900
            logger.debug("Client reset of type \"%1\" from %2 has been acknowledged by the server. "
176✔
1901
                         "Removing cycle detection tracker.",
176✔
1902
                         pending_reset.type, pending_reset.time);
176✔
1903
        }
176✔
1904
        _impl::client_reset::remove_pending_client_resets(*wt);
176✔
1905
        wt->commit();
176✔
1906
    });
176✔
1907
}
314✔
1908

1909
void SessionWrapper::update_subscription_version_info()
1910
{
10,324✔
1911
    if (!m_flx_subscription_store)
10,324✔
1912
        return;
8,718✔
1913
    auto versions_info = m_flx_subscription_store->get_version_info();
1,606✔
1914
    m_flx_active_version = versions_info.active;
1,606✔
1915
    m_flx_pending_mark_version = versions_info.pending_mark;
1,606✔
1916
}
1,606✔
1917

1918
std::string SessionWrapper::get_appservices_connection_id()
1919
{
72✔
1920
    auto pf = util::make_promise_future<std::string>();
72✔
1921

1922
    m_client.post([self = util::bind_ptr{this}, promise = std::move(pf.promise)](Status status) mutable {
72✔
1923
        if (!status.is_ok()) {
72✔
1924
            promise.set_error(status);
×
1925
            return;
×
1926
        }
×
1927

1928
        if (!self->m_sess) {
72✔
1929
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
1930
            return;
×
1931
        }
×
1932

1933
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
72✔
1934
    });
72✔
1935

1936
    return pf.future.get();
72✔
1937
}
72✔
1938

1939
// ################ ClientImpl::Connection ################
1940

1941
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1942
                                   const std::string& authorization_header_name,
1943
                                   const std::map<std::string, std::string>& custom_http_headers,
1944
                                   bool verify_servers_ssl_certificate,
1945
                                   Optional<std::string> ssl_trust_certificate_path,
1946
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1947
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1948
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::session, make_logger_prefix(ident),
1,324✔
1949
                                                      client.logger_ptr)} // Throws
1,324✔
1950
    , logger{*logger_ptr}
1,324✔
1951
    , m_client{client}
1,324✔
1952
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1,324✔
1953
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1,324✔
1954
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1,324✔
1955
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1,324✔
1956
    , m_reconnect_info{reconnect_info}
1,324✔
1957
    , m_ident{ident}
1,324✔
1958
    , m_server_endpoint{std::move(endpoint)}
1,324✔
1959
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1,324✔
1960
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1,324✔
1961
{
2,782✔
1962
    m_on_idle = m_client.create_trigger([this](Status status) {
2,782✔
1963
        if (status == ErrorCodes::OperationAborted)
2,776✔
1964
            return;
×
1965
        else if (!status.is_ok())
2,776✔
1966
            throw Exception(status);
×
1967

1968
        REALM_ASSERT(m_activated);
2,776✔
1969
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,778✔
1970
            on_idle(); // Throws
2,770✔
1971
            // Connection object may be destroyed now.
1972
        }
2,770✔
1973
    });
2,776✔
1974
}
2,782✔
1975

1976
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
1977
{
12✔
1978
    return m_ident;
12✔
1979
}
12✔
1980

1981

1982
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
1983
{
2,770✔
1984
    return m_server_endpoint;
2,770✔
1985
}
2,770✔
1986

1987
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1988
                                                        const std::string& signed_access_token)
1989
{
10,240✔
1990
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
10,240✔
1991
    m_signed_access_token = signed_access_token;           // Throws (copy)
10,240✔
1992
}
10,240✔
1993

1994

1995
void ClientImpl::Connection::resume_active_sessions()
1996
{
1,952✔
1997
    auto handler = [=](ClientImpl::Session& sess) {
3,900✔
1998
        sess.cancel_resumption_delay(); // Throws
3,900✔
1999
    };
3,900✔
2000
    for_each_active_session(std::move(handler)); // Throws
1,952✔
2001
}
1,952✔
2002

2003
void ClientImpl::Connection::on_idle()
2004
{
2,770✔
2005
    logger.debug(util::LogCategory::session, "Destroying connection object");
2,770✔
2006
    ClientImpl& client = get_client();
2,770✔
2007
    client.remove_connection(*this);
2,770✔
2008
    // NOTE: This connection object is now destroyed!
2009
}
2,770✔
2010

2011

2012
std::string ClientImpl::Connection::get_http_request_path() const
2013
{
3,770✔
2014
    using namespace std::string_view_literals;
3,770✔
2015
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
3,770✔
2016

2017
    std::string path;
3,770✔
2018
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,770✔
2019
    path += m_http_request_path_prefix;
3,770✔
2020
    path += param;
3,770✔
2021
    path += m_signed_access_token;
3,770✔
2022

2023
    return path;
3,770✔
2024
}
3,770✔
2025

2026

2027
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
2028
{
2,780✔
2029
    return util::format("Connection[%1] ", ident);
2,780✔
2030
}
2,780✔
2031

2032

2033
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
2034
                                                            std::optional<SessionErrorInfo> error_info)
2035
{
11,104✔
2036
    if (m_force_closed) {
11,104✔
2037
        return;
2,402✔
2038
    }
2,402✔
2039
    auto handler = [=](ClientImpl::Session& sess) {
11,846✔
2040
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
11,846✔
2041
        sess_2.on_connection_state_changed(state, error_info); // Throws
11,846✔
2042
    };
11,846✔
2043
    for_each_active_session(std::move(handler)); // Throws
8,702✔
2044
}
8,702✔
2045

2046

2047
Client::Client(Config config)
2048
    : m_impl{new ClientImpl{std::move(config)}} // Throws
4,890✔
2049
{
9,918✔
2050
}
9,918✔
2051

2052

2053
Client::Client(Client&& client) noexcept
2054
    : m_impl{std::move(client.m_impl)}
2055
{
×
2056
}
×
2057

2058

2059
Client::~Client() noexcept {}
9,918✔
2060

2061

2062
void Client::shutdown() noexcept
2063
{
10,000✔
2064
    m_impl->shutdown();
10,000✔
2065
}
10,000✔
2066

2067
void Client::shutdown_and_wait()
2068
{
768✔
2069
    m_impl->shutdown_and_wait();
768✔
2070
}
768✔
2071

2072
void Client::cancel_reconnect_delay()
2073
{
1,956✔
2074
    m_impl->cancel_reconnect_delay();
1,956✔
2075
}
1,956✔
2076

2077
void Client::voluntary_disconnect_all_connections()
2078
{
12✔
2079
    m_impl->voluntary_disconnect_all_connections();
12✔
2080
}
12✔
2081

2082
bool Client::wait_for_session_terminations_or_client_stopped()
2083
{
9,488✔
2084
    return m_impl->wait_for_session_terminations_or_client_stopped();
9,488✔
2085
}
9,488✔
2086

2087
util::Future<void> Client::notify_session_terminated()
2088
{
56✔
2089
    return m_impl->notify_session_terminated();
56✔
2090
}
56✔
2091

2092
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
2093
                                  port_type& port, std::string& path) const
2094
{
4,016✔
2095
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
4,016✔
2096
}
4,016✔
2097

2098

2099
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
2100
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
2101
{
10,104✔
2102
    m_impl = new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
10,104✔
2103
                                std::move(config)}; // Throws
10,104✔
2104
}
10,104✔
2105

2106

2107
void Session::nonsync_transact_notify(version_type new_version)
2108
{
17,708✔
2109
    m_impl->on_commit(new_version); // Throws
17,708✔
2110
}
17,708✔
2111

2112

2113
void Session::cancel_reconnect_delay()
2114
{
20✔
2115
    m_impl->cancel_reconnect_delay(); // Throws
20✔
2116
}
20✔
2117

2118

2119
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2120
{
4,856✔
2121
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
4,856✔
2122
}
4,856✔
2123

2124

2125
bool Session::wait_for_upload_complete_or_client_stopped()
2126
{
12,900✔
2127
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
12,900✔
2128
}
12,900✔
2129

2130

2131
bool Session::wait_for_download_complete_or_client_stopped()
2132
{
9,988✔
2133
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
9,988✔
2134
}
9,988✔
2135

2136

2137
void Session::refresh(std::string_view signed_access_token)
2138
{
216✔
2139
    m_impl->refresh(signed_access_token); // Throws
216✔
2140
}
216✔
2141

2142

2143
void Session::abandon() noexcept
2144
{
10,104✔
2145
    REALM_ASSERT(m_impl);
10,104✔
2146
    // Reabsorb the ownership assigned to the applications naked pointer by
2147
    // Session constructor
2148
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
10,104✔
2149
    SessionWrapper::abandon(std::move(wrapper));
10,104✔
2150
}
10,104✔
2151

2152
util::Future<std::string> Session::send_test_command(std::string body)
2153
{
60✔
2154
    return m_impl->send_test_command(std::move(body));
60✔
2155
}
60✔
2156

2157
std::string Session::get_appservices_connection_id()
2158
{
72✔
2159
    return m_impl->get_appservices_connection_id();
72✔
2160
}
72✔
2161

2162
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2163
{
×
2164
    switch (proxyType) {
×
2165
        case ProxyConfig::Type::HTTP:
×
2166
            return os << "HTTP";
×
2167
        case ProxyConfig::Type::HTTPS:
×
2168
            return os << "HTTPS";
×
2169
    }
×
2170
    REALM_TERMINATE("Invalid Proxy Type object.");
2171
}
×
2172

2173
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc