• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2493

15 Jul 2024 06:38PM UTC coverage: 90.982% (+0.002%) from 90.98%
2493

push

Evergreen

web-flow
RCORE-2192 RCORE-2193 Fix FLX download progress reporting (#7870)

* Fix FLX download progress reporting

We need to store the download progress for each batch of a bootstrap and not
just at the end for it to be useful in any way.

The server will sometimes send us DOWNLOAD messages with a non-one estimate
followed by a one estimate where the byte-level information is the same (as the
final message is empty). When this happens we need to report the download
completion to the user, so add the estimate to the fields checked for changes.

A subscription change which doesn't actually change what set of objects is in
view can result in an empty DOWNLOAD message with no changes other than the
query version, and we should report that too.

* Fix a comment

* Pass the DownloadMessage to process_flx_bootstrap_message()

* Report steady-state download progress

102352 of 180586 branches covered (56.68%)

247 of 257 new or added lines in 10 files covered. (96.11%)

88 existing lines in 17 files now uncovered.

215381 of 236730 relevant lines covered (90.98%)

5726853.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.09
/src/realm/sync/client.cpp
1
#include <realm/sync/client.hpp>
2

3
#include <realm/sync/config.hpp>
4
#include <realm/sync/noinst/client_impl_base.hpp>
5
#include <realm/sync/noinst/client_reset.hpp>
6
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
7
#include <realm/sync/noinst/pending_reset_store.hpp>
8
#include <realm/sync/protocol.hpp>
9
#include <realm/sync/subscriptions.hpp>
10
#include <realm/util/bind_ptr.hpp>
11

12
namespace realm::sync {
13
namespace {
14
using namespace realm::util;
15

16

17
// clang-format off
18
using SessionImpl                     = ClientImpl::Session;
19
using SyncTransactCallback            = Session::SyncTransactCallback;
20
using ProgressHandler                 = Session::ProgressHandler;
21
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
22
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
23
using port_type                       = Session::port_type;
24
using connection_ident_type           = std::int_fast64_t;
25
using ProxyConfig                     = SyncConfig::ProxyConfig;
26
// clang-format on
27

28
} // unnamed namespace
29

30

31
// Life cycle states of a session wrapper:
32
//
33
// The session wrapper begins life with an associated Client, but no underlying
34
// SessionImpl. On construction, it begins the actualization process by posting
35
// a job to the client's event loop. That job will set `m_sess` to a session impl
36
// and then set `m_actualized = true`. Once this happens `m_actualized` will
37
// never change again.
38
//
39
// When the external reference to the session (`sync::Session`, which in
40
// non-test code is always owned by a `SyncSession`) is destroyed, the wrapper
41
// begins finalization. If the wrapper has not yet been actualized this takes
42
// place immediately and `m_finalized = true` is set directly on the calling
43
// thread. If it has been actualized, a job is posted to the client's event loop
44
// which will tear down the session and then set `m_finalized = true`. Regardless
45
// of whether or not the session has been actualized, `m_abandoned = true` is
46
// immediately set when the external reference is released.
47
//
48
// When the associated Client is destroyed it calls force_close() on all
49
// actualized wrappers from its event loop. This causes the wrapper to tear down
50
// the session, but not not make it proceed to the finalized state. In normal
51
// usage the client will outlive all sessions, but in tests getting the teardown
52
// correct and race-free can be tricky so we permit either order.
53
//
54
// The wrapper will exist with `m_abandoned = true` and `m_finalized = false`
55
// only while waiting for finalization to happen. It will exist with
56
// `m_finalized = true` only while there are pending post handlers yet to be
57
// executed.
58
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
59
public:
60
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
61
                   Session::Config&&);
62
    ~SessionWrapper() noexcept;
63

64
    ClientReplication& get_replication() noexcept;
65
    ClientImpl& get_client() noexcept;
66

67
    bool has_flx_subscription_store() const;
68
    SubscriptionStore* get_flx_subscription_store();
69
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
70

71
    MigrationStore* get_migration_store();
72

73
    // Immediately initiate deactivation of the wrapped session. Sets m_closed
74
    // but *not* m_finalized.
75
    // Must be called from event loop thread.
76
    void force_close();
77

78
    // Can be called from any thread.
79
    void on_commit(version_type new_version) override;
80
    // Can be called from any thread.
81
    void cancel_reconnect_delay();
82

83
    // Can be called from any thread.
84
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
85
    // Can be called from any thread.
86
    bool wait_for_upload_complete_or_client_stopped();
87
    // Can be called from any thread.
88
    bool wait_for_download_complete_or_client_stopped();
89

90
    // Can be called from any thread.
91
    void refresh(std::string_view signed_access_token);
92

93
    // Can be called from any thread.
94
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
95

96
    // These are called from ClientImpl
97
    // Must be called from event loop thread.
98
    void actualize();
99
    void finalize();
100
    void finalize_before_actualization() noexcept;
101

102
    // Can be called from any thread.
103
    util::Future<std::string> send_test_command(std::string body);
104

105
    void handle_pending_client_reset_acknowledgement();
106

107
    void update_subscription_version_info();
108

109
    // Can be called from any thread.
110
    std::string get_appservices_connection_id();
111

112
    // Can be called from any thread, but inherently cannot be called
113
    // concurrently with calls to any of the other non-confined functions.
114
    bool mark_abandoned();
115

116
private:
117
    ClientImpl& m_client;
118
    DBRef m_db;
119
    Replication* m_replication;
120

121
    const ProtocolEnvelope m_protocol_envelope;
122
    const std::string m_server_address;
123
    const port_type m_server_port;
124
    const bool m_server_verified;
125
    const std::string m_user_id;
126
    const SyncServerMode m_sync_mode;
127
    const std::string m_authorization_header_name;
128
    const std::map<std::string, std::string> m_custom_http_headers;
129
    const bool m_verify_servers_ssl_certificate;
130
    const bool m_simulate_integration_error;
131
    const std::optional<std::string> m_ssl_trust_certificate_path;
132
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
133
    const size_t m_flx_bootstrap_batch_size_bytes;
134
    const std::string m_http_request_path_prefix;
135
    const std::string m_virt_path;
136
    const std::optional<ProxyConfig> m_proxy_config;
137

138
    // This one is different from null when, and only when the session wrapper
139
    // is in ClientImpl::m_abandoned_session_wrappers.
140
    SessionWrapper* m_next = nullptr;
141

142
    // These may only be accessed by the event loop thread.
143
    std::string m_signed_access_token;
144
    std::optional<ClientReset> m_client_reset_config;
145

146
    struct ReportedProgress {
147
        uint64_t snapshot;
148
        uint64_t uploaded;
149
        uint64_t uploadable;
150
        uint64_t downloaded;
151
        uint64_t downloadable;
152
        int64_t query_version;
153
        double download_estimate;
154

155
        // Does not check snapshot
156
        bool operator==(const ReportedProgress& p) const noexcept
157
        {
22,862✔
158
            return uploaded == p.uploaded && uploadable == p.uploadable && downloaded == p.downloaded &&
22,862✔
159
                   downloadable == p.downloadable && query_version == p.query_version &&
22,862✔
160
                   download_estimate == p.download_estimate;
22,862✔
161
        }
22,862✔
162
    };
163
    std::optional<ReportedProgress> m_reported_progress;
164
    uint64_t m_final_uploaded = 0;
165
    uint64_t m_final_downloaded = 0;
166

167
    const util::UniqueFunction<ProgressHandler> m_progress_handler;
168
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
169

170
    const util::UniqueFunction<SyncClientHookAction(SyncClientHookData const&)> m_debug_hook;
171
    bool m_in_debug_hook = false;
172

173
    const SessionReason m_session_reason;
174

175
    // If false, QUERY and MARK messages are allowed but UPLOAD messages will not
176
    // be sent to the server.
177
    const bool m_allow_upload_messages;
178

179
    const uint64_t m_schema_version;
180

181
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
182
    int64_t m_flx_active_version = 0;
183
    int64_t m_flx_last_seen_version = 0;
184
    int64_t m_flx_pending_mark_version = 0;
185
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
186

187
    std::shared_ptr<MigrationStore> m_migration_store;
188

189
    // Set to true when this session wrapper is actualized (i.e. the wrapped
190
    // session is created), or when the wrapper is finalized before actualization.
191
    // It is then never modified again.
192
    //
193
    // Actualization is scheduled during the construction of SessionWrapper, and
194
    // so a session specific post handler will always find that `m_actualized`
195
    // is true as the handler will always be run after the actualization job.
196
    // This holds even if the wrapper is finalized or closed before actualization.
197
    bool m_actualized = false;
198

199
    // Set to true when session deactivation is begun, either via force_close()
200
    // or finalize().
201
    bool m_closed = false;
202

203
    // Set to true in on_suspended() and then false in on_resumed(). Used to
204
    // suppress spurious connection state and error reporting while the session
205
    // is already in an error state.
206
    bool m_suspended = false;
207

208
    // Set when the session has been abandoned. After this point none of the
209
    // public API functions should be called again.
210
    bool m_abandoned = false;
211
    // Has the SessionWrapper been finalized?
212
    bool m_finalized = false;
213

214
    // Set to true when the first DOWNLOAD message is received to indicate that
215
    // the byte-level download progress parameters can be considered reasonable
216
    // reliable. Before that, a lot of time may have passed, so our record of
217
    // the download progress is likely completely out of date.
218
    bool m_reliable_download_progress = false;
219

220
    // Set to point to an activated session object during actualization of the
221
    // session wrapper. Set to null during finalization of the session
222
    // wrapper. Both modifications are guaranteed to be performed by the event
223
    // loop thread.
224
    //
225
    // If a session specific post handler, that is submitted after the
226
    // initiation of the session wrapper, sees that `m_sess` is null, it can
227
    // conclude that the session wrapper has either been force closed or has
228
    // been both abandoned and finalized.
229
    //
230
    // Must only be accessed from the event loop thread.
231
    SessionImpl* m_sess = nullptr;
232

233
    // These must only be accessed from the event loop thread.
234
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
235
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
236
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
237

238
    version_type m_upload_completion_requested_version = -1;
239

240
    void on_download_completion();
241
    void on_suspended(const SessionErrorInfo& error_info);
242
    void on_resumed();
243
    void on_connection_state_changed(ConnectionState, const std::optional<SessionErrorInfo>&);
244
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
245
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
246
    void on_flx_sync_version_complete(int64_t version);
247

248
    void init_progress_handler();
249
    void check_progress();
250
    void report_progress(ReportedProgress& p, DownloadableProgress downloadable);
251
    void report_upload_completion(version_type);
252

253
    friend class SessionWrapperStack;
254
    friend class ClientImpl::Session;
255
};
256

