realm / realm-core / 2470

03 Jul 2024 10:12PM UTC coverage: 90.985% (+0.001%) from 90.984%
2470

push

Evergreen

web-flow
RCORE-2185 Sync client should steal file ident of fresh realm when performing client reset (#7850)

* Initial changes to use the file ident from the fresh realm during client reset

* Fixed failing realm_sync_test tests

* Don't send UPLOAD Messages while downloading fresh realm

* Allow sending QUERY bootstrap for fresh download sessions

* Added SHARED_GROUP_FRESH_PATH to generate path for fresh realm

* Removed SHARED_GROUP_FRESH_PATH and used session_reason setting instead

* Some cleanup after tests passing

* Added test to verify no UPLOAD messages are sent during fresh realm download

* Use is_fresh_path to determine if hook event called by client reset fresh realm download session

* Fixed tsan failure around REQUIRE() within hook event callback in flx_migration test

* Updates from review and streamlined changes based on recommendations

* Reverted some test changes that are no longer needed

* Updated logic for when to perform a client reset diff

* Updated fresh realm download to update upload progress but not send upload messages

* Removed has_client_reset_config flag in favor of get_client_reset_config()

* Updates from the review - renamed m_allow_uploads to m_delay_uploads

* Updated assert

* Updated test to start with file ident, added comment about client reset and no file ident

* Updated comment for m_delay_uploads

102308 of 180462 branches covered (56.69%)

140 of 147 new or added lines in 10 files covered. (95.24%)

68 existing lines in 12 files now uncovered.

215214 of 236538 relevant lines covered (90.98%)

5920016.53 hits per line

Source File
90.98
/src/realm/sync/client.cpp
1
#include <realm/sync/client.hpp>
2

3
#include <realm/sync/config.hpp>
4
#include <realm/sync/noinst/client_impl_base.hpp>
5
#include <realm/sync/noinst/client_reset.hpp>
6
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
7
#include <realm/sync/noinst/pending_reset_store.hpp>
8
#include <realm/sync/protocol.hpp>
9
#include <realm/sync/subscriptions.hpp>
10
#include <realm/util/bind_ptr.hpp>
11

12
namespace realm::sync {
13
namespace {
14
using namespace realm::util;
15

16

17
// clang-format off
18
using SessionImpl                     = ClientImpl::Session;
19
using SyncTransactCallback            = Session::SyncTransactCallback;
20
using ProgressHandler                 = Session::ProgressHandler;
21
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
22
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
23
using port_type                       = Session::port_type;
24
using connection_ident_type           = std::int_fast64_t;
25
using ProxyConfig                     = SyncConfig::ProxyConfig;
26
// clang-format on
27

28
} // unnamed namespace
29

30

31
// Life cycle states of a session wrapper:
32
//
33
// The session wrapper begins life with an associated Client, but no underlying
34
// SessionImpl. On construction, it begins the actualization process by posting
35
// a job to the client's event loop. That job will set `m_sess` to a session impl
36
// and then set `m_actualized = true`. Once this happens `m_actualized` will
37
// never change again.
38
//
39
// When the external reference to the session (`sync::Session`, which in
40
// non-test code is always owned by a `SyncSession`) is destroyed, the wrapper
41
// begins finalization. If the wrapper has not yet been actualized this takes
42
// place immediately and `m_finalized = true` is set directly on the calling
43
// thread. If it has been actualized, a job is posted to the client's event loop
44
// which will tear down the session and then set `m_finalized = true`. Regardless
45
// of whether or not the session has been actualized, `m_abandoned = true` is
46
// immediately set when the external reference is released.
47
//
48
// When the associated Client is destroyed it calls force_close() on all
49
// actualized wrappers from its event loop. This causes the wrapper to tear down
50
// the session, but does not make it proceed to the finalized state. In normal
51
// usage the client will outlive all sessions, but in tests getting the teardown
52
// correct and race-free can be tricky so we permit either order.
53
//
54
// The wrapper will exist with `m_abandoned = true` and `m_finalized = false`
55
// only while waiting for finalization to happen. It will exist with
56
// `m_finalized = true` only while there are pending post handlers yet to be
57
// executed.
58
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
59
public:
60
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
61
                   Session::Config&&);
62
    ~SessionWrapper() noexcept;
63

64
    ClientReplication& get_replication() noexcept;
65
    ClientImpl& get_client() noexcept;
66

67
    bool has_flx_subscription_store() const;
68
    SubscriptionStore* get_flx_subscription_store();
69
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
70

71
    MigrationStore* get_migration_store();
72

73
    // Immediately initiate deactivation of the wrapped session. Sets m_closed
74
    // but *not* m_finalized.
75
    // Must be called from event loop thread.
76
    void force_close();
77

78
    // Can be called from any thread.
79
    void on_commit(version_type new_version) override;
80
    // Can be called from any thread.
81
    void cancel_reconnect_delay();
82

83
    // Can be called from any thread.
84
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
85
    // Can be called from any thread.
86
    bool wait_for_upload_complete_or_client_stopped();
87
    // Can be called from any thread.
88
    bool wait_for_download_complete_or_client_stopped();
89

90
    // Can be called from any thread.
91
    void refresh(std::string_view signed_access_token);
92

93
    // Can be called from any thread.
94
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
95

96
    // These are called from ClientImpl
97
    // Must be called from event loop thread.
98
    void actualize();
99
    void finalize();
100
    void finalize_before_actualization() noexcept;
101

102
    // Can be called from any thread.
103
    util::Future<std::string> send_test_command(std::string body);
104

105
    void handle_pending_client_reset_acknowledgement();
106

107
    void update_subscription_version_info();
108

109
    // Can be called from any thread.
110
    std::string get_appservices_connection_id();
111

112
    // Can be called from any thread, but inherently cannot be called
113
    // concurrently with calls to any of the other non-confined functions.
114
    bool mark_abandoned();
115

116
private:
117
    ClientImpl& m_client;
118
    DBRef m_db;
119
    Replication* m_replication;
120

121
    const ProtocolEnvelope m_protocol_envelope;
122
    const std::string m_server_address;
123
    const port_type m_server_port;
124
    const bool m_server_verified;
125
    const std::string m_user_id;
126
    const SyncServerMode m_sync_mode;
127
    const std::string m_authorization_header_name;
128
    const std::map<std::string, std::string> m_custom_http_headers;
129
    const bool m_verify_servers_ssl_certificate;
130
    const bool m_simulate_integration_error;
131
    const std::optional<std::string> m_ssl_trust_certificate_path;
132
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
133
    const size_t m_flx_bootstrap_batch_size_bytes;
134
    const std::string m_http_request_path_prefix;
135
    const std::string m_virt_path;
136
    const std::optional<ProxyConfig> m_proxy_config;
137

138
    // This one is different from null when, and only when, the session wrapper
139
    // is in ClientImpl::m_abandoned_session_wrappers.
140
    SessionWrapper* m_next = nullptr;
141

142
    // These may only be accessed by the event loop thread.
143
    std::string m_signed_access_token;
144
    std::optional<ClientReset> m_client_reset_config;
145

146
    struct ReportedProgress {
147
        uint64_t snapshot;
148
        uint64_t uploaded;
149
        uint64_t uploadable;
150
        uint64_t downloaded;
151
        uint64_t downloadable;
152

153
        // Does not check snapshot
154
        bool operator==(const ReportedProgress& p) const noexcept
155
        {
21,502✔
156
            return uploaded == p.uploaded && uploadable == p.uploadable && downloaded == p.downloaded &&
21,502✔
157
                   downloadable == p.downloadable;
21,502✔
158
        }
21,502✔
159
    };
160
    std::optional<ReportedProgress> m_reported_progress;
161
    uint64_t m_final_uploaded = 0;
162
    uint64_t m_final_downloaded = 0;
163

164
    const util::UniqueFunction<ProgressHandler> m_progress_handler;
165
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
166

167
    const util::UniqueFunction<SyncClientHookAction(SyncClientHookData const&)> m_debug_hook;
168
    bool m_in_debug_hook = false;
169

170
    const SessionReason m_session_reason;
171

172
    // If false, QUERY and MARK messages are allowed but UPLOAD messages will not
173
    // be sent to the server.
174
    const bool m_allow_upload_messages;
175

176
    const uint64_t m_schema_version;
177

178
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
179
    int64_t m_flx_active_version = 0;
180
    int64_t m_flx_last_seen_version = 0;
181
    int64_t m_flx_pending_mark_version = 0;
182
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
183

184
    std::shared_ptr<MigrationStore> m_migration_store;
185

186
    // Set to true when this session wrapper is actualized (i.e. the wrapped
187
    // session is created), or when the wrapper is finalized before actualization.
188
    // It is then never modified again.
189
    //
190
    // Actualization is scheduled during the construction of SessionWrapper, and
191
    // so a session specific post handler will always find that `m_actualized`
192
    // is true as the handler will always be run after the actualization job.
193
    // This holds even if the wrapper is finalized or closed before actualization.
194
    bool m_actualized = false;
195

196
    // Set to true when session deactivation is begun, either via force_close()
197
    // or finalize().
198
    bool m_closed = false;
199

200
    // Set to true in on_suspended() and then false in on_resumed(). Used to
201
    // suppress spurious connection state and error reporting while the session
202
    // is already in an error state.
203
    bool m_suspended = false;
204

205
    // Set when the session has been abandoned. After this point none of the
206
    // public API functions should be called again.
207
    bool m_abandoned = false;
208
    // Has the SessionWrapper been finalized?
209
    bool m_finalized = false;
210

211
    // Set to true when the first DOWNLOAD message is received to indicate that
212
    // the byte-level download progress parameters can be considered reasonably
213
    // reliable. Before that, a lot of time may have passed, so our record of
214
    // the download progress is likely completely out of date.
215
    bool m_reliable_download_progress = false;
216

217
    // Set to point to an activated session object during actualization of the
218
    // session wrapper. Set to null during finalization of the session
219
    // wrapper. Both modifications are guaranteed to be performed by the event
220
    // loop thread.
221
    //
222
    // If a session specific post handler, that is submitted after the
223
    // initiation of the session wrapper, sees that `m_sess` is null, it can
224
    // conclude that the session wrapper has either been force closed or has
225
    // been both abandoned and finalized.
226
    //
227
    // Must only be accessed from the event loop thread.
228
    SessionImpl* m_sess = nullptr;
229

230
    // These must only be accessed from the event loop thread.
231
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
232
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
233
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
234

235
    version_type m_upload_completion_requested_version = -1;
236

237
    void on_download_completion();
238
    void on_suspended(const SessionErrorInfo& error_info);
239
    void on_resumed();
240
    void on_connection_state_changed(ConnectionState, const std::optional<SessionErrorInfo>&);
241
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
242
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
243
    void on_flx_sync_version_complete(int64_t version);
244

245
    void init_progress_handler();
246
    void check_progress();
247
    void report_progress(ReportedProgress& p, DownloadableProgress downloadable);
248
    void report_upload_completion(version_type);
249

250
    friend class SessionWrapperStack;
251
    friend class ClientImpl::Session;
252
};
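
The life cycle described in the comment above the class can be modeled with three flags. The following is a minimal sketch, not the real SessionWrapper: `WrapperLifecycle`, `finalize_job_posted`, and `run_event_loop()` are hypothetical names, and the event loop is reduced to a single function call.

```cpp
#include <cassert>

// Hypothetical model of the flags described in the life cycle comment.
// It is not the real SessionWrapper and omits the session itself.
struct WrapperLifecycle {
    bool actualized = false;
    bool abandoned = false;
    bool finalized = false;
    bool finalize_job_posted = false; // stands in for a job on the event loop

    // Runs on the event loop once the wrapper has been registered.
    void actualize()
    {
        assert(!actualized);
        actualized = true;
    }

    // Called when the external reference is released.
    void abandon()
    {
        abandoned = true; // always set immediately
        if (!actualized) {
            // Never actualized: finalize directly on the calling thread.
            actualized = true;
            finalized = true;
        }
        else {
            // Already actualized: teardown must happen on the event loop.
            finalize_job_posted = true;
        }
    }

    // Assumption: one call drains whatever job is pending.
    void run_event_loop()
    {
        if (finalize_job_posted) {
            finalize_job_posted = false;
            finalized = true;
        }
    }
};
```

A wrapper abandoned before actualization ends up finalized at once, while one abandoned afterwards stays in the `abandoned && !finalized` state until the event loop runs.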
253

254

255
// ################ SessionWrapperStack ################
256

257
inline bool SessionWrapperStack::empty() const noexcept
258
{
19,868✔
259
    return !m_back;
19,868✔
260
}
19,868✔
261

262

263
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
264
{
20,204✔
265
    REALM_ASSERT(!w->m_next);
20,204✔
266
    w->m_next = m_back;
20,204✔
267
    m_back = w.release();
20,204✔
268
}
20,204✔
269

270

271
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
272
{
59,738✔
273
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
59,738✔
274
    if (m_back) {
59,738✔
275
        m_back = m_back->m_next;
20,134✔
276
        w->m_next = nullptr;
20,134✔
277
    }
20,134✔
278
    return w;
59,738✔
279
}
59,738✔
280

281

282
inline void SessionWrapperStack::clear() noexcept
283
{
19,868✔
284
    while (m_back) {
19,868✔
285
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
286
        m_back = w->m_next;
×
287
    }
×
288
}
19,868✔
289

290

291
inline bool SessionWrapperStack::erase(SessionWrapper* w) noexcept
292
{
10,132✔
293
    SessionWrapper** p = &m_back;
10,132✔
294
    while (*p && *p != w) {
10,290✔
295
        p = &(*p)->m_next;
158✔
296
    }
158✔
297
    if (!*p) {
10,132✔
298
        return false;
10,068✔
299
    }
10,068✔
300
    *p = w->m_next;
64✔
301
    util::bind_ptr<SessionWrapper>{w, util::bind_ptr_base::adopt_tag{}};
64✔
302
    return true;
64✔
303
}
10,132✔
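
The stack operations above form an intrusive singly linked list, and `erase()` walks a pointer to the link rather than to the node. A minimal sketch of the same technique follows, assuming a hypothetical `Node` type and raw, non-owning pointers in place of `util::bind_ptr`; unlike the listing, it also clears the erased node's link.

```cpp
#include <cassert>

// Hypothetical node type; stands in for SessionWrapper and its m_next link.
struct Node {
    int id = 0;
    Node* next = nullptr;
};

// Minimal intrusive LIFO stack. It does not own its nodes, unlike the
// bind_ptr-based stack in the listing.
class IntrusiveStack {
public:
    bool empty() const noexcept
    {
        return !m_back;
    }

    void push(Node* n) noexcept
    {
        assert(!n->next); // a node may be linked into at most one stack
        n->next = m_back;
        m_back = n;
    }

    Node* pop() noexcept
    {
        Node* n = m_back;
        if (n) {
            m_back = n->next;
            n->next = nullptr;
        }
        return n; // nullptr when the stack is empty
    }

    // Walk a pointer to the link that points at the current node, so that
    // unlinking the head and unlinking an interior node are the same store.
    bool erase(Node* n) noexcept
    {
        Node** p = &m_back;
        while (*p && *p != n)
            p = &(*p)->next;
        if (!*p)
            return false; // not in this stack
        *p = n->next;
        n->next = nullptr;
        return true;
    }

private:
    Node* m_back = nullptr;
};
```

Because `p` always addresses a link, no special case is needed when the erased node is the most recently pushed one.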
304

305

306
SessionWrapperStack::~SessionWrapperStack()
307
{
19,868✔
308
    clear();
19,868✔
309
}
19,868✔
310

311

312
// ################ ClientImpl ################
313

314
ClientImpl::~ClientImpl()
315
{
9,934✔
316
    // Since no other thread is allowed to be accessing this client or any of
317
    // its subobjects at this time, no mutex locking is necessary.
318

319
    shutdown_and_wait();
9,934✔
320
    // Session wrappers are removed from m_unactualized_session_wrappers as they
321
    // are abandoned.
322
    REALM_ASSERT(m_stopped);
9,934✔
323
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
9,934✔
324
    REALM_ASSERT(m_abandoned_session_wrappers.empty());
9,934✔
325
}
9,934✔
326

327

328
void ClientImpl::cancel_reconnect_delay()
329
{
1,740✔
330
    // Thread safety required
331
    post([this] {
1,740✔
332
        for (auto& p : m_server_slots) {
1,740✔
333
            ServerSlot& slot = p.second;
1,740✔
334
            if (m_one_connection_per_session) {
1,740✔
335
                REALM_ASSERT(!slot.connection);
×
336
                for (const auto& p : slot.alt_connections) {
×
337
                    ClientImpl::Connection& conn = *p.second;
×
338
                    conn.resume_active_sessions(); // Throws
×
339
                    conn.cancel_reconnect_delay(); // Throws
×
340
                }
×
341
            }
×
342
            else {
1,740✔
343
                REALM_ASSERT(slot.alt_connections.empty());
1,740✔
344
                if (slot.connection) {
1,740✔
345
                    ClientImpl::Connection& conn = *slot.connection;
1,736✔
346
                    conn.resume_active_sessions(); // Throws
1,736✔
347
                    conn.cancel_reconnect_delay(); // Throws
1,736✔
348
                }
1,736✔
349
                else {
4✔
350
                    slot.reconnect_info.reset();
4✔
351
                }
4✔
352
            }
1,740✔
353
        }
1,740✔
354
    }); // Throws
1,740✔
355
}
1,740✔
356

357

358
void ClientImpl::voluntary_disconnect_all_connections()
359
{
12✔
360
    auto done_pf = util::make_promise_future<void>();
12✔
361
    post([this, promise = std::move(done_pf.promise)]() mutable {
12✔
362
        try {
12✔
363
            for (auto& p : m_server_slots) {
12✔
364
                ServerSlot& slot = p.second;
12✔
365
                if (m_one_connection_per_session) {
12✔
366
                    REALM_ASSERT(!slot.connection);
×
367
                    for (const auto& [_, conn] : slot.alt_connections) {
×
368
                        conn->voluntary_disconnect();
×
369
                    }
×
370
                }
×
371
                else {
12✔
372
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
373
                    if (slot.connection) {
12✔
374
                        slot.connection->voluntary_disconnect();
12✔
375
                    }
12✔
376
                }
12✔
377
            }
12✔
378
        }
12✔
379
        catch (...) {
12✔
380
            promise.set_error(exception_to_status());
×
381
            return;
×
382
        }
×
383
        promise.emplace_value();
12✔
384
    });
12✔
385
    done_pf.future.get();
12✔
386
}
12✔
387

388

389
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
390
{
9,568✔
391
    // Thread safety required
392

393
    {
9,568✔
394
        util::CheckedLockGuard lock{m_mutex};
9,568✔
395
        m_sessions_terminated = false;
9,568✔
396
    }
9,568✔
397

398
    // The technique employed here relies on the fact that
399
    // actualize_and_finalize_session_wrappers() must get to execute at least
400
    // once before the post handler submitted below gets to execute, but still
401
    // at a time where all session wrappers, that are abandoned prior to the
402
    // execution of wait_for_session_terminations_or_client_stopped(), have been
403
    // added to `m_abandoned_session_wrappers`.
404
    //
405
    // To see that this is the case, consider a session wrapper that was
406
    // abandoned before wait_for_session_terminations_or_client_stopped() was
407
    // invoked. Then the session wrapper will have been added to
408
    // `m_abandoned_session_wrappers`, and an invocation of
409
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
410
    // guarantees mentioned in the documentation of Trigger then ensure
411
    // that at least one execution of actualize_and_finalize_session_wrappers()
412
    // will happen after the session wrapper has been added to
413
    // `m_abandoned_session_wrappers`, but before the post handler submitted
414
    // below gets to execute.
415
    post([this] {
9,568✔
416
        {
9,568✔
417
            util::CheckedLockGuard lock{m_mutex};
9,568✔
418
            m_sessions_terminated = true;
9,568✔
419
        }
9,568✔
420
        m_wait_or_client_stopped_cond.notify_all();
9,568✔
421
    }); // Throws
9,568✔
422

423
    bool completion_condition_was_satisfied;
9,568✔
424
    {
9,568✔
425
        util::CheckedUniqueLock lock{m_mutex};
9,568✔
426
        m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_mutex) {
19,134✔
427
            return m_sessions_terminated || m_stopped;
19,134✔
428
        });
19,134✔
429
        completion_condition_was_satisfied = !m_stopped;
9,568✔
430
    }
9,568✔
431
    return completion_condition_was_satisfied;
9,568✔
432
}
9,568✔
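
The wait above returns on either of two conditions and reports which one ended it. A minimal sketch of that pattern with standard library primitives follows; `StopAwareWaiter` and its members are hypothetical, and `std::mutex` stands in for the checked lock types used in the listing.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the client's wait state; names are illustrative.
class StopAwareWaiter {
public:
    // Blocks until either flag is set. Returns true only if the wait ended
    // without the client having been stopped.
    bool wait_for_terminated_or_stopped()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this] {
            return m_terminated || m_stopped;
        });
        return !m_stopped;
    }

    void mark_terminated()
    {
        set_flag(m_terminated);
    }

    void stop()
    {
        set_flag(m_stopped);
    }

private:
    // Set the flag under the lock, then notify after releasing it.
    void set_flag(bool& flag)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            flag = true;
        }
        m_cond.notify_all();
    }

    std::mutex m_mutex;
    std::condition_variable m_cond;
    bool m_terminated = false;
    bool m_stopped = false;
};
```

In practice the flags would be set from another thread, such as an event loop handler. If both flags are set, the result is false, matching the `!m_stopped` check in the listing.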
433

434

435
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
436
util::Future<void> ClientImpl::notify_session_terminated()
437
{
56✔
438
    auto pf = util::make_promise_future<void>();
56✔
439
    post([promise = std::move(pf.promise)](Status status) mutable {
56✔
440
        // Includes operation_aborted
441
        if (!status.is_ok()) {
56✔
442
            promise.set_error(status);
×
443
            return;
×
444
        }
×
445

446
        promise.emplace_value();
56✔
447
    });
56✔
448

449
    return std::move(pf.future);
56✔
450
}
56✔
451

452
void ClientImpl::drain_connections_on_loop()
453
{
9,934✔
454
    post([this](Status status) {
9,934✔
455
        REALM_ASSERT(status.is_ok());
9,934✔
456
        drain_connections();
9,934✔
457
    });
9,934✔
458
}
9,934✔
459

460
void ClientImpl::shutdown_and_wait()
461
{
10,706✔
462
    shutdown();
10,706✔
463
    util::CheckedUniqueLock lock{m_drain_mutex};
10,706✔
464
    if (m_drained) {
10,706✔
465
        return;
772✔
466
    }
772✔
467

468
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
9,934✔
469
    m_drain_cv.wait(lock.native_handle(), [&]() REQUIRES(m_drain_mutex) {
15,824✔
470
        return m_num_connections == 0 && m_outstanding_posts == 0;
15,824✔
471
    });
15,824✔
472

473
    m_drained = true;
9,934✔
474
}
9,934✔
475

476
void ClientImpl::shutdown() noexcept
477
{
20,722✔
478
    {
20,722✔
479
        util::CheckedLockGuard lock{m_mutex};
20,722✔
480
        if (m_stopped)
20,722✔
481
            return;
10,788✔
482
        m_stopped = true;
9,934✔
483
    }
9,934✔
484
    m_wait_or_client_stopped_cond.notify_all();
×
485

486
    drain_connections_on_loop();
9,934✔
487
}
9,934✔
488

489

490
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
491
{
10,134✔
492
    // Thread safety required.
493
    {
10,134✔
494
        util::CheckedLockGuard lock{m_mutex};
10,134✔
495
        // We can't actualize the session if we've already been stopped, so
496
        // just finalize it immediately.
497
        if (m_stopped) {
10,134✔
498
            wrapper->finalize_before_actualization();
×
499
            return;
×
500
        }
×
501

502
        REALM_ASSERT(m_actualize_and_finalize);
10,134✔
503
        m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
10,134✔
504
    }
10,134✔
505
    m_actualize_and_finalize->trigger();
×
506
}
10,134✔
507

508

509
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
510
{
10,132✔
511
    // Thread safety required.
512
    {
10,132✔
513
        util::CheckedLockGuard lock{m_mutex};
10,132✔
514
        REALM_ASSERT(m_actualize_and_finalize);
10,132✔
515
        // The wrapper may have already been finalized before being abandoned
516
        // if we were stopped when it was created.
517
        if (wrapper->mark_abandoned())
10,132✔
518
            return;
4✔
519

520
        // If the session wrapper has not yet been actualized (on the event loop
521
        // thread), it can be immediately finalized. This ensures that we will
522
        // generally not actualize a session wrapper that has already been
523
        // abandoned.
524
        if (m_unactualized_session_wrappers.erase(wrapper.get())) {
10,128✔
525
            wrapper->finalize_before_actualization();
64✔
526
            return;
64✔
527
        }
64✔
528
        m_abandoned_session_wrappers.push(std::move(wrapper));
10,064✔
529
    }
10,064✔
530
    m_actualize_and_finalize->trigger();
×
531
}
10,064✔
532

533

534
// Must be called from the event loop thread.
535
void ClientImpl::actualize_and_finalize_session_wrappers()
536
{
14,772✔
537
    // We need to pop from the wrapper stacks while holding the lock to ensure
538
    // that all updates to `SessionWrapper::m_next` are thread-safe, but then
539
    // release the lock before finalizing or actualizing because those functions
540
    // invoke user callbacks which may try to access the client and reacquire
541
    // the lock.
542
    //
543
    // Finalization must always happen before actualization because we may be
544
    // finalizing and actualizing sessions for the same Realm file, and
545
    // actualizing first would result in overlapping sessions. Because we're
546
    // releasing the lock new sessions may come in as we're looping, so we need
547
    // a single loop that checks both fields.
548
    while (true) {
34,908✔
549
        bool finalize = true;
34,904✔
550
        bool stopped;
34,904✔
551
        util::bind_ptr<SessionWrapper> wrapper;
34,904✔
552
        {
34,904✔
553
            util::CheckedLockGuard lock{m_mutex};
34,904✔
554
            wrapper = m_abandoned_session_wrappers.pop();
34,904✔
555
            if (!wrapper) {
34,904✔
556
                wrapper = m_unactualized_session_wrappers.pop();
24,836✔
557
                finalize = false;
24,836✔
558
            }
24,836✔
559
            stopped = m_stopped;
34,904✔
560
        }
34,904✔
561
        if (!wrapper)
34,904✔
562
            break;
14,768✔
563
        if (finalize)
20,136✔
564
            wrapper->finalize(); // Throws
10,068✔
565
        else if (stopped)
10,068✔
566
            wrapper->finalize_before_actualization();
4✔
567
        else
10,064✔
568
            wrapper->actualize(); // Throws
10,064✔
569
    }
20,136✔
570
}
14,772✔
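
The loop above pops one item while holding the lock, releases the lock, and only then acts on the item, always preferring abandoned wrappers. A minimal sketch of that shape follows, assuming a hypothetical `Scheduler` that tracks integer ids and records its actions in a log instead of calling into sessions.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical scheduler; the two vectors stand in for the abandoned and
// unactualized wrapper stacks.
class Scheduler {
public:
    void add_abandoned(int id)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_abandoned.push_back(id);
    }

    void add_unactualized(int id)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_unactualized.push_back(id);
    }

    void stop()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_stopped = true;
    }

    // Processes all queued ids and returns a log of the actions taken.
    std::vector<std::string> drain()
    {
        std::vector<std::string> log;
        while (true) {
            bool finalize = true;
            bool stopped;
            bool found;
            int id = 0;
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                found = pop(m_abandoned, id);
                if (!found) {
                    found = pop(m_unactualized, id);
                    finalize = false;
                }
                stopped = m_stopped;
            }
            if (!found)
                break;
            // The lock is released here, so a callback could re-enter safely.
            if (finalize)
                log.push_back("finalize " + std::to_string(id));
            else if (stopped)
                log.push_back("finalize-before-actualize " + std::to_string(id));
            else
                log.push_back("actualize " + std::to_string(id));
        }
        return log;
    }

private:
    static bool pop(std::vector<int>& stack, int& out)
    {
        if (stack.empty())
            return false;
        out = stack.back();
        stack.pop_back();
        return true;
    }

    std::mutex m_mutex;
    std::vector<int> m_abandoned;
    std::vector<int> m_unactualized;
    bool m_stopped = false;
};
```

Checking both containers inside a single loop means an item that is abandoned while the lock is released still gets finalized before any further actualization.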
571

572

573
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
574
                                                   const std::string& authorization_header_name,
575
                                                   const std::map<std::string, std::string>& custom_http_headers,
576
                                                   bool verify_servers_ssl_certificate,
577
                                                   Optional<std::string> ssl_trust_certificate_path,
578
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
579
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
580
{
10,062✔
581
    auto&& [server_slot_it, inserted] =
10,062✔
582
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
10,062✔
583
    ServerSlot& server_slot = server_slot_it->second; // Throws
10,062✔
584

585
    // TODO: enable multiplexing with proxies
586
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
10,062✔
587
        // Use preexisting connection
588
        REALM_ASSERT(server_slot.alt_connections.empty());
7,272✔
589
        return *server_slot.connection;
7,272✔
590
    }
7,272✔
591

592
    // Create a new connection
593
    REALM_ASSERT(!server_slot.connection);
2,790✔
594
    connection_ident_type ident = m_prev_connection_ident + 1;
2,790✔
595
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
2,790✔
596
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
2,790✔
597
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
2,790✔
598
        std::move(proxy_config), server_slot.reconnect_info); // Throws
2,790✔
599
    ClientImpl::Connection& conn = *conn_2;
2,790✔
600
    if (!m_one_connection_per_session) {
2,790✔
601
        server_slot.connection = std::move(conn_2);
2,780✔
602
    }
2,780✔
603
    else {
10✔
604
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
10✔
605
    }
10✔
606
    m_prev_connection_ident = ident;
2,790✔
607
    was_created = true;
2,790✔
608
    {
2,790✔
609
        util::CheckedLockGuard lk(m_drain_mutex);
2,790✔
610
        ++m_num_connections;
2,790✔
611
    }
2,790✔
612
    return conn;
2,790✔
613
}
10,062✔
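
The function above either reuses one shared connection per endpoint or creates a separate connection per session. A minimal sketch of that choice follows; `Pool`, `Slot`, and `Conn` are hypothetical, the proxy and TLS parameters are omitted, and unlike the listing this sketch also writes `was_created = false` on reuse.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical connection; only carries its identifier.
struct Conn {
    std::int_fast64_t ident;
};

// Hypothetical per-endpoint slot, loosely modeled on ServerSlot.
struct Slot {
    std::unique_ptr<Conn> shared;                                   // multiplexed
    std::map<std::int_fast64_t, std::unique_ptr<Conn>> per_session; // one each
};

class Pool {
public:
    explicit Pool(bool one_per_session)
        : m_one_per_session(one_per_session)
    {
    }

    Conn& get(const std::string& endpoint, bool& was_created)
    {
        Slot& slot = m_slots[endpoint]; // creates the slot on first use
        if (slot.shared && !m_one_per_session) {
            was_created = false;
            return *slot.shared; // reuse the preexisting connection
        }
        std::int_fast64_t ident = m_prev_ident + 1;
        std::unique_ptr<Conn> conn = std::make_unique<Conn>(Conn{ident});
        Conn& ref = *conn;
        if (!m_one_per_session)
            slot.shared = std::move(conn);
        else
            slot.per_session[ident] = std::move(conn);
        m_prev_ident = ident;
        was_created = true;
        return ref;
    }

private:
    const bool m_one_per_session;
    std::map<std::string, Slot> m_slots;
    std::int_fast64_t m_prev_ident = 0;
};
```

The returned reference stays valid after the `unique_ptr` is moved into the slot, because moving the pointer does not move the connection object itself.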
614

615

616
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
617
{
2,790✔
618
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
2,790✔
619
    auto i = m_server_slots.find(endpoint);
2,790✔
620
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
2,790✔
621
    ServerSlot& server_slot = i->second;
2,790✔
622
    if (!m_one_connection_per_session) {
2,790✔
623
        REALM_ASSERT(server_slot.alt_connections.empty());
2,774✔
624
        REALM_ASSERT(&*server_slot.connection == &conn);
2,774✔
625
        server_slot.reconnect_info = conn.get_reconnect_info();
2,774✔
626
        server_slot.connection.reset();
2,774✔
627
    }
2,774✔
628
    else {
16✔
629
        REALM_ASSERT(!server_slot.connection);
16✔
630
        connection_ident_type ident = conn.get_ident();
16✔
631
        auto j = server_slot.alt_connections.find(ident);
16✔
632
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
16✔
633
        REALM_ASSERT(&*j->second == &conn);
16✔
634
        server_slot.alt_connections.erase(j);
16✔
635
    }
16✔
636

637
    bool notify;
2,790✔
638
    {
2,790✔
639
        util::CheckedLockGuard lk(m_drain_mutex);
2,790✔
640
        REALM_ASSERT(m_num_connections);
2,790✔
641
        notify = --m_num_connections <= 0;
2,790✔
642
    }
2,790✔
643
    if (notify) {
2,790✔
644
        m_drain_cv.notify_all();
2,180✔
645
    }
2,180✔
646
}
2,790✔
647

648

649
// ################ SessionImpl ################
650

651
void SessionImpl::force_close()
652
{
102✔
653
    // Allow force_close() if session is active or hasn't been activated yet.
654
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
102!
655
        m_wrapper.force_close();
102✔
656
    }
102✔
657
}
102✔
658

659
void SessionImpl::on_connection_state_changed(ConnectionState state,
660
                                              const std::optional<SessionErrorInfo>& error_info)
661
{
11,490✔
662
    // Only used to report errors back to the SyncSession while the Session is active
663
    if (m_state == SessionImpl::Active) {
11,490✔
664
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
11,490✔
665
    }
11,490✔
666
}
11,490✔
667

668

669
const std::string& SessionImpl::get_virt_path() const noexcept
670
{
6,964✔
671
    // Can only be called if the session is active or being activated
672
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
6,964!
673
    return m_wrapper.m_virt_path;
6,964✔
674
}
6,964✔
675

676
const std::string& SessionImpl::get_realm_path() const noexcept
677
{
10,358✔
678
    // Can only be called if the session is active or being activated
679
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
10,358✔
680
    return m_wrapper.m_db->get_path();
10,358✔
681
}
10,358✔
682

683
DBRef SessionImpl::get_db() const noexcept
684
{
25,526✔
685
    // Can only be called if the session is active or being activated
686
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
25,526✔
687
    return m_wrapper.m_db;
25,526✔
688
}
25,526✔
689

690
ClientReplication& SessionImpl::get_repl() const noexcept
691
{
119,618✔
692
    // Can only be called if the session is active or being activated
693
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
119,618✔
694
    return m_wrapper.get_replication();
119,618✔
695
}
119,618✔
696

697
ClientHistory& SessionImpl::get_history() const noexcept
698
{
117,676✔
699
    return get_repl().get_history();
117,676✔
700
}
117,676✔
701

702
std::optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
703
{
103,692✔
704
    // Can only be called if the session is active or being activated
705
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
103,692✔
706
    return m_wrapper.m_client_reset_config;
103,692✔
707
}
103,692✔
708

709
SessionReason SessionImpl::get_session_reason() noexcept
710
{
1,478✔
711
    // Can only be called if the session is active or being activated
712
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,478!
713
    return m_wrapper.m_session_reason;
1,478✔
714
}
1,478✔
715

716
uint64_t SessionImpl::get_schema_version() noexcept
717
{
1,478✔
718
    // Can only be called if the session is active or being activated
719
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,478✔
720
    return m_wrapper.m_schema_version;
1,478✔
721
}
1,478✔
722

723
bool SessionImpl::upload_messages_allowed() noexcept
724
{
68,848✔
725
    // Can only be called if the session is active or being activated
726
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
68,848✔
727
    return m_wrapper.m_allow_upload_messages;
68,848✔
728
}
68,848✔
729

730
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
731
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
732
{
44,926✔
733
    // Ignore the call if the session is not active
734
    if (m_state != State::Active) {
44,926✔
735
        return;
×
736
    }
×
737

738
    try {
44,926✔
739
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
44,926!
740
        if (simulate_integration_error) {
44,926✔
741
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
742
        }
×
743
        version_type client_version;
44,926✔
744
        if (REALM_LIKELY(!get_client().is_dry_run())) {
44,926✔
745
            VersionInfo version_info;
44,926✔
746
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
44,926✔
747
            client_version = version_info.realm_version;
44,926✔
748
        }
44,926✔
UNCOV
749
        else {
×
750
            // Fake it for "dry run" mode
UNCOV
751
            client_version = m_last_version_available + 1;
×
UNCOV
752
        }
×
753
        on_changesets_integrated(client_version, progress); // Throws
44,926✔
754
    }
44,926✔
755
    catch (const IntegrationException& e) {
44,926✔
756
        on_integration_failure(e);
24✔
757
    }
24✔
758
}
44,926✔
759

760

761
void SessionImpl::on_download_completion()
762
{
16,132✔
763
    // Ignore the call if the session is not active
764
    if (m_state == State::Active) {
16,132✔
765
        m_wrapper.on_download_completion(); // Throws
16,132✔
766
    }
16,132✔
767
}
16,132✔
768

769

770
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
771
{
670✔
772
    // Ignore the call if the session is not active
773
    if (m_state == State::Active) {
670✔
774
        m_wrapper.on_suspended(error_info); // Throws
670✔
775
    }
670✔
776
}
670✔
777

778

779
void SessionImpl::on_resumed()
780
{
58✔
781
    // Ignore the call if the session is not active
782
    if (m_state == State::Active) {
58✔
783
        m_wrapper.on_resumed(); // Throws
58✔
784
    }
58✔
785
}
58✔
786

787
void SessionImpl::handle_pending_client_reset_acknowledgement()
788
{
10,356✔
789
    // Ignore the call if the session is not active
790
    if (m_state == State::Active) {
10,358✔
791
        m_wrapper.handle_pending_client_reset_acknowledgement();
10,358✔
792
    }
10,358✔
793
}
10,356✔
794

795
void SessionImpl::update_subscription_version_info()
796
{
296✔
797
    // Ignore the call if the session is not active
798
    if (m_state == State::Active) {
296✔
799
        m_wrapper.update_subscription_version_info();
296✔
800
    }
296✔
801
}
296✔
802

803
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
804
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
805
{
47,044✔
806
    // Ignore the call if the session is not active
807
    if (m_state != State::Active) {
47,044✔
808
        return false;
×
809
    }
×
810

811
    if (is_steady_state_download_message(batch_state, query_version)) {
47,044✔
812
        return false;
44,926✔
813
    }
44,926✔
814

815
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,118✔
816
    std::optional<SyncProgress> maybe_progress;
2,118✔
817
    if (batch_state == DownloadBatchState::LastInBatch) {
2,118✔
818
        maybe_progress = progress;
1,934✔
819
    }
1,934✔
820

821
    bool new_batch = false;
2,118✔
822
    try {
2,118✔
823
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
2,118✔
824
    }
2,118✔
825
    catch (const LogicError& ex) {
2,118✔
826
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
827
            // Named `err` so it does not shadow the caught `ex`
            IntegrationException err(ErrorCodes::LimitExceeded,
×
828
                                     "bootstrap changeset too large to store in pending bootstrap store",
×
829
                                     ProtocolError::bad_changeset_size);
×
830
            on_integration_failure(err);
×
831
            return true;
×
832
        }
×
833
        throw;
×
834
    }
×
835

836
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
837
    // bootstrapping.
838
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
2,118✔
839
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
48✔
840
    }
48✔
841

842
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
2,118✔
843
                                       batch_state, received_changesets.size());
2,118✔
844
    if (hook_action == SyncClientHookAction::EarlyReturn) {
2,118✔
845
        return true;
12✔
846
    }
12✔
847
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
2,106✔
848

849
    if (batch_state == DownloadBatchState::MoreToCome) {
2,106✔
850
        return true;
180✔
851
    }
180✔
852

853
    try {
1,926✔
854
        process_pending_flx_bootstrap();
1,926✔
855
    }
1,926✔
856
    catch (const IntegrationException& e) {
1,926✔
857
        on_integration_failure(e);
12✔
858
    }
12✔
859
    catch (...) {
1,926✔
860
        on_integration_failure(IntegrationException(exception_to_status()));
×
861
    }
×
862

863
    return true;
1,926✔
864
}
1,926✔
865

866

867
void SessionImpl::process_pending_flx_bootstrap()
868
{
11,988✔
869
    // Ignore the call if this is not an FLX session or the session is not active
870
    if (!m_is_flx_sync_session || m_state != State::Active) {
11,988✔
871
        return;
8,556✔
872
    }
8,556✔
873
    // Should never be called if session is not active
874
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
3,432✔
875
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
3,432✔
876
    if (!bootstrap_store->has_pending()) {
3,432✔
877
        return;
1,486✔
878
    }
1,486✔
879

880
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,946✔
881
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,946✔
882
                "changeset size: %3)",
1,946✔
883
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,946✔
884
                pending_batch_stats.pending_changeset_bytes);
1,946✔
885
    auto& history = get_repl().get_history();
1,946✔
886
    VersionInfo new_version;
1,946✔
887
    SyncProgress progress;
1,946✔
888
    int64_t query_version = -1;
1,946✔
889
    size_t changesets_processed = 0;
1,946✔
890

891
    // Used to commit each batch after it was transformed.
892
    TransactionRef transact = get_db()->start_write();
1,946✔
893
    while (bootstrap_store->has_pending()) {
4,032✔
894
        auto start_time = std::chrono::steady_clock::now();
2,098✔
895
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
2,098✔
896
        if (!pending_batch.progress) {
2,098✔
897
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
898
            // Close the write transaction before clearing the bootstrap store to avoid a deadlock because the
899
            // bootstrap store requires a write transaction itself.
900
            transact->close();
8✔
901
            bootstrap_store->clear();
8✔
902
            return;
8✔
903
        }
8✔
904

905
        auto batch_state =
2,090✔
906
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
2,090✔
907
        uint64_t downloadable_bytes = 0;
2,090✔
908
        query_version = pending_batch.query_version;
2,090✔
909
        bool simulate_integration_error =
2,090✔
910
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
2,090✔
911
        if (simulate_integration_error) {
2,090✔
912
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
4✔
913
        }
4✔
914

915
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
2,086✔
916
                        batch_state, pending_batch.changesets.size());
2,086✔
917

918
        history.integrate_server_changesets(
2,086✔
919
            *pending_batch.progress, downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
2,086✔
920
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
2,086✔
921
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
2,074✔
922
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
2,074✔
923
            });
2,074✔
924
        progress = *pending_batch.progress;
2,086✔
925
        changesets_processed += pending_batch.changesets.size();
2,086✔
926
        auto duration = std::chrono::steady_clock::now() - start_time;
2,086✔
927

928
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
2,086✔
929
                                      batch_state, pending_batch.changesets.size());
2,086✔
930
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
2,086✔
931

932
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
2,086✔
933
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
2,086✔
934
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
2,086✔
935
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
2,086✔
936
                    pending_batch.remaining_changesets);
2,086✔
937
    }
2,086✔
938

939
    REALM_ASSERT_3(query_version, !=, -1);
1,934✔
940
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
1,934✔
941

942
    on_changesets_integrated(new_version.realm_version, progress);
1,934✔
943
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
1,934✔
944
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,934✔
945
    // NoAction/EarlyReturn are both valid no-op actions to take here.
946
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,934✔
947
}
1,934✔
948

949
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
950
{
20✔
951
    // Ignore the call if the session is not active
952
    if (m_state == State::Active) {
20✔
953
        m_wrapper.on_flx_sync_error(version, err_msg);
20✔
954
    }
20✔
955
}
20✔
956

957
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
958
{
1,970✔
959
    // Ignore the call if the session is not active
960
    if (m_state == State::Active) {
1,970✔
961
        m_wrapper.on_flx_sync_progress(version, batch_state);
1,970✔
962
    }
1,970✔
963
}
1,970✔
964

965
SubscriptionStore* SessionImpl::get_flx_subscription_store()
966
{
18,226✔
967
    // Should never be called if session is not active
968
    REALM_ASSERT_EX(m_state == State::Active, m_state);
18,226✔
969
    return m_wrapper.get_flx_subscription_store();
18,226✔
970
}
18,226✔
971

972
MigrationStore* SessionImpl::get_migration_store()
973
{
67,002✔
974
    // Should never be called if session is not active
975
    REALM_ASSERT_EX(m_state == State::Active, m_state);
67,002✔
976
    return m_wrapper.get_migration_store();
67,002✔
977
}
67,002✔
978

979
void SessionImpl::on_flx_sync_version_complete(int64_t version)
980
{
300✔
981
    // Ignore the call if the session is not active
982
    if (m_state == State::Active) {
300✔
983
        m_wrapper.on_flx_sync_version_complete(version);
300✔
984
    }
300✔
985
}
300✔
986

987
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
988
{
8,342✔
989
    // Should never be called if session is not active
990
    REALM_ASSERT_EX(m_state == State::Active, m_state);
8,342✔
991

992
    // Make sure we don't call the debug hook recursively.
993
    if (m_wrapper.m_in_debug_hook) {
8,342✔
994
        return SyncClientHookAction::NoAction;
24✔
995
    }
24✔
996
    m_wrapper.m_in_debug_hook = true;
8,318✔
997
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
8,318✔
998
        m_wrapper.m_in_debug_hook = false;
8,318✔
999
    });
8,318✔
1000

1001
    auto action = m_wrapper.m_debug_hook(data);
8,318✔
1002
    switch (action) {
8,318✔
1003
        case realm::SyncClientHookAction::SuspendWithRetryableError: {
12✔
1004
            SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
12✔
1005
            err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
12✔
1006

1007
            auto err_processing_err = receive_error_message(err_info);
12✔
1008
            REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
12✔
1009
            return SyncClientHookAction::EarlyReturn;
12✔
1010
        }
×
1011
        case realm::SyncClientHookAction::TriggerReconnect: {
24✔
1012
            get_connection().voluntary_disconnect();
24✔
1013
            return SyncClientHookAction::EarlyReturn;
24✔
1014
        }
×
1015
        default:
8,274✔
1016
            return action;
8,274✔
1017
    }
8,318✔
1018
}
8,318✔
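
The recursion guard in call_debug_hook(const SyncClientHookData&) above can be illustrated in isolation. The following is a minimal sketch, not the realm-core implementation: `HookAction`, `FlagGuard` and `HookCaller` are hypothetical stand-ins, and a small RAII struct is assumed in place of util::make_scope_exit. It shows a nested call made from inside the hook returning NoAction without re-entering the hook.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in for SyncClientHookAction.
enum class HookAction { NoAction, EarlyReturn };

// RAII guard (assumed helper): sets the flag on entry, clears it on scope exit,
// including when the hook throws.
struct FlagGuard {
    bool& flag;
    explicit FlagGuard(bool& f) : flag(f) { flag = true; }
    ~FlagGuard() { flag = false; }
    FlagGuard(const FlagGuard&) = delete;
    FlagGuard& operator=(const FlagGuard&) = delete;
};

// Hypothetical owner of a debug hook.
class HookCaller {
public:
    explicit HookCaller(std::function<HookAction(HookCaller&)> hook)
        : m_hook(std::move(hook))
    {
    }

    HookAction call()
    {
        // A call made while the hook is already running is ignored.
        if (m_in_hook)
            return HookAction::NoAction;
        FlagGuard guard(m_in_hook);
        ++m_invocations;
        return m_hook(*this);
    }

    // Number of times the hook itself was actually run.
    int invocations() const { return m_invocations; }

private:
    std::function<HookAction(HookCaller&)> m_hook;
    bool m_in_hook = false;
    int m_invocations = 0;
};
```

Clearing the flag in a destructor rather than after the hook returns keeps the guard correct on every exit path, which is presumably why the original uses a scope-exit helper.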
1019

1020
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
1021
                                                  int64_t query_version, DownloadBatchState batch_state,
1022
                                                  size_t num_changesets)
1023
{
176,968✔
1024
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
176,968✔
1025
        return SyncClientHookAction::NoAction;
169,104✔
1026
    }
169,104✔
1027
    if (REALM_UNLIKELY(m_state != State::Active)) {
7,864✔
1028
        return SyncClientHookAction::NoAction;
×
1029
    }
×
1030

1031
    SyncClientHookData data;
7,864✔
1032
    data.event = event;
7,864✔
1033
    data.batch_state = batch_state;
7,864✔
1034
    data.progress = progress;
7,864✔
1035
    data.num_changesets = num_changesets;
7,864✔
1036
    data.query_version = query_version;
7,864✔
1037

1038
    return call_debug_hook(data);
7,864✔
1039
}
7,864✔
1040

1041
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
1042
{
1,384✔
1043
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
1,384✔
1044
        return SyncClientHookAction::NoAction;
912✔
1045
    }
912✔
1046
    if (REALM_UNLIKELY(m_state != State::Active)) {
472✔
1047
        return SyncClientHookAction::NoAction;
×
1048
    }
×
1049

1050
    SyncClientHookData data;
472✔
1051
    data.event = event;
472✔
1052
    data.batch_state = DownloadBatchState::SteadyState;
472✔
1053
    data.progress = m_progress;
472✔
1054
    data.num_changesets = 0;
472✔
1055
    data.query_version = 0;
472✔
1056
    data.error_info = &error_info;
472✔
1057

1058
    return call_debug_hook(data);
472✔
1059
}
472✔
1060

1061
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event)
1062
{
76,786✔
1063
    return call_debug_hook(event, m_progress, m_last_sent_flx_query_version, DownloadBatchState::SteadyState, 0);
76,786✔
1064
}
76,786✔
1065

1066
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
1067
{
94,102✔
1068
    // Should never be called if session is not active
1069
    REALM_ASSERT_EX(m_state == State::Active, m_state);
94,102✔
1070
    if (batch_state == DownloadBatchState::SteadyState) {
94,102✔
1071
        return true;
44,926✔
1072
    }
44,926✔
1073

1074
    if (!m_is_flx_sync_session) {
49,176✔
1075
        return true;
43,638✔
1076
    }
43,638✔
1077

1078
    // If this is a steady state DOWNLOAD, no need for special handling.
1079
    if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) {
5,538✔
1080
        return true;
1,288✔
1081
    }
1,288✔
1082

1083
    return false;
4,250✔
1084
}
5,538✔
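
The decision order in is_steady_state_download_message() can be restated as a pure function, which makes the cases easy to check. This is a sketch under stated assumptions: `BatchState` stands in for DownloadBatchState, and the session's FLX flag and active query version (m_is_flx_sync_session and m_wrapper.m_flx_active_version in the original) are passed as hypothetical parameters.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for DownloadBatchState.
enum class BatchState { SteadyState, LastInBatch, MoreToCome };

// Returns true when a DOWNLOAD message needs no bootstrap handling.
bool is_steady_state(BatchState batch_state, std::int64_t query_version, bool is_flx_session,
                     std::int64_t active_version)
{
    // Explicit steady-state messages are never part of a bootstrap.
    if (batch_state == BatchState::SteadyState)
        return true;
    // Non-FLX sessions have no bootstrap phase.
    if (!is_flx_session)
        return true;
    // A final batch for the already-active query version is ordinary traffic.
    return batch_state == BatchState::LastInBatch && query_version == active_version;
}
```

For example, a LastInBatch message for a query version newer than the active one is treated as bootstrap data, while the same message for the active version is not.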
1085

1086
void SessionImpl::init_progress_handler()
1087
{
10,356✔
1088
    REALM_ASSERT_EX(m_state == State::Unactivated || m_state == State::Active, m_state);
10,356✔
1089
    m_wrapper.init_progress_handler();
10,356✔
1090
}
10,356✔
1091

1092
void SessionImpl::enable_progress_notifications()
1093
{
45,438✔
1094
    m_wrapper.m_reliable_download_progress = true;
45,438✔
1095
}
45,438✔
1096

1097
util::Future<std::string> SessionImpl::send_test_command(std::string body)
1098
{
60✔
1099
    if (m_state != State::Active) {
60✔
1100
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
×
1101
    }
×
1102

1103
    try {
60✔
1104
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
60✔
1105
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
60✔
1106
            return Status{ErrorCodes::LogicError,
4✔
1107
                          "Must supply command name in \"command\" field of test command json object"};
4✔
1108
        }
4✔
1109
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
56✔
1110
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
×
1111
        }
×
1112
    }
56✔
1113
    catch (const nlohmann::json::parse_error& e) {
60✔
1114
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
4✔
1115
    }
4✔
1116

1117
    auto pf = util::make_promise_future<std::string>();
52✔
1118
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
52✔
1119
        // Any non-OK status, including operation_aborted, fails the promise
1120
        if (!status.is_ok()) {
52✔
1121
            promise.set_error(status);
×
1122
            return;
×
1123
        }
×
1124

1125
        auto id = ++m_last_pending_test_command_ident;
52✔
1126
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
52✔
1127
        ensure_enlisted_to_send();
52✔
1128
    });
52✔
1129

1130
    return std::move(pf.future);
52✔
1131
}
60✔
1132

1133
// ################ SessionWrapper ################
1134

1135
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1136
// provides a link to the ClientImpl::Session, which exchanges messages with the server over the
1137
// ClientImpl::Connection that owns it.
1138
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1139
                               std::shared_ptr<MigrationStore> migration_store, Session::Config&& config)
1140
    : m_client{client}
4,894✔
1141
    , m_db(std::move(db))
4,894✔
1142
    , m_replication(m_db->get_replication())
4,894✔
1143
    , m_protocol_envelope{config.protocol_envelope}
4,894✔
1144
    , m_server_address{std::move(config.server_address)}
4,894✔
1145
    , m_server_port{config.server_port}
4,894✔
1146
    , m_server_verified{config.server_verified}
4,894✔
1147
    , m_user_id(std::move(config.user_id))
4,894✔
1148
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
4,894✔
1149
    , m_authorization_header_name{config.authorization_header_name}
4,894✔
1150
    , m_custom_http_headers{std::move(config.custom_http_headers)}
4,894✔
1151
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
4,894✔
1152
    , m_simulate_integration_error{config.simulate_integration_error}
4,894✔
1153
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
4,894✔
1154
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
4,894✔
1155
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
4,894✔
1156
    , m_http_request_path_prefix{std::move(config.service_identifier)}
4,894✔
1157
    , m_virt_path{std::move(config.realm_identifier)}
4,894✔
1158
    , m_proxy_config{std::move(config.proxy_config)}
4,894✔
1159
    , m_signed_access_token{std::move(config.signed_user_token)}
4,894✔
1160
    , m_client_reset_config{std::move(config.client_reset_config)}
4,894✔
1161
    , m_progress_handler(std::move(config.progress_handler))
4,894✔
1162
    , m_connection_state_change_listener(std::move(config.connection_state_change_listener))
4,894✔
1163
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
4,894✔
1164
    , m_session_reason(m_client_reset_config || config.fresh_realm_download ? SessionReason::ClientReset
4,894✔
1165
                                                                            : SessionReason::Sync)
4,894✔
1166
    , m_allow_upload_messages(!config.fresh_realm_download)
4,894✔
1167
    , m_schema_version(config.schema_version)
4,894✔
1168
    , m_flx_subscription_store(std::move(flx_sub_store))
4,894✔
1169
    , m_migration_store(std::move(migration_store))
4,894✔
1170
{
10,134✔
1171
    REALM_ASSERT(m_db);
10,134✔
1172
    REALM_ASSERT(m_db->get_replication());
10,134✔
1173
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
10,134✔
1174

1175
    // SessionWrapper begins at +1 retain count because Client retains and
1176
    // releases it while performing async operations, and these need to not
1177
    // take it to 0 or it could be deleted before the caller can retain it.
1178
    bind_ptr();
10,134✔
1179
    m_client.register_unactualized_session_wrapper(this);
10,134✔
1180
}
10,134✔
1181

1182
SessionWrapper::~SessionWrapper() noexcept
1183
{
10,134✔
1184
    // We begin actualization in the constructor and do not delete the wrapper
1185
    // until both the Client is done with it and the Session has abandoned it,
1186
    // so at this point we must have actualized, finalized, and been abandoned.
1187
    REALM_ASSERT(m_actualized);
10,134✔
1188
    REALM_ASSERT(m_abandoned);
10,134✔
1189
    REALM_ASSERT(m_finalized);
10,134✔
1190
    REALM_ASSERT(m_closed);
10,134✔
1191
    REALM_ASSERT(!m_db);
10,134✔
1192
}
10,134✔
1193

1194

1195
inline ClientReplication& SessionWrapper::get_replication() noexcept
1196
{
119,612✔
1197
    REALM_ASSERT(m_db);
119,612✔
1198
    return static_cast<ClientReplication&>(*m_replication);
119,612✔
1199
}
119,612✔
1200

1201

1202
inline ClientImpl& SessionWrapper::get_client() noexcept
1203
{
×
1204
    return m_client;
×
1205
}
×
1206

1207
bool SessionWrapper::has_flx_subscription_store() const
1208
{
1,970✔
1209
    return static_cast<bool>(m_flx_subscription_store);
1,970✔
1210
}
1,970✔
1211

1212
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1213
{
20✔
1214
    REALM_ASSERT(!m_finalized);
20✔
1215
    get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
20✔
1216
}
20✔
1217

1218
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1219
{
2,222✔
1220
    REALM_ASSERT(!m_finalized);
2,222✔
1221
    m_flx_last_seen_version = version;
2,222✔
1222
    m_flx_active_version = version;
2,222✔
1223
}
2,222✔
1224

1225
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1226
{
1,970✔
1227
    if (!has_flx_subscription_store()) {
1,970✔
1228
        return;
×
1229
    }
×
1230
    REALM_ASSERT(!m_finalized);
1,970✔
1231
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
1,970✔
1232
    REALM_ASSERT(new_version >= m_flx_active_version);
1,970✔
1233
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
1,970✔
1234

1235
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
1,970✔
1236

1237
    switch (batch_state) {
1,970✔
1238
        case DownloadBatchState::SteadyState:
✔
1239
            // Cannot be called with this value.
1240
            REALM_UNREACHABLE();
1241
        case DownloadBatchState::LastInBatch:
1,922✔
1242
            if (m_flx_active_version == new_version) {
1,922✔
1243
                return;
×
1244
            }
×
1245
            on_flx_sync_version_complete(new_version);
1,922✔
1246
            if (new_version == 0) {
1,922✔
1247
                new_state = SubscriptionSet::State::Complete;
924✔
1248
            }
924✔
1249
            else {
998✔
1250
                new_state = SubscriptionSet::State::AwaitingMark;
998✔
1251
                m_flx_pending_mark_version = new_version;
998✔
1252
            }
998✔
1253
            break;
1,922✔
1254
        case DownloadBatchState::MoreToCome:
48✔
1255
            if (m_flx_last_seen_version == new_version) {
48✔
1256
                return;
×
1257
            }
×
1258

1259
            m_flx_last_seen_version = new_version;
48✔
1260
            new_state = SubscriptionSet::State::Bootstrapping;
48✔
1261
            break;
48✔
1262
    }
1,970✔
1263

1264
    get_flx_subscription_store()->update_state(new_version, new_state);
1,970✔
1265
}
1,970✔
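
The version bookkeeping in SessionWrapper::on_flx_sync_progress() amounts to a small state transition. The sketch below is an illustration only: `BatchState`, `SubState`, `FlxVersions` and `next_state` are hypothetical names, the initial value of -1 for the tracked versions is an assumption, and SteadyState (unreachable in the original) is simply reported as "no change" here.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-ins for DownloadBatchState and SubscriptionSet::State.
enum class BatchState { SteadyState, LastInBatch, MoreToCome };
enum class SubState { Bootstrapping, AwaitingMark, Complete };

// Versions tracked by the wrapper; the -1 defaults are an assumption of this sketch.
struct FlxVersions {
    std::int64_t last_seen = -1;
    std::int64_t active = -1;
    std::int64_t pending_mark = -1;
};

// Returns the new subscription state, or std::nullopt when nothing changes.
std::optional<SubState> next_state(FlxVersions& v, std::int64_t new_version, BatchState batch_state)
{
    switch (batch_state) {
        case BatchState::LastInBatch:
            if (v.active == new_version)
                return std::nullopt; // already completed this version
            v.last_seen = new_version;
            v.active = new_version;
            if (new_version == 0)
                return SubState::Complete; // version 0 needs no mark
            v.pending_mark = new_version;
            return SubState::AwaitingMark;
        case BatchState::MoreToCome:
            if (v.last_seen == new_version)
                return std::nullopt; // bootstrap already in progress
            v.last_seen = new_version;
            return SubState::Bootstrapping;
        case BatchState::SteadyState:
            break; // not expected; the original treats this as unreachable
    }
    return std::nullopt;
}
```

Repeated MoreToCome messages for the same version therefore update the subscription state only once, and a completed version is never reported twice.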
1266

1267
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
1268
{
20,216✔
1269
    REALM_ASSERT(!m_finalized);
20,216✔
1270
    return m_flx_subscription_store.get();
20,216✔
1271
}
20,216✔
1272

1273
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
1274
{
5,550✔
1275
    REALM_ASSERT(!m_finalized);
5,550✔
1276
    return m_flx_pending_bootstrap_store.get();
5,550✔
1277
}
5,550✔
1278

1279
MigrationStore* SessionWrapper::get_migration_store()
1280
{
67,004✔
1281
    REALM_ASSERT(!m_finalized);
67,004✔
1282
    return m_migration_store.get();
67,004✔
1283
}
67,004✔
1284

1285
inline bool SessionWrapper::mark_abandoned()
1286
{
10,136✔
1287
    REALM_ASSERT(!m_abandoned);
10,136✔
1288
    m_abandoned = true;
10,136✔
1289
    return m_finalized;
10,136✔
1290
}
10,136✔
1291

1292

1293
void SessionWrapper::on_commit(version_type new_version)
1294
{
113,382✔
1295
    // Thread safety required
1296
    m_client.post([self = util::bind_ptr{this}, new_version] {
113,382✔
1297
        REALM_ASSERT(self->m_actualized);
113,380✔
1298
        if (REALM_UNLIKELY(!self->m_sess))
113,380✔
1299
            return; // Already finalized
414✔
1300
        SessionImpl& sess = *self->m_sess;
112,966✔
1301
        sess.recognize_sync_version(new_version); // Throws
112,966✔
1302
        self->check_progress();                   // Throws
112,966✔
1303
    });
112,966✔
1304
}
113,382✔
1305

1306

1307
void SessionWrapper::cancel_reconnect_delay()
1308
{
28✔
1309
    // Thread safety required
1310

1311
    m_client.post([self = util::bind_ptr{this}] {
28✔
1312
        REALM_ASSERT(self->m_actualized);
28✔
1313
        if (REALM_UNLIKELY(self->m_closed)) {
28✔
1314
            return;
×
1315
        }
×
1316

1317
        if (REALM_UNLIKELY(!self->m_sess))
28✔
1318
            return; // Already finalized
×
1319
        SessionImpl& sess = *self->m_sess;
28✔
1320
        sess.cancel_resumption_delay(); // Throws
28✔
1321
        ClientImpl::Connection& conn = sess.get_connection();
28✔
1322
        conn.cancel_reconnect_delay(); // Throws
28✔
1323
    });                                // Throws
28✔
1324
}
28✔
1325

1326
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1327
                                    WaitOperCompletionHandler handler)
1328
{
28,118✔
1329
    REALM_ASSERT(upload_completion || download_completion);
28,118✔
1330

1331
    m_client.post([self = util::bind_ptr{this}, handler = std::move(handler), upload_completion,
28,118✔
1332
                   download_completion](Status status) mutable {
28,118✔
1333
        REALM_ASSERT(self->m_actualized);
28,118✔
1334
        if (!status.is_ok()) {
28,118✔
1335
            handler(status); // Throws
×
1336
            return;
×
1337
        }
×
1338
        if (REALM_UNLIKELY(!self->m_sess)) {
28,118✔
1339
            // Already finalized
1340
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
186✔
1341
            return;
186✔
1342
        }
186✔
1343
        if (upload_completion) {
27,932✔
1344
            self->m_upload_completion_requested_version = self->m_db->get_version_of_latest_snapshot();
15,368✔
1345
            if (download_completion) {
15,368✔
1346
                // Wait for upload and download completion
1347
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
316✔
1348
            }
316✔
1349
            else {
15,052✔
1350
                // Wait for upload completion only
1351
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
15,052✔
1352
            }
15,052✔
1353
        }
15,368✔
1354
        else {
12,564✔
1355
            // Wait for download completion only
1356
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
12,564✔
1357
        }
12,564✔
1358
        SessionImpl& sess = *self->m_sess;
27,932✔
1359
        if (upload_completion)
27,932✔
1360
            self->check_progress();
15,368✔
1361
        if (download_completion)
27,932✔
1362
            sess.request_download_completion_notification(); // Throws
12,880✔
1363
    });                                                      // Throws
27,932✔
1364
}
28,118✔
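
The handler routing inside async_wait_for() above depends only on the two completion flags. A minimal sketch of that routing, assuming a hypothetical `CompletionHandlers` container and a simplified `Handler` signature in place of WaitOperCompletionHandler:

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Simplified, hypothetical handler type; the original takes a Status.
using Handler = std::function<void(bool)>;

struct CompletionHandlers {
    std::vector<Handler> upload;   // waiting for upload completion only
    std::vector<Handler> download; // waiting for download completion only
    std::vector<Handler> sync;     // waiting for both upload and download

    // Stores the handler in the list that matches the requested completion kinds.
    void add(bool upload_completion, bool download_completion, Handler handler)
    {
        assert(upload_completion || download_completion);
        if (upload_completion) {
            if (download_completion)
                sync.push_back(std::move(handler));
            else
                upload.push_back(std::move(handler));
        }
        else {
            download.push_back(std::move(handler));
        }
    }
};
```

Keeping three separate lists lets each completion event fire only the handlers that were waiting for exactly that condition.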
1365

1366

1367
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1368
{
12,912✔
1369
    // Thread safety required
1370
    REALM_ASSERT(!m_abandoned);
12,912✔
1371

1372
    auto pf = util::make_promise_future<bool>();
12,912✔
1373
    async_wait_for(true, false, [promise = std::move(pf.promise)](Status status) mutable {
12,912✔
1374
        promise.emplace_value(status.is_ok());
12,912✔
1375
    });
12,912✔
1376
    return pf.future.get();
12,912✔
1377
}
12,912✔
1378

1379

1380
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1381
{
10,004✔
1382
    // Thread safety required
1383
    REALM_ASSERT(!m_abandoned);
10,004✔
1384

1385
    auto pf = util::make_promise_future<bool>();
10,004✔
1386
    async_wait_for(false, true, [promise = std::move(pf.promise)](Status status) mutable {
10,004✔
1387
        promise.emplace_value(status.is_ok());
10,004✔
1388
    });
10,004✔
1389
    return pf.future.get();
10,004✔
1390
}
10,004✔
1391

1392

1393
void SessionWrapper::refresh(std::string_view signed_access_token)
1394
{
216✔
1395
    // Thread safety required
1396
    REALM_ASSERT(!m_abandoned);
216✔
1397

1398
    m_client.post([self = util::bind_ptr{this}, token = std::string(signed_access_token)] {
216✔
1399
        REALM_ASSERT(self->m_actualized);
216✔
1400
        if (REALM_UNLIKELY(!self->m_sess))
216✔
1401
            return; // Already finalized
×
1402
        self->m_signed_access_token = std::move(token);
216✔
1403
        SessionImpl& sess = *self->m_sess;
216✔
1404
        ClientImpl::Connection& conn = sess.get_connection();
216✔
1405
        // FIXME: This only makes sense when each session uses a separate connection.
1406
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
216✔
1407
        sess.cancel_resumption_delay();                                                          // Throws
216✔
1408
        conn.cancel_reconnect_delay();                                                           // Throws
216✔
1409
    });
216✔
1410
}
216✔
1411

1412

1413
void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
1414
{
10,136✔
1415
    ClientImpl& client = wrapper->m_client;
10,136✔
1416
    client.register_abandoned_session_wrapper(std::move(wrapper));
10,136✔
1417
}
10,136✔
1418

1419

1420
// Must be called from event loop thread
1421
void SessionWrapper::actualize()
1422
{
10,066✔
1423
    // actualize() can only ever be called once
1424
    REALM_ASSERT(!m_actualized);
10,066✔
1425
    REALM_ASSERT(!m_sess);
10,066✔
1426
    // The client should have removed this wrapper from those pending
1427
    // actualization if it called force_close() or finalize_before_actualization()
1428
    REALM_ASSERT(!m_finalized);
10,066✔
1429
    REALM_ASSERT(!m_closed);
10,066✔
1430

1431
    m_actualized = true;
10,066✔
1432

1433
    ScopeExitFail close_on_error([&]() noexcept {
10,066✔
1434
        m_closed = true;
4✔
1435
    });
4✔
1436

1437
    m_db->claim_sync_agent();
10,066✔
1438
    m_db->add_commit_listener(this);
10,066✔
1439
    ScopeExitFail remove_commit_listener([&]() noexcept {
10,066✔
1440
        m_db->remove_commit_listener(this);
×
1441
    });
×
1442

1443
    ServerEndpoint endpoint{m_protocol_envelope, m_server_address, m_server_port,
10,066✔
1444
                            m_user_id,           m_sync_mode,      m_server_verified};
10,066✔
1445
    bool was_created = false;
10,066✔
1446
    ClientImpl::Connection& conn = m_client.get_connection(
10,066✔
1447
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
10,066✔
1448
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
10,066✔
1449
        was_created); // Throws
10,066✔
1450
    ScopeExitFail remove_connection([&]() noexcept {
10,066✔
1451
        if (was_created)
×
1452
            m_client.remove_connection(conn);
×
1453
    });
×
1454

1455
    // FIXME: This only makes sense when each session uses a separate connection.
1456
    conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
10,066✔
1457
    std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
10,066✔
1458
    if (m_sync_mode == SyncServerMode::FLX) {
10,066✔
1459
        m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
1,506✔
1460
    }
1,506✔
1461

1462
    sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
10,066✔
1463
    m_sess = sess.get();
10,066✔
1464
    ScopeExitFail clear_sess([&]() noexcept {
10,066✔
1465
        m_sess = nullptr;
×
1466
    });
×
1467
    conn.activate_session(std::move(sess)); // Throws
10,066✔
1468

1469
    // Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
1470
    // session cannot change the state of the bootstrap store at the same time.
1471
    update_subscription_version_info();
10,066✔
1472

1473
    if (was_created)
10,066✔
1474
        conn.activate(); // Throws
2,788✔
1475

1476
    if (m_connection_state_change_listener) {
10,066✔
1477
        ConnectionState state = conn.get_state();
10,052✔
1478
        if (state != ConnectionState::disconnected) {
10,052✔
1479
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,080✔
1480
            if (state == ConnectionState::connected)
7,080✔
1481
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
6,970✔
1482
        }
7,080✔
1483
    }
10,052✔
1484

1485
    if (!m_client_reset_config)
10,066✔
1486
        check_progress(); // Throws
9,684✔
1487
}
10,066✔
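
`actualize()` registers several `ScopeExitFail` guards so that each completed step is undone if a later step throws. The implementation of `ScopeExitFail` is not shown in this file; the sketch below is one common way to build such a guard, assuming C++17 `std::uncaught_exceptions()`. `ScopeFailGuard` and `rollback_ran` are hypothetical names used only for illustration.

```cpp
#include <exception>
#include <stdexcept>
#include <utility>

// Hypothetical guard: runs its callback only when the scope is left
// because an exception is propagating, not on normal exit.
template <class F>
class ScopeFailGuard {
public:
    explicit ScopeFailGuard(F f)
        : m_f(std::move(f))
        , m_exceptions(std::uncaught_exceptions())
    {
    }
    ~ScopeFailGuard()
    {
        // More in-flight exceptions than at construction means this scope is unwinding.
        if (std::uncaught_exceptions() > m_exceptions)
            m_f();
    }
    ScopeFailGuard(const ScopeFailGuard&) = delete;
    ScopeFailGuard& operator=(const ScopeFailGuard&) = delete;

private:
    F m_f;
    int m_exceptions;
};

// Returns true if the rollback callback ran.
bool rollback_ran(bool fail)
{
    bool rolled_back = false;
    try {
        ScopeFailGuard guard([&]() noexcept {
            rolled_back = true;
        });
        if (fail)
            throw std::runtime_error("activation failed");
    }
    catch (const std::runtime_error&) {
        // Swallowed so the caller can inspect the flag.
    }
    return rolled_back;
}
```

Because guards are destroyed in reverse order of construction, stacking them as `actualize()` does would undo the steps last-to-first.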
1488

1489
void SessionWrapper::force_close()
1490
{
10,168✔
1491
    if (m_closed) {
10,168✔
1492
        return;
106✔
1493
    }
106✔
1494
    REALM_ASSERT(m_actualized);
10,062✔
1495
    REALM_ASSERT(m_sess);
10,062✔
1496
    m_closed = true;
10,062✔
1497

1498
    ClientImpl::Connection& conn = m_sess->get_connection();
10,062✔
1499
    conn.initiate_session_deactivation(m_sess); // Throws
10,062✔
1500

1501
    // We need to keep the DB open until finalization, but we no longer want to
1502
    // know when commits are made
1503
    m_db->remove_commit_listener(this);
10,062✔
1504

1505
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
1506
    m_flx_pending_bootstrap_store.reset();
10,062✔
1507
    // Clear the subscription and migration store refs since they are owned by SyncSession
1508
    m_flx_subscription_store.reset();
10,062✔
1509
    m_migration_store.reset();
10,062✔
1510
    m_sess = nullptr;
10,062✔
1511
    // Everything is being torn down, no need to report connection state anymore
1512
    m_connection_state_change_listener = {};
10,062✔
1513

1514
    // All outstanding wait operations must be canceled
1515
    while (!m_upload_completion_handlers.empty()) {
10,420✔
1516
        auto handler = std::move(m_upload_completion_handlers.back());
358✔
1517
        m_upload_completion_handlers.pop_back();
358✔
1518
        handler({ErrorCodes::OperationAborted, "Sync session is being closed before upload was complete"}); // Throws
358✔
1519
    }
358✔
1520
    while (!m_download_completion_handlers.empty()) {
10,446✔
1521
        auto handler = std::move(m_download_completion_handlers.back());
384✔
1522
        m_download_completion_handlers.pop_back();
384✔
1523
        handler(
384✔
1524
            {ErrorCodes::OperationAborted, "Sync session is being closed before download was complete"}); // Throws
384✔
1525
    }
384✔
1526
    while (!m_sync_completion_handlers.empty()) {
10,074✔
1527
        auto handler = std::move(m_sync_completion_handlers.back());
12✔
1528
        m_sync_completion_handlers.pop_back();
12✔
1529
        handler({ErrorCodes::OperationAborted, "Sync session is being closed before sync was complete"}); // Throws
12✔
1530
    }
12✔
1531
}
10,062✔
1532

1533
// Must be called from event loop thread
1534
//
1535
// `m_client.m_mutex` is not held while this is called, but it is guaranteed to
1536
// have been acquired at some point in between the final read or write ever made
1537
// from a different thread and when this is called.
1538
void SessionWrapper::finalize()
1539
{
10,066✔
1540
    REALM_ASSERT(m_actualized);
10,066✔
1541
    REALM_ASSERT(m_abandoned);
10,066✔
1542
    REALM_ASSERT(!m_finalized);
10,066✔
1543

1544
    force_close();
10,066✔
1545

1546
    m_finalized = true;
10,066✔
1547

1548
    // The Realm file can be closed now, as no access to the Realm file is
1549
    // supposed to happen on behalf of a session after initiation of
1550
    // deactivation.
1551
    m_db->release_sync_agent();
10,066✔
1552
    m_db = nullptr;
10,066✔
1553
}
10,066✔
1554

1555

1556
// Must be called only when an unactualized session wrapper becomes abandoned.
1557
//
1558
// Called with a lock on `m_client.m_mutex`.
1559
inline void SessionWrapper::finalize_before_actualization() noexcept
1560
{
68✔
1561
    REALM_ASSERT(!m_finalized);
68✔
1562
    REALM_ASSERT(!m_sess);
68✔
1563
    m_actualized = true;
68✔
1564
    m_finalized = true;
68✔
1565
    m_closed = true;
68✔
1566
    m_db->remove_commit_listener(this);
68✔
1567
    m_db->release_sync_agent();
68✔
1568
    m_db = nullptr;
68✔
1569
}
68✔
1570

1571
void SessionWrapper::on_download_completion()
1572
{
16,132✔
1573
    // Ensure that progress handlers get called before completion handlers. The
1574
    // download completing performed a commit and will trigger progress
1575
    // notifications asynchronously, but they would arrive after the download
1576
    // completion without this.
1577
    check_progress();
16,132✔
1578

1579
    while (!m_download_completion_handlers.empty()) {
28,524✔
1580
        auto handler = std::move(m_download_completion_handlers.back());
12,392✔
1581
        m_download_completion_handlers.pop_back();
12,392✔
1582
        handler(Status::OK()); // Throws
12,392✔
1583
    }
12,392✔
1584
    while (!m_sync_completion_handlers.empty()) {
16,224✔
1585
        auto handler = std::move(m_sync_completion_handlers.back());
92✔
1586
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
92✔
1587
        m_sync_completion_handlers.pop_back();
92✔
1588
    }
92✔
1589

1590
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
16,132✔
1591
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
888✔
1592
                             m_flx_pending_mark_version);
888✔
1593
        m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
888✔
1594
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
888✔
1595
    }
888✔
1596
}
16,132✔
1597

1598

1599
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1600
{
670✔
1601
    REALM_ASSERT(!m_finalized);
670✔
1602
    m_suspended = true;
670✔
1603
    if (m_connection_state_change_listener) {
670✔
1604
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
670✔
1605
    }
670✔
1606
}
670✔
1607

1608

1609
void SessionWrapper::on_resumed()
1610
{
58✔
1611
    REALM_ASSERT(!m_finalized);
58✔
1612
    m_suspended = false;
58✔
1613
    if (m_connection_state_change_listener) {
58✔
1614
        ClientImpl::Connection& conn = m_sess->get_connection();
58✔
1615
        if (conn.get_state() != ConnectionState::disconnected) {
58✔
1616
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
50✔
1617
            if (conn.get_state() == ConnectionState::connected)
50✔
1618
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
46✔
1619
        }
50✔
1620
    }
58✔
1621
}
58✔
1622

1623

1624
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1625
                                                 const std::optional<SessionErrorInfo>& error_info)
1626
{
11,492✔
1627
    if (m_connection_state_change_listener && !m_suspended) {
11,492✔
1628
        m_connection_state_change_listener(state, error_info); // Throws
11,460✔
1629
    }
11,460✔
1630
}
11,492✔
1631

1632
void SessionWrapper::init_progress_handler()
1633
{
10,360✔
1634
    ClientHistory::get_upload_download_state(m_db.get(), m_final_downloaded, m_final_uploaded);
10,360✔
1635
}
10,360✔
1636

1637
void SessionWrapper::check_progress()
1638
{
154,152✔
1639
    REALM_ASSERT(!m_finalized);
154,152✔
1640
    REALM_ASSERT(m_sess);
154,152✔
1641

1642
    if (!m_progress_handler && m_upload_completion_handlers.empty() && m_sync_completion_handlers.empty())
154,152✔
1643
        return;
73,688✔
1644

1645
    version_type uploaded_version;
80,464✔
1646
    ReportedProgress p;
80,464✔
1647
    DownloadableProgress downloadable;
80,464✔
1648
    ClientHistory::get_upload_download_state(*m_db, p.downloaded, downloadable, p.uploaded, p.uploadable, p.snapshot,
80,464✔
1649
                                             uploaded_version);
80,464✔
1650

1651
    report_progress(p, downloadable);
80,464✔
1652
    report_upload_completion(uploaded_version);
80,464✔
1653
}
80,464✔
1654

1655
void SessionWrapper::report_upload_completion(version_type uploaded_version)
1656
{
80,462✔
1657
    if (uploaded_version < m_upload_completion_requested_version)
80,462✔
1658
        return;
60,940✔
1659

1660
    std::move(m_sync_completion_handlers.begin(), m_sync_completion_handlers.end(),
19,522✔
1661
              std::back_inserter(m_download_completion_handlers));
19,522✔
1662
    m_sync_completion_handlers.clear();
19,522✔
1663

1664
    while (!m_upload_completion_handlers.empty()) {
34,308✔
1665
        auto handler = std::move(m_upload_completion_handlers.back());
14,786✔
1666
        m_upload_completion_handlers.pop_back();
14,786✔
1667
        handler(Status::OK()); // Throws
14,786✔
1668
    }
14,786✔
1669
}
19,522✔
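
`report_upload_completion()` and `on_download_completion()` together form a two-stage hand-off: a handler waiting for both directions is moved into the list for the other direction once the first one completes, so it fires only after both events. A simplified sketch of that hand-off, with the hypothetical names `CompletionTracker` and `demo` and with status arguments omitted:

```cpp
#include <algorithm>
#include <functional>
#include <iterator>
#include <utility>
#include <vector>

// Hypothetical, simplified tracker for the three handler lists.
struct CompletionTracker {
    using Handler = std::function<void()>;
    std::vector<Handler> upload, download, both;

    void on_upload_complete()
    {
        // Handlers waiting for both now only wait for download.
        std::move(both.begin(), both.end(), std::back_inserter(download));
        both.clear();
        drain(upload);
    }

    void on_download_complete()
    {
        drain(download);
        // Handlers waiting for both now only wait for upload.
        std::move(both.begin(), both.end(), std::back_inserter(upload));
        both.clear();
    }

private:
    // Pop each handler before invoking it, so a handler that registers
    // new handlers does not invalidate the one being called.
    static void drain(std::vector<Handler>& handlers)
    {
        while (!handlers.empty()) {
            auto handler = std::move(handlers.back());
            handlers.pop_back();
            handler();
        }
    }
};

// Returns how often a "both" handler had fired after the first and second event.
std::pair<int, int> demo(bool upload_first)
{
    CompletionTracker tracker;
    int fired = 0;
    tracker.both.push_back([&fired] { ++fired; });

    if (upload_first)
        tracker.on_upload_complete();
    else
        tracker.on_download_complete();
    int after_first = fired;

    if (upload_first)
        tracker.on_download_complete();
    else
        tracker.on_upload_complete();
    return {after_first, fired};
}
```

In either order the handler stays silent after the first event and fires exactly once after the second.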
1670

1671
void SessionWrapper::report_progress(ReportedProgress& p, DownloadableProgress downloadable)
1672
{
80,462✔
1673
    if (!m_progress_handler)
80,462✔
1674
        return;
27,798✔
1675

1676
    // Ignore progress messages from before we first receive a DOWNLOAD message
1677
    if (!m_reliable_download_progress)
52,664✔
1678
        return;
28,618✔
1679

1680
    auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) {
24,046✔
1681
        REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable);
18,740✔
1682
        REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable);
18,740✔
1683

1684
        // The effect of this calculation is that if new bytes are added for download/upload,
1685
        // the progress estimate doesn't go back to zero, but it goes back to some non-zero percentage.
1686
        // This calculation allows a clean progression from 0 to 1.0 even if the new data is added for the sync
1687
        // before progress has reached 1.0.
1688
        // Then once it is at 1.0 the next batch of changes will restart the estimate at 0.
1689
        // Example for upload progress reported:
1690
        // 0 -> 1.0 -> new data added -> 0.0 -> 0.1 ...sync... -> 0.4 -> new data added -> 0.3 ...sync.. -> 1.0
1691

1692
        double progress_estimate = 1.0;
18,740✔
1693
        if (final_transferred < transferable && transferred < transferable)
18,740✔
1694
            progress_estimate = (transferred - final_transferred) / double(transferable - final_transferred);
9,614✔
1695
        return progress_estimate;
18,740✔
1696
    };
18,740✔
1697

1698
    bool upload_completed = p.uploaded == p.uploadable;
24,046✔
1699
    double upload_estimate = 1.0;
24,046✔
1700
    if (!upload_completed)
24,046✔
1701
        upload_estimate = calculate_progress(p.uploaded, p.uploadable, m_final_uploaded);
9,566✔
1702

1703
    bool download_completed = p.downloaded == 0;
24,046✔
1704
    double download_estimate = 1.00;
24,046✔
1705
    if (m_flx_pending_bootstrap_store) {
24,046✔
1706
        if (m_flx_pending_bootstrap_store->has_pending()) {
11,662✔
1707
            download_estimate = downloadable.as_estimate();
526✔
1708
            p.downloaded += m_flx_pending_bootstrap_store->pending_stats().pending_changeset_bytes;
526✔
1709
        }
526✔
1710
        download_completed = download_estimate >= 1.0;
11,662✔
1711

1712
        // for flx with download estimate these bytes are not known
1713
        // provide some sensible value for non-streaming version of object-store callbacks
1714
        // until these fields are completely removed from the API after PBS deprecation
1715
        p.downloadable = p.downloaded;
11,662✔
1716
        if (download_estimate > 0 && download_estimate < 1.0 && p.downloaded > m_final_downloaded)
11,662!
1717
            p.downloadable = m_final_downloaded + uint64_t((p.downloaded - m_final_downloaded) / download_estimate);
×
1718
    }
11,662✔
1719
    else {
12,384✔
1720
        // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
1721
        // is only the remaining to download. This is confusing, so make them use
1722
        // the same units.
1723
        p.downloadable = downloadable.as_bytes() + p.downloaded;
12,384✔
1724
        if (!download_completed)
12,384✔
1725
            download_estimate = calculate_progress(p.downloaded, p.downloadable, m_final_downloaded);
9,174✔
1726
    }
12,384✔
1727

1728
    if (download_completed)
24,046✔
1729
        m_final_downloaded = p.downloaded;
14,872✔
1730
    if (upload_completed)
24,046✔
1731
        m_final_uploaded = p.uploaded;
14,480✔
1732

1733
    if (p == m_reported_progress)
24,046✔
1734
        return;
16,846✔
1735

1736
    m_reported_progress = p;
7,200✔
1737

1738
    if (m_sess->logger.would_log(Logger::Level::debug)) {
7,200✔
1739
        auto to_str = [](double d) {
13,924✔
1740
            std::ostringstream ss;
13,924✔
1741
            // progress estimate string in the DOWNLOAD message isn't expected to have more than 4 digits of precision
1742
            ss << std::fixed << std::setprecision(4) << d;
13,924✔
1743
            return ss.str();
13,924✔
1744
        };
13,924✔
1745
        m_sess->logger.debug(
6,962✔
1746
            "Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
6,962✔
1747
            "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
6,962✔
1748
            p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
6,962✔
1749
            to_str(upload_estimate), p.snapshot, m_flx_active_version);
6,962✔
1750
    }
6,962✔
1751

1752
    m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate,
7,200✔
1753
                       upload_estimate, m_flx_last_seen_version);
7,200✔
1754
}
7,200✔
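
The `calculate_progress` lambda in `report_progress()` measures progress relative to the byte count recorded when the previous batch finished, which is why the estimate restarts at 0 for a new batch instead of staying near 1.0. The same arithmetic as a free function, as a sketch; the name `progress_estimate` is hypothetical and plain `assert` stands in for the debug assertions:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical free-function version of the lambda.
// final_transferred: bytes already transferred when the previous batch completed.
double progress_estimate(std::uint64_t transferred, std::uint64_t transferable,
                         std::uint64_t final_transferred)
{
    assert(final_transferred <= transferred);
    assert(transferred <= transferable);

    // Only the bytes of the current batch count towards the fraction.
    if (final_transferred < transferable && transferred < transferable)
        return double(transferred - final_transferred) / double(transferable - final_transferred);
    return 1.0; // nothing left to transfer
}
```

For example, with 100 bytes completed earlier and 100 new bytes queued, `progress_estimate(150, 200, 100)` yields 0.5, whereas dividing the raw totals would report 0.75.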
1755

1756
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
1757
{
60✔
1758
    if (!m_sess) {
60✔
1759
        return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
×
1760
    }
×
1761

1762
    return m_sess->send_test_command(std::move(body));
60✔
1763
}
60✔
1764

1765
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1766
{
10,356✔
1767
    REALM_ASSERT(!m_finalized);
10,356✔
1768

1769
    auto has_pending_reset = PendingResetStore::has_pending_reset(m_db->start_frozen());
10,356✔
1770
    if (!has_pending_reset) {
10,356✔
1771
        return; // nothing to do
10,032✔
1772
    }
10,032✔
1773

1774
    m_sess->logger.info(util::LogCategory::reset, "Tracking %1", *has_pending_reset);
324✔
1775

1776
    // Now that the client reset merge is complete, wait for the changes to synchronize with the server
1777
    async_wait_for(
324✔
1778
        true, true, [self = util::bind_ptr(this), pending_reset = std::move(*has_pending_reset)](Status status) {
324✔
1779
            if (status == ErrorCodes::OperationAborted) {
322✔
1780
                return;
158✔
1781
            }
158✔
1782
            auto& logger = self->m_sess->logger;
164✔
1783
            if (!status.is_ok()) {
164✔
1784
                logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1",
×
1785
                             status);
×
1786
                return;
×
1787
            }
×
1788

1789
            logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset);
164✔
1790

1791
            auto tr = self->m_db->start_write();
164✔
1792
            auto cur_pending_reset = PendingResetStore::has_pending_reset(tr);
164✔
1793
            if (!cur_pending_reset) {
164✔
1794
                logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed.");
4✔
1795
                return;
4✔
1796
            }
4✔
1797
            if (*cur_pending_reset == pending_reset) {
160✔
1798
                logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker.");
160✔
1799
            }
160✔
1800
            else {
×
1801
                logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset);
×
1802
            }
×
1803
            PendingResetStore::clear_pending_reset(tr);
160✔
1804
            tr->commit();
160✔
1805
        });
160✔
1806
}
324✔
1807

1808
void SessionWrapper::update_subscription_version_info()
1809
{
10,356✔
1810
    if (!m_flx_subscription_store)
10,356✔
1811
        return;
8,738✔
1812
    auto versions_info = m_flx_subscription_store->get_version_info();
1,618✔
1813
    m_flx_active_version = versions_info.active;
1,618✔
1814
    m_flx_pending_mark_version = versions_info.pending_mark;
1,618✔
1815
}
1,618✔
1816

1817
std::string SessionWrapper::get_appservices_connection_id()
1818
{
72✔
1819
    auto pf = util::make_promise_future<std::string>();
72✔
1820

1821
    m_client.post([self = util::bind_ptr{this}, promise = std::move(pf.promise)](Status status) mutable {
72✔
1822
        if (!status.is_ok()) {
72✔
1823
            promise.set_error(status);
×
1824
            return;
×
1825
        }
×
1826

1827
        if (!self->m_sess) {
72✔
1828
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
1829
            return;
×
1830
        }
×
1831

1832
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
72✔
1833
    });
72✔
1834

1835
    return pf.future.get();
72✔
1836
}
72✔
1837

1838
// ################ ClientImpl::Connection ################
1839

1840
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1841
                                   const std::string& authorization_header_name,
1842
                                   const std::map<std::string, std::string>& custom_http_headers,
1843
                                   bool verify_servers_ssl_certificate,
1844
                                   Optional<std::string> ssl_trust_certificate_path,
1845
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1846
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1847
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::session, make_logger_prefix(ident),
1,330✔
1848
                                                      client.logger_ptr)} // Throws
1,330✔
1849
    , logger{*logger_ptr}
1,330✔
1850
    , m_client{client}
1,330✔
1851
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1,330✔
1852
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1,330✔
1853
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1,330✔
1854
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1,330✔
1855
    , m_reconnect_info{reconnect_info}
1,330✔
1856
    , m_ident{ident}
1,330✔
1857
    , m_server_endpoint{std::move(endpoint)}
1,330✔
1858
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1,330✔
1859
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1,330✔
1860
{
2,792✔
1861
    m_on_idle = m_client.create_trigger([this](Status status) {
2,802✔
1862
        if (status == ErrorCodes::OperationAborted)
2,802✔
1863
            return;
×
1864
        else if (!status.is_ok())
2,802✔
1865
            throw Exception(status);
×
1866

1867
        REALM_ASSERT(m_activated);
2,802✔
1868
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,802✔
1869
            on_idle(); // Throws
2,792✔
1870
            // Connection object may be destroyed now.
1871
        }
2,792✔
1872
    });
2,802✔
1873
}
2,792✔
1874

1875
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
1876
{
12✔
1877
    return m_ident;
12✔
1878
}
12✔
1879

1880

1881
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
1882
{
2,790✔
1883
    return m_server_endpoint;
2,790✔
1884
}
2,790✔
1885

1886
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1887
                                                        const std::string& signed_access_token)
1888
{
10,280✔
1889
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
10,280✔
1890
    m_signed_access_token = signed_access_token;           // Throws (copy)
10,280✔
1891
}
10,280✔
1892

1893

1894
void ClientImpl::Connection::resume_active_sessions()
1895
{
1,736✔
1896
    auto handler = [=](ClientImpl::Session& sess) {
3,468✔
1897
        sess.cancel_resumption_delay(); // Throws
3,468✔
1898
    };
3,468✔
1899
    for_each_active_session(std::move(handler)); // Throws
1,736✔
1900
}
1,736✔
1901

1902
void ClientImpl::Connection::on_idle()
1903
{
2,790✔
1904
    logger.debug(util::LogCategory::session, "Destroying connection object");
2,790✔
1905
    ClientImpl& client = get_client();
2,790✔
1906
    client.remove_connection(*this);
2,790✔
1907
    // NOTE: This connection object is now destroyed!
1908
}
2,790✔
1909

1910

1911
std::string ClientImpl::Connection::get_http_request_path() const
1912
{
3,686✔
1913
    using namespace std::string_view_literals;
3,686✔
1914
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
3,686✔
1915

1916
    std::string path;
3,686✔
1917
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,686✔
1918
    path += m_http_request_path_prefix;
3,686✔
1919
    path += param;
3,686✔
1920
    path += m_signed_access_token;
3,686✔
1921

1922
    return path;
3,686✔
1923
}
3,686✔
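
`get_http_request_path()` appends the access token as a `baas_at` query parameter, choosing `?` or `&` depending on whether the prefix already contains a query string. A standalone sketch of the same logic; `make_request_path` and the example paths in the note below are hypothetical:

```cpp
#include <string>
#include <string_view>

// Hypothetical free-function version that takes the prefix and token as arguments.
std::string make_request_path(std::string_view prefix, std::string_view token)
{
    using namespace std::string_view_literals;
    // Start a query string if there is none yet, otherwise extend the existing one.
    const auto param = prefix.find('?') == std::string_view::npos ? "?baas_at="sv : "&baas_at="sv;

    std::string path;
    path.reserve(prefix.size() + param.size() + token.size()); // single allocation
    path += prefix;
    path += param;
    path += token;
    return path;
}
```

With a prefix of `/api/sync` and token `tok` this produces `/api/sync?baas_at=tok`; with `/api/sync?x=1` it produces `/api/sync?x=1&baas_at=tok`. As in the listing, the token is appended without percent-encoding.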
1924

1925

1926
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
1927
{
2,792✔
1928
    return util::format("Connection[%1] ", ident);
2,792✔
1929
}
2,792✔
1930

1931

1932
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
1933
                                                            std::optional<SessionErrorInfo> error_info)
1934
{
10,854✔
1935
    if (m_force_closed) {
10,854✔
1936
        return;
2,426✔
1937
    }
2,426✔
1938
    auto handler = [=](ClientImpl::Session& sess) {
11,336✔
1939
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
11,336✔
1940
        sess_2.on_connection_state_changed(state, error_info); // Throws
11,336✔
1941
    };
11,336✔
1942
    for_each_active_session(std::move(handler)); // Throws
8,428✔
1943
}
8,428✔
1944

1945

1946
Client::Client(Config config)
1947
    : m_impl{new ClientImpl{std::move(config)}} // Throws
4,898✔
1948
{
9,934✔
1949
}
9,934✔
1950

1951

1952
Client::Client(Client&& client) noexcept
1953
    : m_impl{std::move(client.m_impl)}
1954
{
×
1955
}
×
1956

1957

1958
Client::~Client() noexcept {}
9,934✔
1959

1960

1961
void Client::shutdown() noexcept
1962
{
10,016✔
1963
    m_impl->shutdown();
10,016✔
1964
}
10,016✔
1965

1966
void Client::shutdown_and_wait()
1967
{
772✔
1968
    m_impl->shutdown_and_wait();
772✔
1969
}
772✔
1970

1971
void Client::cancel_reconnect_delay()
1972
{
1,740✔
1973
    m_impl->cancel_reconnect_delay();
1,740✔
1974
}
1,740✔
1975

1976
void Client::voluntary_disconnect_all_connections()
1977
{
12✔
1978
    m_impl->voluntary_disconnect_all_connections();
12✔
1979
}
12✔
1980

1981
bool Client::wait_for_session_terminations_or_client_stopped()
1982
{
9,568✔
1983
    return m_impl->wait_for_session_terminations_or_client_stopped();
9,568✔
1984
}
9,568✔
1985

1986
util::Future<void> Client::notify_session_terminated()
1987
{
56✔
1988
    return m_impl->notify_session_terminated();
56✔
1989
}
56✔
1990

1991
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
1992
                                  port_type& port, std::string& path) const
1993
{
4,036✔
1994
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
4,036✔
1995
}
4,036✔
1996

1997

1998
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1999
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
2000
{
10,136✔
2001
    m_impl = new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
10,136✔
2002
                                std::move(config)}; // Throws
10,136✔
2003
}
10,136✔
2004

2005

2006
void Session::nonsync_transact_notify(version_type new_version)
2007
{
17,752✔
2008
    m_impl->on_commit(new_version); // Throws
17,752✔
2009
}
17,752✔
2010

2011

2012
void Session::cancel_reconnect_delay()
2013
{
28✔
2014
    m_impl->cancel_reconnect_delay(); // Throws
28✔
2015
}
28✔
2016

2017

2018
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2019
{
4,880✔
2020
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
4,880✔
2021
}
4,880✔
2022

2023

2024
bool Session::wait_for_upload_complete_or_client_stopped()
2025
{
12,912✔
2026
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
12,912✔
2027
}
12,912✔
2028

2029

2030
bool Session::wait_for_download_complete_or_client_stopped()
2031
{
10,004✔
2032
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
10,004✔
2033
}
10,004✔
2034

2035

2036
void Session::refresh(std::string_view signed_access_token)
2037
{
216✔
2038
    m_impl->refresh(signed_access_token); // Throws
216✔
2039
}
216✔
2040

2041

2042
void Session::abandon() noexcept
2043
{
10,134✔
2044
    REALM_ASSERT(m_impl);
10,134✔
2045
    // Reabsorb the ownership assigned to the application's naked pointer by
2046
    // Session constructor
2047
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
10,134✔
2048
    SessionWrapper::abandon(std::move(wrapper));
10,134✔
2049
}
10,134✔
2050

2051
util::Future<std::string> Session::send_test_command(std::string body)
2052
{
60✔
2053
    return m_impl->send_test_command(std::move(body));
60✔
2054
}
60✔
2055

2056
std::string Session::get_appservices_connection_id()
2057
{
72✔
2058
    return m_impl->get_appservices_connection_id();
72✔
2059
}
72✔
2060

2061
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2062
{
×
2063
    switch (proxyType) {
×
2064
        case ProxyConfig::Type::HTTP:
×
2065
            return os << "HTTP";
×
2066
        case ProxyConfig::Type::HTTPS:
×
2067
            return os << "HTTPS";
×
2068
    }
×
2069
    REALM_TERMINATE("Invalid Proxy Type object.");
2070
}
×
2071

2072
} // namespace realm::sync