257

258
// ################ SessionWrapperStack ################
259

260
inline bool SessionWrapperStack::empty() const noexcept
261
{
19,876✔
262
    return !m_back;
19,876✔
263
}
19,876✔
264

265

266
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
267
{
20,276✔
268
    REALM_ASSERT(!w->m_next);
20,276✔
269
    w->m_next = m_back;
20,276✔
270
    m_back = w.release();
20,276✔
271
}
20,276✔
272

273

274
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
275
{
58,770✔
276
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
58,770✔
277
    if (m_back) {
58,770✔
278
        m_back = m_back->m_next;
20,212✔
279
        w->m_next = nullptr;
20,212✔
280
    }
20,212✔
281
    return w;
58,770✔
282
}
58,770✔
283

284

285
inline void SessionWrapperStack::clear() noexcept
286
{
19,876✔
287
    while (m_back) {
19,876✔
288
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
289
        m_back = w->m_next;
×
290
    }
×
291
}
19,876✔
292

293

294
inline bool SessionWrapperStack::erase(SessionWrapper* w) noexcept
295
{
10,168✔
296
    SessionWrapper** p = &m_back;
10,168✔
297
    while (*p && *p != w) {
10,366✔
298
        p = &(*p)->m_next;
198✔
299
    }
198✔
300
    if (!*p) {
10,168✔
301
        return false;
10,104✔
302
    }
10,104✔
303
    *p = w->m_next;
64✔
304
    util::bind_ptr<SessionWrapper>{w, util::bind_ptr_base::adopt_tag{}};
64✔
305
    return true;
64✔
306
}
10,168✔
307

308

309
SessionWrapperStack::~SessionWrapperStack()
310
{
19,876✔
311
    clear();
19,876✔
312
}
19,876✔
313

314

315
// ################ ClientImpl ################
316

317
ClientImpl::~ClientImpl()
318
{
9,938✔
319
    // Since no other thread is allowed to be accessing this client or any of
320
    // its subobjects at this time, no mutex locking is necessary.
321

322
    shutdown_and_wait();
9,938✔
323
    // Session wrappers are removed from m_unactualized_session_wrappers as they
324
    // are abandoned.
325
    REALM_ASSERT(m_stopped);
9,938✔
326
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
9,938✔
327
    REALM_ASSERT(m_abandoned_session_wrappers.empty());
9,938✔
328
}
9,938✔
329

330

331
void ClientImpl::cancel_reconnect_delay()
332
{
2,004✔
333
    // Thread safety required
334
    post([this] {
2,004✔
335
        for (auto& p : m_server_slots) {
2,004✔
336
            ServerSlot& slot = p.second;
2,004✔
337
            if (m_one_connection_per_session) {
2,004✔
338
                REALM_ASSERT(!slot.connection);
×
339
                for (const auto& p : slot.alt_connections) {
×
340
                    ClientImpl::Connection& conn = *p.second;
×
341
                    conn.resume_active_sessions(); // Throws
×
342
                    conn.cancel_reconnect_delay(); // Throws
×
343
                }
×
344
            }
×
345
            else {
2,004✔
346
                REALM_ASSERT(slot.alt_connections.empty());
2,004✔
347
                if (slot.connection) {
2,004✔
348
                    ClientImpl::Connection& conn = *slot.connection;
2,000✔
349
                    conn.resume_active_sessions(); // Throws
2,000✔
350
                    conn.cancel_reconnect_delay(); // Throws
2,000✔
351
                }
2,000✔
352
                else {
4✔
353
                    slot.reconnect_info.reset();
4✔
354
                }
4✔
355
            }
2,004✔
356
        }
2,004✔
357
    }); // Throws
2,004✔
358
}
2,004✔
359

360

361
void ClientImpl::voluntary_disconnect_all_connections()
362
{
12✔
363
    auto done_pf = util::make_promise_future<void>();
12✔
364
    post([this, promise = std::move(done_pf.promise)]() mutable {
12✔
365
        try {
12✔
366
            for (auto& p : m_server_slots) {
12✔
367
                ServerSlot& slot = p.second;
12✔
368
                if (m_one_connection_per_session) {
12✔
369
                    REALM_ASSERT(!slot.connection);
×
370
                    for (const auto& [_, conn] : slot.alt_connections) {
×
371
                        conn->voluntary_disconnect();
×
372
                    }
×
373
                }
×
374
                else {
12✔
375
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
376
                    if (slot.connection) {
12✔
377
                        slot.connection->voluntary_disconnect();
12✔
378
                    }
12✔
379
                }
12✔
380
            }
12✔
381
        }
12✔
382
        catch (...) {
12✔
383
            promise.set_error(exception_to_status());
×
384
            return;
×
385
        }
×
386
        promise.emplace_value();
12✔
387
    });
12✔
388
    done_pf.future.get();
12✔
389
}
12✔
390

391

392
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
393
{
9,572✔
394
    // Thread safety required
395

396
    {
9,572✔
397
        util::CheckedLockGuard lock{m_mutex};
9,572✔
398
        m_sessions_terminated = false;
9,572✔
399
    }
9,572✔
400

401
    // The technique employed here relies on the fact that
402
    // actualize_and_finalize_session_wrappers() must get to execute at least
403
    // once before the post handler submitted below gets to execute, but still
404
    // at a time where all session wrappers, that are abandoned prior to the
405
    // execution of wait_for_session_terminations_or_client_stopped(), have been
406
    // added to `m_abandoned_session_wrappers`.
407
    //
408
    // To see that this is the case, consider a session wrapper that was
409
    // abandoned before wait_for_session_terminations_or_client_stopped() was
410
    // invoked. Then the session wrapper will have been added to
411
    // `m_abandoned_session_wrappers`, and an invocation of
412
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
413
    // guarantees mentioned in the documentation of Trigger then ensure
414
    // that at least one execution of actualize_and_finalize_session_wrappers()
415
    // will happen after the session wrapper has been added to
416
    // `m_abandoned_session_wrappers`, but before the post handler submitted
417
    // below gets to execute.
418
    post([this] {
9,572✔
419
        {
9,572✔
420
            util::CheckedLockGuard lock{m_mutex};
9,572✔
421
            m_sessions_terminated = true;
9,572✔
422
        }
9,572✔
423
        m_wait_or_client_stopped_cond.notify_all();
9,572✔
424
    }); // Throws
9,572✔
425

426
    bool completion_condition_was_satisfied;
9,572✔
427
    {
9,572✔
428
        util::CheckedUniqueLock lock{m_mutex};
9,572✔
429
        m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_mutex) {
19,142✔
430
            return m_sessions_terminated || m_stopped;
19,142✔
431
        });
19,142✔
432
        completion_condition_was_satisfied = !m_stopped;
9,572✔
433
    }
9,572✔
434
    return completion_condition_was_satisfied;
9,572✔
435
}
9,572✔
436

437

438
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
439
util::Future<void> ClientImpl::notify_session_terminated()
440
{
56✔
441
    auto pf = util::make_promise_future<void>();
56✔
442
    post([promise = std::move(pf.promise)](Status status) mutable {
56✔
443
        // Includes operation_aborted
444
        if (!status.is_ok()) {
56✔
445
            promise.set_error(status);
×
446
            return;
×
447
        }
×
448

449
        promise.emplace_value();
56✔
450
    });
56✔
451

452
    return std::move(pf.future);
56✔
453
}
56✔
454

455
void ClientImpl::drain_connections_on_loop()
456
{
9,938✔
457
    post([this](Status status) {
9,938✔
458
        REALM_ASSERT(status.is_ok());
9,938✔
459
        drain_connections();
9,938✔
460
    });
9,938✔
461
}
9,938✔
462

463
void ClientImpl::shutdown_and_wait()
464
{
10,710✔
465
    shutdown();
10,710✔
466
    util::CheckedUniqueLock lock{m_drain_mutex};
10,710✔
467
    if (m_drained) {
10,710✔
468
        return;
772✔
469
    }
772✔
470

471
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
9,938✔
472
    m_drain_cv.wait(lock.native_handle(), [&]() REQUIRES(m_drain_mutex) {
15,848✔
473
        return m_num_connections == 0 && m_outstanding_posts == 0;
15,848✔
474
    });
15,848✔
475

476
    m_drained = true;
9,938✔
477
}
9,938✔
478

479
void ClientImpl::shutdown() noexcept
480
{
20,730✔
481
    {
20,730✔
482
        util::CheckedLockGuard lock{m_mutex};
20,730✔
483
        if (m_stopped)
20,730✔
484
            return;
10,792✔
485
        m_stopped = true;
9,938✔
486
    }
9,938✔
487
    m_wait_or_client_stopped_cond.notify_all();
×
488

489
    drain_connections_on_loop();
9,938✔
490
}
9,938✔
491

492

493
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
494
{
10,164✔
495
    // Thread safety required.
496
    {
10,164✔
497
        util::CheckedLockGuard lock{m_mutex};
10,164✔
498
        // We can't actualize the session if we've already been stopped, so
499
        // just finalize it immediately.
500
        if (m_stopped) {
10,164✔
501
            wrapper->finalize_before_actualization();
×
502
            return;
×
503
        }
×
504

505
        REALM_ASSERT(m_actualize_and_finalize);
10,164✔
506
        m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
10,164✔
507
    }
10,164✔
508
    m_actualize_and_finalize->trigger();
×
509
}
10,164✔
510

511

512
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
513
{
10,172✔
514
    // Thread safety required.
515
    {
10,172✔
516
        util::CheckedLockGuard lock{m_mutex};
10,172✔
517
        REALM_ASSERT(m_actualize_and_finalize);
10,172✔
518
        // The wrapper may have already been finalized before being abandoned
519
        // if we were stopped when it was created.
520
        if (wrapper->mark_abandoned())
10,172✔
521
            return;
4✔
522

523
        // If the session wrapper has not yet been actualized (on the event loop
524
        // thread), it can be immediately finalized. This ensures that we will
525
        // generally not actualize a session wrapper that has already been
526
        // abandoned.
527
        if (m_unactualized_session_wrappers.erase(wrapper.get())) {
10,168✔
528
            wrapper->finalize_before_actualization();
64✔
529
            return;
64✔
530
        }
64✔
531
        m_abandoned_session_wrappers.push(std::move(wrapper));
10,104✔
532
    }
10,104✔
533
    m_actualize_and_finalize->trigger();
×
534
}
10,104✔
535

536

537
// Must be called from the event loop thread.
538
void ClientImpl::actualize_and_finalize_session_wrappers()
539
{
14,234✔
540
    // We need to pop from the wrapper stacks while holding the lock to ensure
541
    // that all updates to `SessionWrapper:m_next` are thread-safe, but then
542
    // release the lock before finalizing or actualizing because those functions
543
    // invoke user callbacks which may try to access the client and reacquire
544
    // the lock.
545
    //
546
    // Finalization must always happen before actualization because we may be
547
    // finalizing and actualizing sessions for the same Realm file, and
548
    // actualizing first would result in overlapping sessions. Because we're
549
    // releasing the lock new sessions may come in as we're looping, so we need
550
    // a single loop that checks both fields.
551
    while (true) {
34,442✔
552
        bool finalize = true;
34,438✔
553
        bool stopped;
34,438✔
554
        util::bind_ptr<SessionWrapper> wrapper;
34,438✔
555
        {
34,438✔
556
            util::CheckedLockGuard lock{m_mutex};
34,438✔
557
            wrapper = m_abandoned_session_wrappers.pop();
34,438✔
558
            if (!wrapper) {
34,438✔
559
                wrapper = m_unactualized_session_wrappers.pop();
24,332✔
560
                finalize = false;
24,332✔
561
            }
24,332✔
562
            stopped = m_stopped;
34,438✔
563
        }
34,438✔
564
        if (!wrapper)
34,438✔
565
            break;
14,230✔
566
        if (finalize)
20,208✔
567
            wrapper->finalize(); // Throws
10,104✔
568
        else if (stopped)
10,104✔
569
            wrapper->finalize_before_actualization();
4✔
570
        else
10,100✔
571
            wrapper->actualize(); // Throws
10,100✔
572
    }
20,208✔
573
}
14,234✔
574

575

576
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
577
                                                   const std::string& authorization_header_name,
578
                                                   const std::map<std::string, std::string>& custom_http_headers,
579
                                                   bool verify_servers_ssl_certificate,
580
                                                   Optional<std::string> ssl_trust_certificate_path,
581
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
582
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
583
{
10,100✔
584
    auto&& [server_slot_it, inserted] =
10,100✔
585
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
10,100✔
586
    ServerSlot& server_slot = server_slot_it->second; // Throws
10,100✔
587

588
    // TODO: enable multiplexing with proxies
589
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
10,100✔
590
        // Use preexisting connection
591
        REALM_ASSERT(server_slot.alt_connections.empty());
7,300✔
592
        return *server_slot.connection;
7,300✔
593
    }
7,300✔
594

595
    // Create a new connection
596
    REALM_ASSERT(!server_slot.connection);
2,800✔
597
    connection_ident_type ident = m_prev_connection_ident + 1;
2,800✔
598
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
2,800✔
599
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
2,800✔
600
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
2,800✔
601
        std::move(proxy_config), server_slot.reconnect_info); // Throws
2,800✔
602
    ClientImpl::Connection& conn = *conn_2;
2,800✔
603
    if (!m_one_connection_per_session) {
2,800✔
604
        server_slot.connection = std::move(conn_2);
2,786✔
605
    }
2,786✔
606
    else {
14✔
607
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
14✔
608
    }
14✔
609
    m_prev_connection_ident = ident;
2,800✔
610
    was_created = true;
2,800✔
611
    {
2,800✔
612
        util::CheckedLockGuard lk(m_drain_mutex);
2,800✔
613
        ++m_num_connections;
2,800✔
614
    }
2,800✔
615
    return conn;
2,800✔
616
}
10,100✔
617

618

619
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
620
{
2,800✔
621
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
2,800✔
622
    auto i = m_server_slots.find(endpoint);
2,800✔
623
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
2,800✔
624
    ServerSlot& server_slot = i->second;
2,800✔
625
    if (!m_one_connection_per_session) {
2,800✔
626
        REALM_ASSERT(server_slot.alt_connections.empty());
2,788✔
627
        REALM_ASSERT(&*server_slot.connection == &conn);
2,788✔
628
        server_slot.reconnect_info = conn.get_reconnect_info();
2,788✔
629
        server_slot.connection.reset();
2,788✔
630
    }
2,788✔
631
    else {
12✔
632
        REALM_ASSERT(!server_slot.connection);
12✔
633
        connection_ident_type ident = conn.get_ident();
12✔
634
        auto j = server_slot.alt_connections.find(ident);
12✔
635
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
12✔
636
        REALM_ASSERT(&*j->second == &conn);
12✔
637
        server_slot.alt_connections.erase(j);
12✔
638
    }
12✔
639

640
    bool notify;
2,800✔
641
    {
2,800✔
642
        util::CheckedLockGuard lk(m_drain_mutex);
2,800✔
643
        REALM_ASSERT(m_num_connections);
2,800✔
644
        notify = --m_num_connections <= 0;
2,800✔
645
    }
2,800✔
646
    if (notify) {
2,800✔
647
        m_drain_cv.notify_all();
2,186✔
648
    }
2,186✔
649
}
2,800✔
650

651

652
// ################ SessionImpl ################
653

654
void SessionImpl::force_close()
655
{
102✔
656
    // Allow force_close() if session is active or hasn't been activated yet.
657
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
102!
658
        m_wrapper.force_close();
102✔
659
    }
102✔
660
}
102✔
661

662
void SessionImpl::on_connection_state_changed(ConnectionState state,
663
                                              const std::optional<SessionErrorInfo>& error_info)
664
{
12,320✔
665
    // Only used to report errors back to the SyncSession while the Session is active
666
    if (m_state == SessionImpl::Active) {
12,320✔
667
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
12,320✔
668
    }
12,320✔
669
}
12,320✔
670

671

672
const std::string& SessionImpl::get_virt_path() const noexcept
673
{
7,574✔
674
    // Can only be called if the session is active or being activated
675
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
7,574!
676
    return m_wrapper.m_virt_path;
7,574✔
677
}
7,574✔
678

679
const std::string& SessionImpl::get_realm_path() const noexcept
680
{
10,396✔
681
    // Can only be called if the session is active or being activated
682
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
10,396✔
683
    return m_wrapper.m_db->get_path();
10,396✔
684
}
10,396✔
685

686
DBRef SessionImpl::get_db() const noexcept
687
{
24,796✔
688
    // Can only be called if the session is active or being activated
689
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
24,796✔
690
    return m_wrapper.m_db;
24,796✔
691
}
24,796✔
692

693
ClientReplication& SessionImpl::get_repl() const noexcept
694
{
120,186✔
695
    // Can only be called if the session is active or being activated
696
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
120,186✔
697
    return m_wrapper.get_replication();
120,186✔
698
}
120,186✔
699

700
ClientHistory& SessionImpl::get_history() const noexcept
701
{
118,162✔
702
    return get_repl().get_history();
118,162✔
703
}
118,162✔
704

705
std::optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
706
{
105,932✔
707
    // Can only be called if the session is active or being activated
708
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
105,932✔
709
    return m_wrapper.m_client_reset_config;
105,932✔
710
}
105,932✔
711

712
SessionReason SessionImpl::get_session_reason() noexcept
713
{
1,516✔
714
    // Can only be called if the session is active or being activated
715
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,516!
716
    return m_wrapper.m_session_reason;
1,516✔
717
}
1,516✔
718

719
uint64_t SessionImpl::get_schema_version() noexcept
720
{
1,516✔
721
    // Can only be called if the session is active or being activated
722
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,516!
723
    return m_wrapper.m_schema_version;
1,516✔
724
}
1,516✔
725

726
bool SessionImpl::upload_messages_allowed() noexcept
727
{
70,376✔
728
    // Can only be called if the session is active or being activated
729
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
70,376✔
730
    return m_wrapper.m_allow_upload_messages;
70,376✔
731
}
70,376✔
732

733
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
734
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
735
{
43,844✔
736
    // Ignore the call if the session is not active
737
    if (m_state != State::Active) {
43,844✔
738
        return;
×
739
    }
×
740

741
    try {
43,844✔
742
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
43,844!
743
        if (simulate_integration_error) {
43,844✔
744
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
745
        }
×
746
        version_type client_version;
43,844✔
747
        if (REALM_LIKELY(!get_client().is_dry_run())) {
43,844✔
748
            VersionInfo version_info;
43,844✔
749
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
43,844✔
750
            client_version = version_info.realm_version;
43,844✔
751
        }
43,844✔
752
        else {
×
753
            // Fake it for "dry run" mode
754
            client_version = m_last_version_available + 1;
×
755
        }
×
756
        on_changesets_integrated(client_version, progress); // Throws
43,844✔
757
    }
43,844✔
758
    catch (const IntegrationException& e) {
43,844✔
759
        on_integration_failure(e);
24✔
760
    }
24✔
761
}
43,844✔
762

763

764
void SessionImpl::on_download_completion()
765
{
16,266✔
766
    // Ignore the call if the session is not active
767
    if (m_state == State::Active) {
16,268✔
768
        m_wrapper.on_download_completion(); // Throws
16,268✔
769
    }
16,268✔
770
}
16,266✔
771

772

773
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
774
{
674✔
775
    // Ignore the call if the session is not active
776
    if (m_state == State::Active) {
674✔
777
        m_wrapper.on_suspended(error_info); // Throws
674✔
778
    }
674✔
779
}
674✔
780

781

782
void SessionImpl::on_resumed()
783
{
62✔
784
    // Ignore the call if the session is not active
785
    if (m_state == State::Active) {
62✔
786
        m_wrapper.on_resumed(); // Throws
62✔
787
    }
62✔
788
}
62✔
789

790
void SessionImpl::handle_pending_client_reset_acknowledgement()
791
{
10,396✔
792
    // Ignore the call if the session is not active
793
    if (m_state == State::Active) {
10,396✔
794
        m_wrapper.handle_pending_client_reset_acknowledgement();
10,394✔
795
    }
10,394✔
796
}
10,396✔
797

798
void SessionImpl::update_subscription_version_info()
799
{
296✔
800
    // Ignore the call if the session is not active
801
    if (m_state == State::Active) {
296✔
802
        m_wrapper.update_subscription_version_info();
296✔
803
    }
296✔
804
}
296✔
805

806
bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
807
{
46,174✔
808
    // Not a bootstrap message if this isn't a FLX download
809
    if (!message.last_in_batch || !message.query_version) {
46,174✔
810
        return false;
42,474✔
811
    }
42,474✔
812

813
    REALM_ASSERT(m_is_flx_sync_session);
3,700✔
814

815
    // Not a bootstrap message if it's for the already active query version
816
    if (*message.last_in_batch && *message.query_version == m_wrapper.m_flx_active_version) {
3,700✔
817
        return false;
1,370✔
818
    }
1,370✔
819

820
    auto batch_state = *message.last_in_batch ? DownloadBatchState::LastInBatch : DownloadBatchState::MoreToCome;
2,330✔
821
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,330✔
822
    std::optional<SyncProgress> maybe_progress;
2,330✔
823
    if (batch_state == DownloadBatchState::LastInBatch) {
2,330✔
824
        maybe_progress = message.progress;
2,012✔
825
    }
2,012✔
826

827
    bool new_batch = false;
2,330✔
828
    try {
2,330✔
829
        bootstrap_store->add_batch(*message.query_version, std::move(maybe_progress), message.downloadable,
2,330✔
830
                                   message.changesets, &new_batch);
2,330✔
831
    }
2,330✔
832
    catch (const LogicError& ex) {
2,330✔
833
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
834
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
835
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
836
                                    ProtocolError::bad_changeset_size);
×
837
            on_integration_failure(ex);
×
838
            return true;
×
839
        }
×
840
        throw;
×
841
    }
×
842

843
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
844
    // bootstrapping.
845
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
2,328✔
846
        on_flx_sync_progress(*message.query_version, DownloadBatchState::MoreToCome);
76✔
847
    }
76✔
848

849
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, message.progress,
2,328✔
850
                                       *message.query_version, batch_state, message.changesets.size());
2,328✔
851
    if (hook_action == SyncClientHookAction::EarlyReturn) {
2,328✔
852
        return true;
12✔
853
    }
12✔
854
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
2,316✔
855

856
    if (batch_state == DownloadBatchState::MoreToCome) {
2,316✔
857
        return true;
312✔
858
    }
312✔
859

860
    try {
2,004✔
861
        process_pending_flx_bootstrap();
2,004✔
862
    }
2,004✔
863
    catch (const IntegrationException& e) {
2,004✔
864
        on_integration_failure(e);
12✔
865
    }
12✔
866
    catch (...) {
2,004✔
867
        on_integration_failure(IntegrationException(exception_to_status()));
×
868
    }
×
869

870
    return true;
2,004✔
871
}
2,004✔
872

873

874
void SessionImpl::process_pending_flx_bootstrap()
875
{
12,104✔
876
    // Ignore the call if not a flx session or session is not active
877
    if (!m_is_flx_sync_session || m_state != State::Active) {
12,104✔
878
        return;
8,558✔
879
    }
8,558✔
880
    // Should never be called if session is not active
881
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
3,546✔
882
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
3,546✔
883
    if (!bootstrap_store->has_pending()) {
3,546✔
884
        return;
1,522✔
885
    }
1,522✔
886

887
    auto pending_batch_stats = bootstrap_store->pending_stats();
2,024✔
888
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
2,024✔
889
                "changeset size: %3)",
2,024✔
890
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
2,024✔
891
                pending_batch_stats.pending_changeset_bytes);
2,024✔
892
    auto& history = get_repl().get_history();
2,024✔
893
    VersionInfo new_version;
2,024✔
894
    SyncProgress progress;
2,024✔
895
    int64_t query_version = -1;
2,024✔
896
    size_t changesets_processed = 0;
2,024✔
897

898
    // Used to commit each batch after it was transformed.
899
    TransactionRef transact = get_db()->start_write();
2,024✔
900
    while (bootstrap_store->has_pending()) {
4,324✔
901
        auto start_time = std::chrono::steady_clock::now();
2,312✔
902
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
2,312✔
903
        if (!pending_batch.progress) {
2,312✔
904
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
905
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
906
            // bootstrap store requires a write transaction itself.
907
            transact->close();
8✔
908
            bootstrap_store->clear();
8✔
909
            return;
8✔
910
        }
8✔
911

912
        auto batch_state =
2,304✔
913
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
2,304✔
914
        query_version = pending_batch.query_version;
2,304✔
915
        bool simulate_integration_error =
2,304✔
916
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
2,304✔
917
        if (simulate_integration_error) {
2,304✔
918
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
4✔
919
        }
4✔
920

921
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
2,300✔
922
                        batch_state, pending_batch.changesets.size());
2,300✔
923

924
        history.integrate_server_changesets(
2,300✔
925
            *pending_batch.progress, 1.0, pending_batch.changesets, new_version, batch_state, logger, transact,
2,300✔
926
            [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
2,300✔
927
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
2,288✔
928
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
2,288✔
929
            });
2,288✔
930
        progress = *pending_batch.progress;
2,300✔
931
        changesets_processed += pending_batch.changesets.size();
2,300✔
932
        auto duration = std::chrono::steady_clock::now() - start_time;
2,300✔
933

934
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
2,300✔
935
                                      batch_state, pending_batch.changesets.size());
2,300✔
936
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
2,300✔
937

938
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
2,300✔
939
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
2,300✔
940
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
2,300✔
941
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
2,300✔
942
                    pending_batch.remaining_changesets);
2,300✔
943
    }
2,300✔
944

945
    REALM_ASSERT_3(query_version, !=, -1);
2,012✔
946
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
2,012✔
947

948
    on_changesets_integrated(new_version.realm_version, progress);
2,012✔
949
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
2,012✔
950
                                  DownloadBatchState::LastInBatch, changesets_processed);
2,012✔
951
    // NoAction/EarlyReturn are both valid no-op actions to take here.
952
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
2,012✔
953
}
2,012✔
954

955
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
956
{
20✔
957
    // Ignore the call if the session is not active
958
    if (m_state == State::Active) {
20✔
959
        m_wrapper.on_flx_sync_error(version, err_msg);
20✔
960
    }
20✔
961
}
20✔
962

963
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
964
{
2,076✔
965
    // Ignore the call if the session is not active
966
    if (m_state == State::Active) {
2,076✔
967
        m_wrapper.on_flx_sync_progress(version, batch_state);
2,076✔
968
    }
2,076✔
969
}
2,076✔
970

971
SubscriptionStore* SessionImpl::get_flx_subscription_store()
972
{
19,336✔
973
    // Should never be called if session is not active
974
    REALM_ASSERT_EX(m_state == State::Active, m_state);
19,336✔
975
    return m_wrapper.get_flx_subscription_store();
19,336✔
976
}
19,336✔
977

978
MigrationStore* SessionImpl::get_migration_store()
979
{
67,236✔
980
    // Should never be called if session is not active
981
    REALM_ASSERT_EX(m_state == State::Active, m_state);
67,236✔
982
    return m_wrapper.get_migration_store();
67,236✔
983
}
67,236✔
984

985
void SessionImpl::on_flx_sync_version_complete(int64_t version)
986
{
300✔
987
    // Ignore the call if the session is not active
988
    if (m_state == State::Active) {
300✔
989
        m_wrapper.on_flx_sync_version_complete(version);
300✔
990
    }
300✔
991
}
300✔
992

993
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
994
{
8,364✔
995
    // Should never be called if session is not active
996
    REALM_ASSERT_EX(m_state == State::Active, m_state);
8,364✔
997

998
    // Make sure we don't call the debug hook recursively.
999
    if (m_wrapper.m_in_debug_hook) {
8,364✔
1000
        return SyncClientHookAction::NoAction;
24✔
1001
    }
24✔
1002
    m_wrapper.m_in_debug_hook = true;
8,340✔
1003
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
8,340✔
1004
        m_wrapper.m_in_debug_hook = false;
8,340✔
1005
    });
8,340✔
1006

1007
    auto action = m_wrapper.m_debug_hook(data);
8,340✔
1008
    switch (action) {
8,340✔
1009
        case realm::SyncClientHookAction::SuspendWithRetryableError: {
12✔
1010
            SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
12✔
1011
            err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
12✔
1012

1013
            auto err_processing_err = receive_error_message(err_info);
12✔
1014
            REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
12✔
1015
            return SyncClientHookAction::EarlyReturn;
12✔
1016
        }
×
1017
        case realm::SyncClientHookAction::TriggerReconnect: {
24✔
1018
            get_connection().voluntary_disconnect();
24✔
1019
            return SyncClientHookAction::EarlyReturn;
24✔
1020
        }
×
1021
        default:
8,296✔
1022
            return action;
8,296✔
1023
    }
8,340✔
1024
}
8,340✔
1025

1026
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
1027
                                                  int64_t query_version, DownloadBatchState batch_state,
1028
                                                  size_t num_changesets)
1029
{
177,920✔
1030
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
177,920✔
1031
        return SyncClientHookAction::NoAction;
170,026✔
1032
    }
170,026✔
1033
    if (REALM_UNLIKELY(m_state != State::Active)) {
7,894✔
1034
        return SyncClientHookAction::NoAction;
×
1035
    }
×
1036

1037
    SyncClientHookData data;
7,894✔
1038
    data.event = event;
7,894✔
1039
    data.batch_state = batch_state;
7,894✔
1040
    data.progress = progress;
7,894✔
1041
    data.num_changesets = num_changesets;
7,894✔
1042
    data.query_version = query_version;
7,894✔
1043

1044
    return call_debug_hook(data);
7,894✔
1045
}
7,894✔
1046

1047
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
1048
{
1,390✔
1049
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
1,390✔
1050
        return SyncClientHookAction::NoAction;
918✔
1051
    }
918✔
1052
    if (REALM_UNLIKELY(m_state != State::Active)) {
472✔
1053
        return SyncClientHookAction::NoAction;
×
1054
    }
×
1055

1056
    SyncClientHookData data;
472✔
1057
    data.event = event;
472✔
1058
    data.batch_state = DownloadBatchState::SteadyState;
472✔
1059
    data.progress = m_progress;
472✔
1060
    data.num_changesets = 0;
472✔
1061
    data.query_version = 0;
472✔
1062
    data.error_info = &error_info;
472✔
1063

1064
    return call_debug_hook(data);
472✔
1065
}
472✔
1066

1067
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event)
1068
{
78,970✔
1069
    return call_debug_hook(event, m_progress, m_last_sent_flx_query_version, DownloadBatchState::SteadyState, 0);
78,970✔
1070
}
78,970✔
1071

1072
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
1073
{
46,184✔
1074
    // Should never be called if session is not active
1075
    REALM_ASSERT_EX(m_state == State::Active, m_state);
46,184✔
1076
    if (batch_state == DownloadBatchState::SteadyState) {
46,184✔
UNCOV
1077
        return true;
×
UNCOV
1078
    }
×
1079

1080
    if (!m_is_flx_sync_session) {
46,184✔
1081
        return true;
42,474✔
1082
    }
42,474✔
1083

1084
    // If this is a steady state DOWNLOAD, no need for special handling.
1085
    if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) {
3,710✔
1086
        return true;
1,370✔
1087
    }
1,370✔
1088

1089
    return false;
2,340✔
1090
}
3,710✔
1091

1092
void SessionImpl::init_progress_handler()
1093
{
10,396✔
1094
    REALM_ASSERT_EX(m_state == State::Unactivated || m_state == State::Active, m_state);
10,396✔
1095
    m_wrapper.init_progress_handler();
10,396✔
1096
}
10,396✔
1097

1098
void SessionImpl::enable_progress_notifications()
1099
{
44,524✔
1100
    m_wrapper.m_reliable_download_progress = true;
44,524✔
1101
}
44,524✔
1102

1103
util::Future<std::string> SessionImpl::send_test_command(std::string body)
1104
{
60✔
1105
    if (m_state != State::Active) {
60✔
1106
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
×
1107
    }
×
1108

1109
    try {
60✔
1110
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
60✔
1111
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
60✔
1112
            return Status{ErrorCodes::LogicError,
4✔
1113
                          "Must supply command name in \"command\" field of test command json object"};
4✔
1114
        }
4✔
1115
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
56✔
1116
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
×
1117
        }
×
1118
    }
56✔
1119
    catch (const nlohmann::json::parse_error& e) {
60✔
1120
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
4✔
1121
    }
4✔
1122

1123
    auto pf = util::make_promise_future<std::string>();
52✔
1124
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
52✔
1125
        // Includes operation_aborted
1126
        if (!status.is_ok()) {
52✔
1127
            promise.set_error(status);
×
1128
            return;
×
1129
        }
×
1130

1131
        auto id = ++m_last_pending_test_command_ident;
52✔
1132
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
52✔
1133
        ensure_enlisted_to_send();
52✔
1134
    });
52✔
1135

1136
    return std::move(pf.future);
52✔
1137
}
60✔
1138

1139
// ################ SessionWrapper ################
1140

1141
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1142
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
1143
// the ClientImpl::Connection that owns the ClientImpl::Session.
1144
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1145
                               std::shared_ptr<MigrationStore> migration_store, Session::Config&& config)
1146
    : m_client{client}
4,914✔
1147
    , m_db(std::move(db))
4,914✔
1148
    , m_replication(m_db->get_replication())
4,914✔
1149
    , m_protocol_envelope{config.protocol_envelope}
4,914✔
1150
    , m_server_address{std::move(config.server_address)}
4,914✔
1151
    , m_server_port{config.server_port}
4,914✔
1152
    , m_server_verified{config.server_verified}
4,914✔
1153
    , m_user_id(std::move(config.user_id))
4,914✔
1154
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
4,914✔
1155
    , m_authorization_header_name{config.authorization_header_name}
4,914✔
1156
    , m_custom_http_headers{std::move(config.custom_http_headers)}
4,914✔
1157
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
4,914✔
1158
    , m_simulate_integration_error{config.simulate_integration_error}
4,914✔
1159
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
4,914✔
1160
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
4,914✔
1161
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
4,914✔
1162
    , m_http_request_path_prefix{std::move(config.service_identifier)}
4,914✔
1163
    , m_virt_path{std::move(config.realm_identifier)}
4,914✔
1164
    , m_proxy_config{std::move(config.proxy_config)}
4,914✔
1165
    , m_signed_access_token{std::move(config.signed_user_token)}
4,914✔
1166
    , m_client_reset_config{std::move(config.client_reset_config)}
4,914✔
1167
    , m_progress_handler(std::move(config.progress_handler))
4,914✔
1168
    , m_connection_state_change_listener(std::move(config.connection_state_change_listener))
4,914✔
1169
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
4,914✔
1170
    , m_session_reason(m_client_reset_config || config.fresh_realm_download ? SessionReason::ClientReset
4,914✔
1171
                                                                            : SessionReason::Sync)
4,914✔
1172
    , m_allow_upload_messages(!config.fresh_realm_download)
4,914✔
1173
    , m_schema_version(config.schema_version)
4,914✔
1174
    , m_flx_subscription_store(std::move(flx_sub_store))
4,914✔
1175
    , m_migration_store(std::move(migration_store))
4,914✔
1176
{
10,170✔
1177
    REALM_ASSERT(m_db);
10,170✔
1178
    REALM_ASSERT(m_db->get_replication());
10,170✔
1179
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
10,170✔
1180

1181
    // SessionWrapper begins at +1 retain count because Client retains and
1182
    // releases it while performing async operations, and these need to not
1183
    // take it to 0 or it could be deleted before the caller can retain it.
1184
    bind_ptr();
10,170✔
1185
    m_client.register_unactualized_session_wrapper(this);
10,170✔
1186
}
10,170✔
1187

1188
SessionWrapper::~SessionWrapper() noexcept
1189
{
10,172✔
1190
    // We begin actualization in the constructor and do not delete the wrapper
1191
    // until both the Client is done with it and the Session has abandoned it,
1192
    // so at this point we must have actualized, finalized, and been abandoned.
1193
    REALM_ASSERT(m_actualized);
10,172✔
1194
    REALM_ASSERT(m_abandoned);
10,172✔
1195
    REALM_ASSERT(m_finalized);
10,172✔
1196
    REALM_ASSERT(m_closed);
10,172✔
1197
    REALM_ASSERT(!m_db);
10,172✔
1198
}
10,172✔
1199

1200

1201
inline ClientReplication& SessionWrapper::get_replication() noexcept
1202
{
120,186✔
1203
    REALM_ASSERT(m_db);
120,186✔
1204
    return static_cast<ClientReplication&>(*m_replication);
120,186✔
1205
}
120,186✔
1206

1207

1208
inline ClientImpl& SessionWrapper::get_client() noexcept
1209
{
×
1210
    return m_client;
×
1211
}
×
1212

1213
bool SessionWrapper::has_flx_subscription_store() const
1214
{
2,076✔
1215
    return static_cast<bool>(m_flx_subscription_store);
2,076✔
1216
}
2,076✔
1217

1218
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1219
{
20✔
1220
    REALM_ASSERT(!m_finalized);
20✔
1221
    get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
20✔
1222
}
20✔
1223

1224
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1225
{
2,300✔
1226
    REALM_ASSERT(!m_finalized);
2,300✔
1227
    m_flx_last_seen_version = version;
2,300✔
1228
    m_flx_active_version = version;
2,300✔
1229
}
2,300✔
1230

1231
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1232
{
2,076✔
1233
    if (!has_flx_subscription_store()) {
2,076✔
1234
        return;
×
1235
    }
×
1236
    REALM_ASSERT(!m_finalized);
2,076✔
1237
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
2,076✔
1238
    REALM_ASSERT(new_version >= m_flx_active_version);
2,076✔
1239
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
2,076✔
1240

1241
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
2,076✔
1242

1243
    switch (batch_state) {
2,076✔
1244
        case DownloadBatchState::SteadyState:
✔
1245
            // Cannot be called with this value.
1246
            REALM_UNREACHABLE();
1247
        case DownloadBatchState::LastInBatch:
2,000✔
1248
            if (m_flx_active_version == new_version) {
2,000✔
1249
                return;
×
1250
            }
×
1251
            on_flx_sync_version_complete(new_version);
2,000✔
1252
            if (new_version == 0) {
2,000✔
1253
                new_state = SubscriptionSet::State::Complete;
956✔
1254
            }
956✔
1255
            else {
1,044✔
1256
                new_state = SubscriptionSet::State::AwaitingMark;
1,044✔
1257
                m_flx_pending_mark_version = new_version;
1,044✔
1258
            }
1,044✔
1259
            break;
2,000✔
1260
        case DownloadBatchState::MoreToCome:
76✔
1261
            if (m_flx_last_seen_version == new_version) {
76✔
1262
                return;
×
1263
            }
×
1264

1265
            m_flx_last_seen_version = new_version;
76✔
1266
            new_state = SubscriptionSet::State::Bootstrapping;
76✔
1267
            break;
76✔
1268
    }
2,076✔
1269

1270
    get_flx_subscription_store()->update_state(new_version, new_state);
2,076✔
1271
}
2,076✔
1272

1273
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
1274
{
21,432✔
1275
    REALM_ASSERT(!m_finalized);
21,432✔
1276
    return m_flx_subscription_store.get();
21,432✔
1277
}
21,432✔
1278

1279
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
1280
{
5,874✔
1281
    REALM_ASSERT(!m_finalized);
5,874✔
1282
    return m_flx_pending_bootstrap_store.get();
5,874✔
1283
}
5,874✔
1284

1285
MigrationStore* SessionWrapper::get_migration_store()
1286
{
67,236✔
1287
    REALM_ASSERT(!m_finalized);
67,236✔
1288
    return m_migration_store.get();
67,236✔
1289
}
67,236✔
1290

1291
inline bool SessionWrapper::mark_abandoned()
1292
{
10,172✔
1293
    REALM_ASSERT(!m_abandoned);
10,172✔
1294
    m_abandoned = true;
10,172✔
1295
    return m_finalized;
10,172✔
1296
}
10,172✔
1297

1298

1299
void SessionWrapper::on_commit(version_type new_version)
1300
{
113,816✔
1301
    // Thread safety required
1302
    m_client.post([self = util::bind_ptr{this}, new_version] {
113,816✔
1303
        REALM_ASSERT(self->m_actualized);
113,812✔
1304
        if (REALM_UNLIKELY(!self->m_sess))
113,812✔
1305
            return; // Already finalized
490✔
1306
        SessionImpl& sess = *self->m_sess;
113,322✔
1307
        sess.recognize_sync_version(new_version); // Throws
113,322✔
1308
        self->check_progress();                   // Throws
113,322✔
1309
    });
113,322✔
1310
}
113,816✔
1311

1312

1313
void SessionWrapper::cancel_reconnect_delay()
1314
{
28✔
1315
    // Thread safety required
1316

1317
    m_client.post([self = util::bind_ptr{this}] {
28✔
1318
        REALM_ASSERT(self->m_actualized);
28✔
1319
        if (REALM_UNLIKELY(self->m_closed)) {
28✔
1320
            return;
×
1321
        }
×
1322

1323
        if (REALM_UNLIKELY(!self->m_sess))
28✔
1324
            return; // Already finalized
×
1325
        SessionImpl& sess = *self->m_sess;
28✔
1326
        sess.cancel_resumption_delay(); // Throws
28✔
1327
        ClientImpl::Connection& conn = sess.get_connection();
28✔
1328
        conn.cancel_reconnect_delay(); // Throws
28✔
1329
    });                                // Throws
28✔
1330
}
28✔
1331

1332
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1333
                                    WaitOperCompletionHandler handler)
1334
{
28,202✔
1335
    REALM_ASSERT(upload_completion || download_completion);
28,202✔
1336

1337
    m_client.post([self = util::bind_ptr{this}, handler = std::move(handler), upload_completion,
28,202✔
1338
                   download_completion](Status status) mutable {
28,202✔
1339
        REALM_ASSERT(self->m_actualized);
28,202✔
1340
        if (!status.is_ok()) {
28,202✔
1341
            handler(status); // Throws
×
1342
            return;
×
1343
        }
×
1344
        if (REALM_UNLIKELY(!self->m_sess)) {
28,202✔
1345
            // Already finalized
1346
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
182✔
1347
            return;
182✔
1348
        }
182✔
1349
        if (upload_completion) {
28,020✔
1350
            self->m_upload_completion_requested_version = self->m_db->get_version_of_latest_snapshot();
15,396✔
1351
            if (download_completion) {
15,396✔
1352
                // Wait for upload and download completion
1353
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
314✔
1354
            }
314✔
1355
            else {
15,082✔
1356
                // Wait for upload completion only
1357
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
15,082✔
1358
            }
15,082✔
1359
        }
15,396✔
1360
        else {
12,624✔
1361
            // Wait for download completion only
1362
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
12,624✔
1363
        }
12,624✔
1364
        SessionImpl& sess = *self->m_sess;
28,020✔
1365
        if (upload_completion)
28,020✔
1366
            self->check_progress();
15,396✔
1367
        if (download_completion)
28,020✔
1368
            sess.request_download_completion_notification(); // Throws
12,938✔
1369
    });                                                      // Throws
28,020✔
1370
}
28,202✔
1371

1372

1373
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1374
{
12,912✔
1375
    // Thread safety required
1376
    REALM_ASSERT(!m_abandoned);
12,912✔
1377

1378
    auto pf = util::make_promise_future<bool>();
12,912✔
1379
    async_wait_for(true, false, [promise = std::move(pf.promise)](Status status) mutable {
12,912✔
1380
        promise.emplace_value(status.is_ok());
12,910✔
1381
    });
12,910✔
1382
    return pf.future.get();
12,912✔
1383
}
12,912✔
1384

1385

1386
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1387
{
10,004✔
1388
    // Thread safety required
1389
    REALM_ASSERT(!m_abandoned);
10,004✔
1390

1391
    auto pf = util::make_promise_future<bool>();
10,004✔
1392
    async_wait_for(false, true, [promise = std::move(pf.promise)](Status status) mutable {
10,004✔
1393
        promise.emplace_value(status.is_ok());
10,004✔
1394
    });
10,004✔
1395
    return pf.future.get();
10,004✔
1396
}
10,004✔
1397

1398

1399
void SessionWrapper::refresh(std::string_view signed_access_token)
1400
{
216✔
1401
    // Thread safety required
1402
    REALM_ASSERT(!m_abandoned);
216✔
1403

1404
    m_client.post([self = util::bind_ptr{this}, token = std::string(signed_access_token)] {
216✔
1405
        REALM_ASSERT(self->m_actualized);
216✔
1406
        if (REALM_UNLIKELY(!self->m_sess))
216✔
1407
            return; // Already finalized
×
1408
        self->m_signed_access_token = std::move(token);
216✔
1409
        SessionImpl& sess = *self->m_sess;
216✔
1410
        ClientImpl::Connection& conn = sess.get_connection();
216✔
1411
        // FIXME: This only makes sense when each session uses a separate connection.
1412
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
216✔
1413
        sess.cancel_resumption_delay();                                                          // Throws
216✔
1414
        conn.cancel_reconnect_delay();                                                           // Throws
216✔
1415
    });
216✔
1416
}
216✔
1417

1418

1419
void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
1420
{
10,172✔
1421
    ClientImpl& client = wrapper->m_client;
10,172✔
1422
    client.register_abandoned_session_wrapper(std::move(wrapper));
10,172✔
1423
}
10,172✔
1424

1425

1426
// Must be called from event loop thread
1427
void SessionWrapper::actualize()
1428
{
10,104✔
1429
    // actualize() can only ever be called once
1430
    REALM_ASSERT(!m_actualized);
10,104✔
1431
    REALM_ASSERT(!m_sess);
10,104✔
1432
    // The client should have removed this wrapper from those pending
1433
    // actualization if it called force_close() or finalize_before_actualize()
1434
    REALM_ASSERT(!m_finalized);
10,104✔
1435
    REALM_ASSERT(!m_closed);
10,104✔
1436

1437
    m_actualized = true;
10,104✔
1438

1439
    ScopeExitFail close_on_error([&]() noexcept {
10,104✔
1440
        m_closed = true;
4✔
1441
    });
4✔
1442

1443
    m_db->claim_sync_agent();
10,104✔
1444
    m_db->add_commit_listener(this);
10,104✔
1445
    ScopeExitFail remove_commit_listener([&]() noexcept {
10,104✔
1446
        m_db->remove_commit_listener(this);
×
1447
    });
×
1448

1449
    ServerEndpoint endpoint{m_protocol_envelope, m_server_address, m_server_port,
10,104✔
1450
                            m_user_id,           m_sync_mode,      m_server_verified};
10,104✔
1451
    bool was_created = false;
10,104✔
1452
    ClientImpl::Connection& conn = m_client.get_connection(
10,104✔
1453
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
10,104✔
1454
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
10,104✔
1455
        was_created); // Throws
10,104✔
1456
    ScopeExitFail remove_connection([&]() noexcept {
10,104✔
1457
        if (was_created)
×
1458
            m_client.remove_connection(conn);
×
1459
    });
×
1460

1461
    // FIXME: This only makes sense when each session uses a separate connection.
1462
    conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
10,104✔
1463
    std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
10,104✔
1464
    if (m_sync_mode == SyncServerMode::FLX) {
10,104✔
1465
        m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
1,542✔
1466
    }
1,542✔
1467

1468
    sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
10,104✔
1469
    m_sess = sess.get();
10,104✔
1470
    ScopeExitFail clear_sess([&]() noexcept {
10,104✔
1471
        m_sess = nullptr;
×
1472
    });
×
1473
    conn.activate_session(std::move(sess)); // Throws
10,104✔
1474

1475
    // Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
1476
    // session cannot change the state of the bootstrap store at the same time.
1477
    update_subscription_version_info();
10,104✔
1478

1479
    if (was_created)
10,104✔
1480
        conn.activate(); // Throws
2,796✔
1481

1482
    if (m_connection_state_change_listener) {
10,104✔
1483
        ConnectionState state = conn.get_state();
10,092✔
1484
        if (state != ConnectionState::disconnected) {
10,092✔
1485
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,096✔
1486
            if (state == ConnectionState::connected)
7,096✔
1487
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
6,972✔
1488
        }
7,096✔
1489
    }
10,092✔
1490

1491
    if (!m_client_reset_config)
10,104✔
1492
        check_progress(); // Throws
9,720✔
1493
}
10,104✔
1494

1495
void SessionWrapper::force_close()
1496
{
10,206✔
1497
    if (m_closed) {
10,206✔
1498
        return;
106✔
1499
    }
106✔
1500
    REALM_ASSERT(m_actualized);
10,100✔
1501
    REALM_ASSERT(m_sess);
10,100✔
1502
    m_closed = true;
10,100✔
1503

1504
    ClientImpl::Connection& conn = m_sess->get_connection();
10,100✔
1505
    conn.initiate_session_deactivation(m_sess); // Throws
10,100✔
1506

1507
    // We need to keep the DB open until finalization, but we no longer want to
1508
    // know when commits are made
1509
    m_db->remove_commit_listener(this);
10,100✔
1510

1511
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
1512
    m_flx_pending_bootstrap_store.reset();
10,100✔
1513
    // Clear the subscription and migration store refs since they are owned by SyncSession
1514
    m_flx_subscription_store.reset();
10,100✔
1515
    m_migration_store.reset();
10,100✔
1516
    m_sess = nullptr;
10,100✔
1517
    // Everything is being torn down, no need to report connection state anymore
1518
    m_connection_state_change_listener = {};
10,100✔
1519

1520
    // All outstanding wait operations must be canceled
1521
    while (!m_upload_completion_handlers.empty()) {
10,464✔
1522
        auto handler = std::move(m_upload_completion_handlers.back());
364✔
1523
        m_upload_completion_handlers.pop_back();
364✔
1524
        handler({ErrorCodes::OperationAborted, "Sync session is being closed before upload was complete"}); // Throws
364✔
1525
    }
364✔
1526
    while (!m_download_completion_handlers.empty()) {
10,482✔
1527
        auto handler = std::move(m_download_completion_handlers.back());
382✔
1528
        m_download_completion_handlers.pop_back();
382✔
1529
        handler(
382✔
1530
            {ErrorCodes::OperationAborted, "Sync session is being closed before download was complete"}); // Throws
382✔
1531
    }
382✔
1532
    while (!m_sync_completion_handlers.empty()) {
10,112✔
1533
        auto handler = std::move(m_sync_completion_handlers.back());
12✔
1534
        m_sync_completion_handlers.pop_back();
12✔
1535
        handler({ErrorCodes::OperationAborted, "Sync session is being closed before sync was complete"}); // Throws
12✔
1536
    }
12✔
1537
}
10,100✔
1538

1539
// Must be called from event loop thread
1540
//
1541
// `m_client.m_mutex` is not held while this is called, but it is guaranteed to
1542
// have been acquired at some point in between the final read or write ever made
1543
// from a different thread and when this is called.
1544
void SessionWrapper::finalize()
1545
{
10,104✔
1546
    REALM_ASSERT(m_actualized);
10,104✔
1547
    REALM_ASSERT(m_abandoned);
10,104✔
1548
    REALM_ASSERT(!m_finalized);
10,104✔
1549

1550
    force_close();
10,104✔
1551

1552
    m_finalized = true;
10,104✔
1553

1554
    // The Realm file can be closed now, as no access to the Realm file is
1555
    // supposed to happen on behalf of a session after initiation of
1556
    // deactivation.
1557
    m_db->release_sync_agent();
10,104✔
1558
    m_db = nullptr;
10,104✔
1559
}
10,104✔
1560

1561

1562
// Must be called only when an unactualized session wrapper becomes abandoned.
1563
//
1564
// Called with a lock on `m_client.m_mutex`.
1565
inline void SessionWrapper::finalize_before_actualization() noexcept
1566
{
68✔
1567
    REALM_ASSERT(!m_finalized);
68✔
1568
    REALM_ASSERT(!m_sess);
68✔
1569
    m_actualized = true;
68✔
1570
    m_finalized = true;
68✔
1571
    m_closed = true;
68✔
1572
    m_db->remove_commit_listener(this);
68✔
1573
    m_db->release_sync_agent();
68✔
1574
    m_db = nullptr;
68✔
1575
}
68✔
1576

1577
void SessionWrapper::on_download_completion()
1578
{
16,268✔
1579
    // Ensure that progress handlers get called before completion handlers. The
1580
    // download completing performed a commit and will trigger progress
1581
    // notifications asynchronously, but they would arrive after the download
1582
    // completion without this.
1583
    check_progress();
16,268✔
1584

1585
    while (!m_download_completion_handlers.empty()) {
28,718✔
1586
        auto handler = std::move(m_download_completion_handlers.back());
12,450✔
1587
        m_download_completion_handlers.pop_back();
12,450✔
1588
        handler(Status::OK()); // Throws
12,450✔
1589
    }
12,450✔
1590
    while (!m_sync_completion_handlers.empty()) {
16,362✔
1591
        auto handler = std::move(m_sync_completion_handlers.back());
94✔
1592
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
94✔
1593
        m_sync_completion_handlers.pop_back();
94✔
1594
    }
94✔
1595

1596
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
16,268✔
1597
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
938✔
1598
                             m_flx_pending_mark_version);
938✔
1599
        m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
938✔
1600
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
938✔
1601
    }
938✔
1602
}
16,268✔
1603

1604

1605
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1606
{
674✔
1607
    REALM_ASSERT(!m_finalized);
674✔
1608
    m_suspended = true;
674✔
1609
    if (m_connection_state_change_listener) {
674✔
1610
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
674✔
1611
    }
674✔
1612
}
674✔
1613

1614

1615
void SessionWrapper::on_resumed()
1616
{
62✔
1617
    REALM_ASSERT(!m_finalized);
62✔
1618
    m_suspended = false;
62✔
1619
    if (m_connection_state_change_listener) {
62✔
1620
        ClientImpl::Connection& conn = m_sess->get_connection();
62✔
1621
        if (conn.get_state() != ConnectionState::disconnected) {
62✔
1622
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
58✔
1623
            if (conn.get_state() == ConnectionState::connected)
58✔
1624
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
50✔
1625
        }
58✔
1626
    }
62✔
1627
}
62✔
1628

1629

1630
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1631
                                                 const std::optional<SessionErrorInfo>& error_info)
1632
{
12,320✔
1633
    if (m_connection_state_change_listener && !m_suspended) {
12,320✔
1634
        m_connection_state_change_listener(state, error_info); // Throws
12,284✔
1635
    }
12,284✔
1636
}
12,320✔
1637

1638
void SessionWrapper::init_progress_handler()
1639
{
10,396✔
1640
    ClientHistory::get_upload_download_state(m_db.get(), m_final_downloaded, m_final_uploaded);
10,396✔
1641
}
10,396✔
1642

1643
void SessionWrapper::check_progress()
1644
{
154,706✔
1645
    REALM_ASSERT(!m_finalized);
154,706✔
1646
    REALM_ASSERT(m_sess);
154,706✔
1647

1648
    if (!m_progress_handler && m_upload_completion_handlers.empty() && m_sync_completion_handlers.empty())
154,706✔
1649
        return;
72,480✔
1650

1651
    version_type uploaded_version;
82,226✔
1652
    ReportedProgress p;
82,226✔
1653
    DownloadableProgress downloadable;
82,226✔
1654
    ClientHistory::get_upload_download_state(*m_db, p.downloaded, downloadable, p.uploaded, p.uploadable, p.snapshot,
82,226✔
1655
                                             uploaded_version);
82,226✔
1656
    p.query_version = m_flx_last_seen_version;
82,226✔
1657

1658
    report_progress(p, downloadable);
82,226✔
1659
    report_upload_completion(uploaded_version);
82,226✔
1660
}
82,226✔
1661

1662
void SessionWrapper::report_upload_completion(version_type uploaded_version)
1663
{
82,230✔
1664
    if (uploaded_version < m_upload_completion_requested_version)
82,230✔
1665
        return;
62,634✔
1666

1667
    std::move(m_sync_completion_handlers.begin(), m_sync_completion_handlers.end(),
19,596✔
1668
              std::back_inserter(m_download_completion_handlers));
19,596✔
1669
    m_sync_completion_handlers.clear();
19,596✔
1670

1671
    while (!m_upload_completion_handlers.empty()) {
34,408✔
1672
        auto handler = std::move(m_upload_completion_handlers.back());
14,812✔
1673
        m_upload_completion_handlers.pop_back();
14,812✔
1674
        handler(Status::OK()); // Throws
14,812✔
1675
    }
14,812✔
1676
}
19,596✔
1677

1678
void SessionWrapper::report_progress(ReportedProgress& p, DownloadableProgress downloadable)
1679
{
82,232✔
1680
    if (!m_progress_handler)
82,232✔
1681
        return;
27,712✔
1682

1683
    // Ignore progress messages from before we first receive a DOWNLOAD message
1684
    if (!m_reliable_download_progress)
54,520✔
1685
        return;
29,086✔
1686

1687
    auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) {
25,434✔
1688
        REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable);
18,940✔
1689
        REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable);
18,940✔
1690

1691
        // The effect of this calculation is that if new bytes are added for download/upload,
1692
        // the progress estimate doesn't go back to zero, but it goes back to some non-zero percentage.
1693
        // This calculation allows a clean progression from 0 to 1.0 even if the new data is added for the sync
1694
        // before progress has reached 1.0.
1695
        // Then once it is at 1.0 the next batch of changes will restart the estimate at 0.
1696
        // Example for upload progress reported:
1697
        // 0 -> 1.0 -> new data added -> 0.0 -> 0.1 ...sync... -> 0.4 -> new data added -> 0.3 ...sync.. -> 1.0
1698

1699
        double progress_estimate = 1.0;
18,940✔
1700
        if (final_transferred < transferable && transferred < transferable)
18,940✔
1701
            progress_estimate = (transferred - final_transferred) / double(transferable - final_transferred);
9,806✔
1702
        return progress_estimate;
18,940✔
1703
    };
18,940✔
1704

1705
    bool upload_completed = p.uploaded == p.uploadable;
25,434✔
1706
    double upload_estimate = 1.0;
25,434✔
1707
    if (!upload_completed)
25,434✔
1708
        upload_estimate = calculate_progress(p.uploaded, p.uploadable, m_final_uploaded);
9,758✔
1709

1710
    bool download_completed = p.downloaded == 0;
25,434✔
1711
    p.download_estimate = 1.00;
25,434✔
1712
    if (m_flx_pending_bootstrap_store) {
25,434✔
1713
        p.download_estimate = downloadable.as_estimate();
12,970✔
1714
        if (m_flx_pending_bootstrap_store->has_pending()) {
12,970✔
1715
            p.downloaded += m_flx_pending_bootstrap_store->pending_stats().pending_changeset_bytes;
852✔
1716
        }
852✔
1717
        download_completed = p.download_estimate >= 1.0;
12,970✔
1718

1719
        // for flx with download estimate these bytes are not known
1720
        // provide some sensible value for non-streaming version of object-store callbacks
1721
        // until these field are completely removed from the api after pbs deprecation
1722
        p.downloadable = p.downloaded;
12,970✔
1723
        if (p.download_estimate > 0 && p.download_estimate < 1.0 && p.downloaded > m_final_downloaded)
12,970✔
1724
            p.downloadable = m_final_downloaded + uint64_t((p.downloaded - m_final_downloaded) / p.download_estimate);
860✔
1725
    }
12,970✔
1726
    else {
12,464✔
1727
        // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
1728
        // is only the remaining to download. This is confusing, so make them use
1729
        // the same units.
1730
        p.downloadable = downloadable.as_bytes() + p.downloaded;
12,464✔
1731
        if (!download_completed)
12,464✔
1732
            p.download_estimate = calculate_progress(p.downloaded, p.downloadable, m_final_downloaded);
9,182✔
1733
    }
12,464✔
1734

1735
    if (download_completed)
25,434✔
1736
        m_final_downloaded = p.downloaded;
15,388✔
1737
    if (upload_completed)
25,434✔
1738
        m_final_uploaded = p.uploaded;
15,676✔
1739

1740
    if (p == m_reported_progress)
25,434✔
1741
        return;
17,954✔
1742

1743
    m_reported_progress = p;
7,480✔
1744

1745
    if (m_sess->logger.would_log(Logger::Level::debug)) {
7,480✔
1746
        auto to_str = [](double d) {
14,480✔
1747
            std::ostringstream ss;
14,480✔
1748
            // progress estimate string in the DOWNLOAD message isn't expected to have more than 4 digits of precision
1749
            ss << std::fixed << std::setprecision(4) << d;
14,480✔
1750
            return ss.str();
14,480✔
1751
        };
14,480✔
1752
        m_sess->logger.debug(
7,240✔
1753
            "Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
7,240✔
1754
            "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
7,240✔
1755
            p.downloaded, p.downloadable, to_str(p.download_estimate), p.uploaded, p.uploadable,
7,240✔
1756
            to_str(upload_estimate), p.snapshot, p.query_version);
7,240✔
1757
    }
7,240✔
1758

1759
    m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, p.download_estimate,
7,480✔
1760
                       upload_estimate, p.query_version);
7,480✔
1761
}
7,480✔
1762

1763
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
1764
{
60✔
1765
    if (!m_sess) {
60✔
1766
        return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
×
1767
    }
×
1768

1769
    return m_sess->send_test_command(std::move(body));
60✔
1770
}
60✔
1771

1772
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1773
{
10,394✔
1774
    REALM_ASSERT(!m_finalized);
10,394✔
1775

1776
    auto has_pending_reset = PendingResetStore::has_pending_reset(m_db->start_frozen());
10,394✔
1777
    if (!has_pending_reset) {
10,394✔
1778
        return; // nothing to do
10,072✔
1779
    }
10,072✔
1780

1781
    m_sess->logger.info(util::LogCategory::reset, "Tracking %1", *has_pending_reset);
322✔
1782

1783
    // Now that the client reset merge is complete, wait for the changes to synchronize with the server
1784
    async_wait_for(
322✔
1785
        true, true, [self = util::bind_ptr(this), pending_reset = std::move(*has_pending_reset)](Status status) {
322✔
1786
            if (status == ErrorCodes::OperationAborted) {
322✔
1787
                return;
158✔
1788
            }
158✔
1789
            auto& logger = self->m_sess->logger;
164✔
1790
            if (!status.is_ok()) {
164✔
1791
                logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1",
×
1792
                             status);
×
1793
                return;
×
1794
            }
×
1795

1796
            logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset);
164✔
1797

1798
            auto tr = self->m_db->start_write();
164✔
1799
            auto cur_pending_reset = PendingResetStore::has_pending_reset(tr);
164✔
1800
            if (!cur_pending_reset) {
164✔
1801
                logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed.");
4✔
1802
                return;
4✔
1803
            }
4✔
1804
            if (*cur_pending_reset == pending_reset) {
160✔
1805
                logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker.");
160✔
1806
            }
160✔
1807
            else {
×
1808
                logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset);
×
1809
            }
×
1810
            PendingResetStore::clear_pending_reset(tr);
160✔
1811
            tr->commit();
160✔
1812
        });
160✔
1813
}
322✔
1814

1815
void SessionWrapper::update_subscription_version_info()
1816
{
10,396✔
1817
    if (!m_flx_subscription_store)
10,396✔
1818
        return;
8,738✔
1819
    auto versions_info = m_flx_subscription_store->get_version_info();
1,658✔
1820
    m_flx_active_version = versions_info.active;
1,658✔
1821
    m_flx_pending_mark_version = versions_info.pending_mark;
1,658✔
1822
}
1,658✔
1823

1824
std::string SessionWrapper::get_appservices_connection_id()
1825
{
72✔
1826
    auto pf = util::make_promise_future<std::string>();
72✔
1827

1828
    m_client.post([self = util::bind_ptr{this}, promise = std::move(pf.promise)](Status status) mutable {
72✔
1829
        if (!status.is_ok()) {
72✔
1830
            promise.set_error(status);
×
1831
            return;
×
1832
        }
×
1833

1834
        if (!self->m_sess) {
72✔
1835
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
1836
            return;
×
1837
        }
×
1838

1839
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
72✔
1840
    });
72✔
1841

1842
    return pf.future.get();
72✔
1843
}
72✔
1844

1845
// ################ ClientImpl::Connection ################
1846

1847
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1848
                                   const std::string& authorization_header_name,
1849
                                   const std::map<std::string, std::string>& custom_http_headers,
1850
                                   bool verify_servers_ssl_certificate,
1851
                                   Optional<std::string> ssl_trust_certificate_path,
1852
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1853
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1854
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::session, make_logger_prefix(ident),
1,334✔
1855
                                                      client.logger_ptr)} // Throws
1,334✔
1856
    , logger{*logger_ptr}
1,334✔
1857
    , m_client{client}
1,334✔
1858
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1,334✔
1859
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1,334✔
1860
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1,334✔
1861
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1,334✔
1862
    , m_reconnect_info{reconnect_info}
1,334✔
1863
    , m_ident{ident}
1,334✔
1864
    , m_server_endpoint{std::move(endpoint)}
1,334✔
1865
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1,334✔
1866
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1,334✔
1867
{
2,800✔
1868
    m_on_idle = m_client.create_trigger([this](Status status) {
2,810✔
1869
        if (status == ErrorCodes::OperationAborted)
2,810✔
1870
            return;
×
1871
        else if (!status.is_ok())
2,810✔
1872
            throw Exception(status);
×
1873

1874
        REALM_ASSERT(m_activated);
2,810✔
1875
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,810✔
1876
            on_idle(); // Throws
2,800✔
1877
            // Connection object may be destroyed now.
1878
        }
2,800✔
1879
    });
2,810✔
1880
}
2,800✔
1881

1882
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
1883
{
12✔
1884
    return m_ident;
12✔
1885
}
12✔
1886

1887

1888
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
1889
{
2,800✔
1890
    return m_server_endpoint;
2,800✔
1891
}
2,800✔
1892

1893
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1894
                                                        const std::string& signed_access_token)
1895
{
10,314✔
1896
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
10,314✔
1897
    m_signed_access_token = signed_access_token;           // Throws (copy)
10,314✔
1898
}
10,314✔
1899

1900

1901
void ClientImpl::Connection::resume_active_sessions()
1902
{
2,000✔
1903
    auto handler = [=](ClientImpl::Session& sess) {
3,996✔
1904
        sess.cancel_resumption_delay(); // Throws
3,996✔
1905
    };
3,996✔
1906
    for_each_active_session(std::move(handler)); // Throws
2,000✔
1907
}
2,000✔
1908

1909
void ClientImpl::Connection::on_idle()
1910
{
2,800✔
1911
    logger.debug(util::LogCategory::session, "Destroying connection object");
2,800✔
1912
    ClientImpl& client = get_client();
2,800✔
1913
    client.remove_connection(*this);
2,800✔
1914
    // NOTE: This connection object is now destroyed!
1915
}
2,800✔
1916

1917

1918
std::string ClientImpl::Connection::get_http_request_path() const
1919
{
3,826✔
1920
    using namespace std::string_view_literals;
3,826✔
1921
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
3,826✔
1922

1923
    std::string path;
3,826✔
1924
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,826✔
1925
    path += m_http_request_path_prefix;
3,826✔
1926
    path += param;
3,826✔
1927
    path += m_signed_access_token;
3,826✔
1928

1929
    return path;
3,826✔
1930
}
3,826✔
1931

1932

1933
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
1934
{
2,798✔
1935
    return util::format("Connection[%1] ", ident);
2,798✔
1936
}
2,798✔
1937

1938

1939
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
1940
                                                            std::optional<SessionErrorInfo> error_info)
1941
{
11,272✔
1942
    if (m_force_closed) {
11,272✔
1943
        return;
2,438✔
1944
    }
2,438✔
1945
    auto handler = [=](ClientImpl::Session& sess) {
12,164✔
1946
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
12,164✔
1947
        sess_2.on_connection_state_changed(state, error_info); // Throws
12,164✔
1948
    };
12,164✔
1949
    for_each_active_session(std::move(handler)); // Throws
8,834✔
1950
}
8,834✔
1951

1952

1953
Client::Client(Config config)
1954
    : m_impl{new ClientImpl{std::move(config)}} // Throws
4,900✔
1955
{
9,938✔
1956
}
9,938✔
1957

1958

1959
Client::Client(Client&& client) noexcept
1960
    : m_impl{std::move(client.m_impl)}
1961
{
×
1962
}
×
1963

1964

1965
Client::~Client() noexcept {}
9,938✔
1966

1967

1968
void Client::shutdown() noexcept
1969
{
10,020✔
1970
    m_impl->shutdown();
10,020✔
1971
}
10,020✔
1972

1973
void Client::shutdown_and_wait()
1974
{
772✔
1975
    m_impl->shutdown_and_wait();
772✔
1976
}
772✔
1977

1978
void Client::cancel_reconnect_delay()
1979
{
2,004✔
1980
    m_impl->cancel_reconnect_delay();
2,004✔
1981
}
2,004✔
1982

1983
void Client::voluntary_disconnect_all_connections()
1984
{
12✔
1985
    m_impl->voluntary_disconnect_all_connections();
12✔
1986
}
12✔
1987

1988
bool Client::wait_for_session_terminations_or_client_stopped()
1989
{
9,572✔
1990
    return m_impl->wait_for_session_terminations_or_client_stopped();
9,572✔
1991
}
9,572✔
1992

1993
util::Future<void> Client::notify_session_terminated()
1994
{
56✔
1995
    return m_impl->notify_session_terminated();
56✔
1996
}
56✔
1997

1998
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
1999
                                  port_type& port, std::string& path) const
2000
{
4,072✔
2001
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
4,072✔
2002
}
4,072✔
2003

2004

2005
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
2006
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
2007
{
10,170✔
2008
    m_impl = new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
10,170✔
2009
                                std::move(config)}; // Throws
10,170✔
2010
}
10,170✔
2011

2012

2013
void Session::nonsync_transact_notify(version_type new_version)
2014
{
18,580✔
2015
    m_impl->on_commit(new_version); // Throws
18,580✔
2016
}
18,580✔
2017

2018

2019
void Session::cancel_reconnect_delay()
2020
{
28✔
2021
    m_impl->cancel_reconnect_delay(); // Throws
28✔
2022
}
28✔
2023

2024

2025
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2026
{
4,964✔
2027
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
4,964✔
2028
}
4,964✔
2029

2030

2031
bool Session::wait_for_upload_complete_or_client_stopped()
2032
{
12,912✔
2033
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
12,912✔
2034
}
12,912✔
2035

2036

2037
bool Session::wait_for_download_complete_or_client_stopped()
2038
{
10,004✔
2039
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
10,004✔
2040
}
10,004✔
2041

2042

2043
void Session::refresh(std::string_view signed_access_token)
2044
{
216✔
2045
    m_impl->refresh(signed_access_token); // Throws
216✔
2046
}
216✔
2047

2048

2049
void Session::abandon() noexcept
2050
{
10,172✔
2051
    REALM_ASSERT(m_impl);
10,172✔
2052
    // Reabsorb the ownership assigned to the applications naked pointer by
2053
    // Session constructor
2054
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
10,172✔
2055
    SessionWrapper::abandon(std::move(wrapper));
10,172✔
2056
}
10,172✔
2057

2058
util::Future<std::string> Session::send_test_command(std::string body)
2059
{
60✔
2060
    return m_impl->send_test_command(std::move(body));
60✔
2061
}
60✔
2062

2063
std::string Session::get_appservices_connection_id()
2064
{
72✔
2065
    return m_impl->get_appservices_connection_id();
72✔
2066
}
72✔
2067

2068
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2069
{
×
2070
    switch (proxyType) {
×
2071
        case ProxyConfig::Type::HTTP:
×
2072
            return os << "HTTP";
×
2073
        case ProxyConfig::Type::HTTPS:
×
2074
            return os << "HTTPS";
×
2075
    }
×
2076
    REALM_TERMINATE("Invalid Proxy Type object.");
2077
}
×
2078

2079
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc