• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_459

12 Jul 2024 09:52PM UTC coverage: 91.005% (+0.03%) from 90.98%
thomas.goyne_459

Pull #7870

Evergreen

tgoyne
Report steady-state download progress
Pull Request #7870: RCORE-2192 RCORE-2193 Fix FLX download progress reporting

102390 of 180586 branches covered (56.7%)

247 of 257 new or added lines in 10 files covered. (96.11%)

76 existing lines in 16 files now uncovered.

215445 of 236741 relevant lines covered (91.0%)

5767316.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.09
/src/realm/sync/client.cpp
1
#include <realm/sync/client.hpp>
2

3
#include <realm/sync/config.hpp>
4
#include <realm/sync/noinst/client_impl_base.hpp>
5
#include <realm/sync/noinst/client_reset.hpp>
6
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
7
#include <realm/sync/noinst/pending_reset_store.hpp>
8
#include <realm/sync/protocol.hpp>
9
#include <realm/sync/subscriptions.hpp>
10
#include <realm/util/bind_ptr.hpp>
11

12
namespace realm::sync {
13
namespace {
14
using namespace realm::util;
15

16

17
// clang-format off
18
using SessionImpl                     = ClientImpl::Session;
19
using SyncTransactCallback            = Session::SyncTransactCallback;
20
using ProgressHandler                 = Session::ProgressHandler;
21
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
22
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
23
using port_type                       = Session::port_type;
24
using connection_ident_type           = std::int_fast64_t;
25
using ProxyConfig                     = SyncConfig::ProxyConfig;
26
// clang-format on
27

28
} // unnamed namespace
29

30

31
// Life cycle states of a session wrapper:
32
//
33
// The session wrapper begins life with an associated Client, but no underlying
34
// SessionImpl. On construction, it begins the actualization process by posting
35
// a job to the client's event loop. That job will set `m_sess` to a session impl
36
// and then set `m_actualized = true`. Once this happens `m_actualized` will
37
// never change again.
38
//
39
// When the external reference to the session (`sync::Session`, which in
40
// non-test code is always owned by a `SyncSession`) is destroyed, the wrapper
41
// begins finalization. If the wrapper has not yet been actualized this takes
42
// place immediately and `m_finalized = true` is set directly on the calling
43
// thread. If it has been actualized, a job is posted to the client's event loop
44
// which will tear down the session and then set `m_finalized = true`. Regardless
45
// of whether or not the session has been actualized, `m_abandoned = true` is
46
// immediately set when the external reference is released.
47
//
48
// When the associated Client is destroyed it calls force_close() on all
49
// actualized wrappers from its event loop. This causes the wrapper to tear down
50
// the session, but does not make it proceed to the finalized state. In normal
51
// usage the client will outlive all sessions, but in tests getting the teardown
52
// correct and race-free can be tricky so we permit either order.
53
//
54
// The wrapper will exist with `m_abandoned = true` and `m_finalized = false`
55
// only while waiting for finalization to happen. It will exist with
56
// `m_finalized = true` only while there are pending post handlers yet to be
57
// executed.
58
// Note: DB::CommitListener is listed without an access specifier and is
// therefore a private base; on_commit() below is its override.
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
59
public:
60
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
61
                   Session::Config&&);
62
    ~SessionWrapper() noexcept;
63

64
    ClientReplication& get_replication() noexcept;
65
    ClientImpl& get_client() noexcept;
66

67
    bool has_flx_subscription_store() const;
68
    SubscriptionStore* get_flx_subscription_store();
69
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
70

71
    MigrationStore* get_migration_store();
72

73
    // Immediately initiate deactivation of the wrapped session. Sets m_closed
74
    // but *not* m_finalized.
75
    // Must be called from event loop thread.
76
    void force_close();
77

78
    // Can be called from any thread.
79
    void on_commit(version_type new_version) override;
80
    // Can be called from any thread.
81
    void cancel_reconnect_delay();
82

83
    // Can be called from any thread.
84
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
85
    // Can be called from any thread.
86
    bool wait_for_upload_complete_or_client_stopped();
87
    // Can be called from any thread.
88
    bool wait_for_download_complete_or_client_stopped();
89

90
    // Can be called from any thread.
91
    void refresh(std::string_view signed_access_token);
92

93
    // Can be called from any thread.
94
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
95

96
    // These are called from ClientImpl
97
    // actualize() and finalize() must be called from event loop thread.
98
    void actualize();
99
    void finalize();
100
    // Unlike the two above, this one is also called while ClientImpl::m_mutex is
    // held from ClientImpl's thread-safe register_*_session_wrapper() functions.
    void finalize_before_actualization() noexcept;
101

102
    // Can be called from any thread.
103
    util::Future<std::string> send_test_command(std::string body);
104

105
    void handle_pending_client_reset_acknowledgement();
106

107
    void update_subscription_version_info();
108

109
    // Can be called from any thread.
110
    std::string get_appservices_connection_id();
111

112
    // Can be called from any thread, but inherently cannot be called
113
    // concurrently with calls to any of the other non-confined functions.
114
    bool mark_abandoned();
115

116
private:
117
    ClientImpl& m_client;
118
    DBRef m_db;
119
    Replication* m_replication;
120

121
    const ProtocolEnvelope m_protocol_envelope;
122
    const std::string m_server_address;
123
    const port_type m_server_port;
124
    const bool m_server_verified;
125
    const std::string m_user_id;
126
    const SyncServerMode m_sync_mode;
127
    const std::string m_authorization_header_name;
128
    const std::map<std::string, std::string> m_custom_http_headers;
129
    const bool m_verify_servers_ssl_certificate;
130
    const bool m_simulate_integration_error;
131
    const std::optional<std::string> m_ssl_trust_certificate_path;
132
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
133
    const size_t m_flx_bootstrap_batch_size_bytes;
134
    const std::string m_http_request_path_prefix;
135
    const std::string m_virt_path;
136
    const std::optional<ProxyConfig> m_proxy_config;
137

138
    // Intrusive link used by SessionWrapperStack (ClientImpl's unactualized and
139
    // abandoned wrapper stacks); push() asserts it is null before linking.
140
    SessionWrapper* m_next = nullptr;
141

142
    // These may only be accessed by the event loop thread.
143
    std::string m_signed_access_token;
144
    std::optional<ClientReset> m_client_reset_config;
145

146
    // Bundle of progress values; two bundles compare equal when every field
    // except `snapshot` matches.
    struct ReportedProgress {
147
        uint64_t snapshot;
148
        uint64_t uploaded;
149
        uint64_t uploadable;
150
        uint64_t downloaded;
151
        uint64_t downloadable;
152
        int64_t query_version;
153
        double download_estimate;
154

155
        // Does not check snapshot
156
        bool operator==(const ReportedProgress& p) const noexcept
157
        {
22,826✔
158
            return uploaded == p.uploaded && uploadable == p.uploadable && downloaded == p.downloaded &&
22,826✔
159
                   downloadable == p.downloadable && query_version == p.query_version &&
22,826✔
160
                   download_estimate == p.download_estimate;
22,826✔
161
        }
22,826✔
162
    };
163
    std::optional<ReportedProgress> m_reported_progress;
164
    uint64_t m_final_uploaded = 0;
165
    uint64_t m_final_downloaded = 0;
166

167
    const util::UniqueFunction<ProgressHandler> m_progress_handler;
168
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
169

170
    const util::UniqueFunction<SyncClientHookAction(SyncClientHookData const&)> m_debug_hook;
171
    bool m_in_debug_hook = false;
172

173
    const SessionReason m_session_reason;
174

175
    // If false, QUERY and MARK messages are allowed but UPLOAD messages will not
176
    // be sent to the server.
177
    const bool m_allow_upload_messages;
178

179
    const uint64_t m_schema_version;
180

181
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
182
    int64_t m_flx_active_version = 0;
183
    int64_t m_flx_last_seen_version = 0;
184
    int64_t m_flx_pending_mark_version = 0;
185
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
186

187
    std::shared_ptr<MigrationStore> m_migration_store;
188

189
    // Set to true when this session wrapper is actualized (i.e. the wrapped
190
    // session is created), or when the wrapper is finalized before actualization.
191
    // It is then never modified again.
192
    //
193
    // Actualization is scheduled during the construction of SessionWrapper, and
194
    // so a session specific post handler will always find that `m_actualized`
195
    // is true as the handler will always be run after the actualization job.
196
    // This holds even if the wrapper is finalized or closed before actualization.
197
    bool m_actualized = false;
198

199
    // Set to true when session deactivation is begun, either via force_close()
200
    // or finalize().
201
    bool m_closed = false;
202

203
    // Set to true in on_suspended() and then false in on_resumed(). Used to
204
    // suppress spurious connection state and error reporting while the session
205
    // is already in an error state.
206
    bool m_suspended = false;
207

208
    // Set when the session has been abandoned. After this point none of the
209
    // public API functions should be called again.
210
    bool m_abandoned = false;
211
    // Has the SessionWrapper been finalized?
212
    bool m_finalized = false;
213

214
    // Set to true when the first DOWNLOAD message is received to indicate that
215
    // the byte-level download progress parameters can be considered reasonably
216
    // reliable. Before that, a lot of time may have passed, so our record of
217
    // the download progress is likely completely out of date.
218
    bool m_reliable_download_progress = false;
219

220
    // Set to point to an activated session object during actualization of the
221
    // session wrapper. Set to null during finalization of the session
222
    // wrapper. Both modifications are guaranteed to be performed by the event
223
    // loop thread.
224
    //
225
    // If a session specific post handler, that is submitted after the
226
    // initiation of the session wrapper, sees that `m_sess` is null, it can
227
    // conclude that the session wrapper has either been force closed or has
228
    // been both abandoned and finalized.
229
    //
230
    // Must only be accessed from the event loop thread.
231
    SessionImpl* m_sess = nullptr;
232

233
    // These must only be accessed from the event loop thread.
234
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
235
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
236
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
237

238
    version_type m_upload_completion_requested_version = -1;
239

240
    void on_download_completion();
241
    void on_suspended(const SessionErrorInfo& error_info);
242
    void on_resumed();
243
    void on_connection_state_changed(ConnectionState, const std::optional<SessionErrorInfo>&);
244
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
245
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
246
    void on_flx_sync_version_complete(int64_t version);
247

248
    void init_progress_handler();
249
    void check_progress();
250
    void report_progress(ReportedProgress& p, DownloadableProgress downloadable);
251
    void report_upload_completion(version_type);
252

253
    friend class SessionWrapperStack;
254
    friend class ClientImpl::Session;
255
};
256

257

258
// ################ SessionWrapperStack ################
259

260
inline bool SessionWrapperStack::empty() const noexcept
261
{
19,876✔
262
    return !m_back;
19,876✔
263
}
19,876✔
264

265

266
// Takes over the reference held by `w` and links the wrapper in as the new
// top of the stack. The wrapper must not already carry a link (`m_next`).
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
{
    REALM_ASSERT(!w->m_next);
    SessionWrapper* const new_top = w.release();
    new_top->m_next = m_back;
    m_back = new_top;
}
272

273

274
// Unlinks the top wrapper and hands its reference to the caller. Returns a
// null bind_ptr when the stack is empty.
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
{
    SessionWrapper* const top = m_back;
    if (top) {
        m_back = top->m_next;
        // A wrapper that is no longer on the stack must not keep a link.
        top->m_next = nullptr;
    }
    // Adopt rather than retain: the stack's reference is transferred as-is.
    return util::bind_ptr<SessionWrapper>{top, util::bind_ptr_base::adopt_tag{}};
}
283

284

285
inline void SessionWrapperStack::clear() noexcept
286
{
19,876✔
287
    while (m_back) {
19,876✔
288
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
289
        m_back = w->m_next;
×
290
    }
×
291
}
19,876✔
292

293

294
// Removes `w` from the stack if it is present and drops the reference the
// stack held on it. Returns true when the wrapper was found and removed,
// false when it is not on this stack.
//
// The caller is expected to hold its own reference to `w`; otherwise the
// wrapper may be destroyed before this function returns.
inline bool SessionWrapperStack::erase(SessionWrapper* w) noexcept
{
    // Walk the chain of `m_next` links until the link that points at `w`.
    SessionWrapper** link = &m_back;
    while (*link && *link != w) {
        link = &(*link)->m_next;
    }
    if (!*link) {
        return false;
    }
    *link = w->m_next;
    // Match pop(): once unlinked, the wrapper must not keep pointing at a
    // former neighbour, which may be destroyed independently and would also
    // trip the `!w->m_next` assertion in push().
    w->m_next = nullptr;
    // Adopt and immediately release the reference that the stack owned.
    util::bind_ptr<SessionWrapper>{w, util::bind_ptr_base::adopt_tag{}};
    return true;
}
307

308

309
// Releases the references to any wrappers still on the stack via clear().
SessionWrapperStack::~SessionWrapperStack()
310
{
19,876✔
311
    clear();
19,876✔
312
}
19,876✔
313

314

315
// ################ ClientImpl ################
316

317
// Shuts the client down, blocks in shutdown_and_wait() until it returns, and
// then verifies that the client is stopped and that no session wrapper is
// still queued for actualization or finalization.
ClientImpl::~ClientImpl()
318
{
9,938✔
319
    // Since no other thread is allowed to be accessing this client or any of
320
    // its subobjects at this time, no mutex locking is necessary.
321

322
    shutdown_and_wait();
9,938✔
323
    // Session wrappers are removed from m_unactualized_session_wrappers as they
324
    // are abandoned.
325
    REALM_ASSERT(m_stopped);
9,938✔
326
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
9,938✔
327
    REALM_ASSERT(m_abandoned_session_wrappers.empty());
9,938✔
328
}
9,938✔
329

330

331
void ClientImpl::cancel_reconnect_delay()
332
{
1,908✔
333
    // Thread safety required
334
    post([this] {
1,908✔
335
        for (auto& p : m_server_slots) {
1,908✔
336
            ServerSlot& slot = p.second;
1,908✔
337
            if (m_one_connection_per_session) {
1,908✔
338
                REALM_ASSERT(!slot.connection);
×
339
                for (const auto& p : slot.alt_connections) {
×
340
                    ClientImpl::Connection& conn = *p.second;
×
341
                    conn.resume_active_sessions(); // Throws
×
342
                    conn.cancel_reconnect_delay(); // Throws
×
343
                }
×
344
            }
×
345
            else {
1,908✔
346
                REALM_ASSERT(slot.alt_connections.empty());
1,908✔
347
                if (slot.connection) {
1,908✔
348
                    ClientImpl::Connection& conn = *slot.connection;
1,904✔
349
                    conn.resume_active_sessions(); // Throws
1,904✔
350
                    conn.cancel_reconnect_delay(); // Throws
1,904✔
351
                }
1,904✔
352
                else {
4✔
353
                    slot.reconnect_info.reset();
4✔
354
                }
4✔
355
            }
1,908✔
356
        }
1,908✔
357
    }); // Throws
1,908✔
358
}
1,908✔
359

360

361
void ClientImpl::voluntary_disconnect_all_connections()
362
{
12✔
363
    auto done_pf = util::make_promise_future<void>();
12✔
364
    post([this, promise = std::move(done_pf.promise)]() mutable {
12✔
365
        try {
12✔
366
            for (auto& p : m_server_slots) {
12✔
367
                ServerSlot& slot = p.second;
12✔
368
                if (m_one_connection_per_session) {
12✔
369
                    REALM_ASSERT(!slot.connection);
×
370
                    for (const auto& [_, conn] : slot.alt_connections) {
×
371
                        conn->voluntary_disconnect();
×
372
                    }
×
373
                }
×
374
                else {
12✔
375
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
376
                    if (slot.connection) {
12✔
377
                        slot.connection->voluntary_disconnect();
12✔
378
                    }
12✔
379
                }
12✔
380
            }
12✔
381
        }
12✔
382
        catch (...) {
12✔
383
            promise.set_error(exception_to_status());
×
384
            return;
×
385
        }
×
386
        promise.emplace_value();
12✔
387
    });
12✔
388
    done_pf.future.get();
12✔
389
}
12✔
390

391

392
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
393
{
9,572✔
394
    // Thread safety required
395

396
    {
9,572✔
397
        util::CheckedLockGuard lock{m_mutex};
9,572✔
398
        m_sessions_terminated = false;
9,572✔
399
    }
9,572✔
400

401
    // The technique employed here relies on the fact that
402
    // actualize_and_finalize_session_wrappers() must get to execute at least
403
    // once before the post handler submitted below gets to execute, but still
404
    // at a time where all session wrappers, that are abandoned prior to the
405
    // execution of wait_for_session_terminations_or_client_stopped(), have been
406
    // added to `m_abandoned_session_wrappers`.
407
    //
408
    // To see that this is the case, consider a session wrapper that was
409
    // abandoned before wait_for_session_terminations_or_client_stopped() was
410
    // invoked. Then the session wrapper will have been added to
411
    // `m_abandoned_session_wrappers`, and an invocation of
412
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
413
    // guarantees mentioned in the documentation of Trigger then ensure
414
    // that at least one execution of actualize_and_finalize_session_wrappers()
415
    // will happen after the session wrapper has been added to
416
    // `m_abandoned_session_wrappers`, but before the post handler submitted
417
    // below gets to execute.
418
    post([this] {
9,572✔
419
        {
9,572✔
420
            util::CheckedLockGuard lock{m_mutex};
9,572✔
421
            m_sessions_terminated = true;
9,572✔
422
        }
9,572✔
423
        m_wait_or_client_stopped_cond.notify_all();
9,572✔
424
    }); // Throws
9,572✔
425

426
    bool completion_condition_was_satisfied;
9,572✔
427
    {
9,572✔
428
        util::CheckedUniqueLock lock{m_mutex};
9,572✔
429
        m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_mutex) {
19,138✔
430
            return m_sessions_terminated || m_stopped;
19,138✔
431
        });
19,138✔
432
        completion_condition_was_satisfied = !m_stopped;
9,572✔
433
    }
9,572✔
434
    return completion_condition_was_satisfied;
9,572✔
435
}
9,572✔
436

437

438
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
439
util::Future<void> ClientImpl::notify_session_terminated()
440
{
56✔
441
    auto pf = util::make_promise_future<void>();
56✔
442
    post([promise = std::move(pf.promise)](Status status) mutable {
56✔
443
        // Includes operation_aborted
444
        if (!status.is_ok()) {
56✔
445
            promise.set_error(status);
×
446
            return;
×
447
        }
×
448

449
        promise.emplace_value();
56✔
450
    });
56✔
451

452
    return std::move(pf.future);
56✔
453
}
56✔
454

455
// Posts a job that calls drain_connections(); the job asserts that it was
// handed an OK status.
void ClientImpl::drain_connections_on_loop()
456
{
9,938✔
457
    post([this](Status status) {
9,938✔
458
        REALM_ASSERT(status.is_ok());
9,938✔
459
        drain_connections();
9,938✔
460
    });
9,938✔
461
}
9,938✔
462

463
void ClientImpl::shutdown_and_wait()
464
{
10,710✔
465
    shutdown();
10,710✔
466
    util::CheckedUniqueLock lock{m_drain_mutex};
10,710✔
467
    if (m_drained) {
10,710✔
468
        return;
772✔
469
    }
772✔
470

471
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
9,938✔
472
    m_drain_cv.wait(lock.native_handle(), [&]() REQUIRES(m_drain_mutex) {
15,842✔
473
        return m_num_connections == 0 && m_outstanding_posts == 0;
15,842✔
474
    });
15,842✔
475

476
    m_drained = true;
9,938✔
477
}
9,938✔
478

479
void ClientImpl::shutdown() noexcept
480
{
20,730✔
481
    {
20,730✔
482
        util::CheckedLockGuard lock{m_mutex};
20,730✔
483
        if (m_stopped)
20,730✔
484
            return;
10,792✔
485
        m_stopped = true;
9,938✔
486
    }
9,938✔
487
    m_wait_or_client_stopped_cond.notify_all();
×
488

489
    drain_connections_on_loop();
9,938✔
490
}
9,938✔
491

492

493
// Queues `wrapper` on m_unactualized_session_wrappers (which takes a reference
// to it) and triggers m_actualize_and_finalize. If the client has already been
// stopped, the wrapper is instead finalized right away on the calling thread
// and nothing is queued or triggered.
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
494
{
10,172✔
495
    // Thread safety required.
496
    {
10,172✔
497
        util::CheckedLockGuard lock{m_mutex};
10,172✔
498
        // We can't actualize the session if we've already been stopped, so
499
        // just finalize it immediately.
500
        if (m_stopped) {
10,172✔
501
            wrapper->finalize_before_actualization();
×
502
            return;
×
503
        }
×
504

505
        REALM_ASSERT(m_actualize_and_finalize);
10,172✔
506
        m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
10,172✔
507
    }
10,172✔
508
    // Triggered only after m_mutex has been released.
    m_actualize_and_finalize->trigger();
×
509
}
10,172✔
510

511

512
// Marks `wrapper` as abandoned and arranges for its finalization. Three
// outcomes are possible: mark_abandoned() returns true and nothing more is
// done; the wrapper is still waiting on m_unactualized_session_wrappers, in
// which case it is removed from there and finalized on the calling thread; or
// it is queued on m_abandoned_session_wrappers and m_actualize_and_finalize is
// triggered.
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
513
{
10,172✔
514
    // Thread safety required.
515
    {
10,172✔
516
        util::CheckedLockGuard lock{m_mutex};
10,172✔
517
        REALM_ASSERT(m_actualize_and_finalize);
10,172✔
518
        // The wrapper may have already been finalized before being abandoned
519
        // if we were stopped when it was created.
520
        if (wrapper->mark_abandoned())
10,172✔
521
            return;
4✔
522

523
        // If the session wrapper has not yet been actualized (on the event loop
524
        // thread), it can be immediately finalized. This ensures that we will
525
        // generally not actualize a session wrapper that has already been
526
        // abandoned.
527
        if (m_unactualized_session_wrappers.erase(wrapper.get())) {
10,168✔
528
            wrapper->finalize_before_actualization();
74✔
529
            return;
74✔
530
        }
74✔
531
        m_abandoned_session_wrappers.push(std::move(wrapper));
10,094✔
532
    }
10,094✔
533
    // Triggered only after m_mutex has been released.
    m_actualize_and_finalize->trigger();
×
534
}
10,094✔
535

536

537
// Must be called from the event loop thread.
538
void ClientImpl::actualize_and_finalize_session_wrappers()
539
{
14,384✔
540
    // We need to pop from the wrapper stacks while holding the lock to ensure
541
    // that all updates to `SessionWrapper:m_next` are thread-safe, but then
542
    // release the lock before finalizing or actualizing because those functions
543
    // invoke user callbacks which may try to access the client and reacquire
544
    // the lock.
545
    //
546
    // Finalization must always happen before actualization because we may be
547
    // finalizing and actualizing sessions for the same Realm file, and
548
    // actualizing first would result in overlapping sessions. Because we're
549
    // releasing the lock new sessions may come in as we're looping, so we need
550
    // a single loop that checks both fields.
551
    while (true) {
34,578✔
552
        bool finalize = true;
34,572✔
553
        bool stopped;
34,572✔
554
        util::bind_ptr<SessionWrapper> wrapper;
34,572✔
555
        {
34,572✔
556
            util::CheckedLockGuard lock{m_mutex};
34,572✔
557
            wrapper = m_abandoned_session_wrappers.pop();
34,572✔
558
            if (!wrapper) {
34,572✔
559
                wrapper = m_unactualized_session_wrappers.pop();
24,476✔
560
                finalize = false;
24,476✔
561
            }
24,476✔
562
            stopped = m_stopped;
34,572✔
563
        }
34,572✔
564
        if (!wrapper)
34,572✔
565
            break;
14,378✔
566
        if (finalize)
20,194✔
567
            wrapper->finalize(); // Throws
10,096✔
568
        else if (stopped)
10,098✔
569
            wrapper->finalize_before_actualization();
4✔
570
        else
10,094✔
571
            wrapper->actualize(); // Throws
10,094✔
572
    }
20,194✔
573
}
14,384✔
574

575

576
// Returns the connection to use for `endpoint`, creating one when needed.
//
// The server slot for the endpoint is created on first use. The slot's
// existing connection is returned when there is one, sessions are multiplexed
// (`m_one_connection_per_session` is false) and no proxy is configured.
// Otherwise a new connection is created, stored in the slot, counted in
// `m_num_connections`, and `was_created` is set to true. `was_created` is not
// written when an existing connection is returned.
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
                                                   const std::string& authorization_header_name,
                                                   const std::map<std::string, std::string>& custom_http_headers,
                                                   bool verify_servers_ssl_certificate,
                                                   Optional<std::string> ssl_trust_certificate_path,
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
{
    ServerSlot& slot =
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()))
            .first->second; // Throws

    // TODO: enable multiplexing with proxies
    const bool reuse_existing = slot.connection && !m_one_connection_per_session && !proxy_config;
    if (reuse_existing) {
        REALM_ASSERT(slot.alt_connections.empty());
        return *slot.connection;
    }

    // No reusable connection: build a new one under the next identifier.
    REALM_ASSERT(!slot.connection);
    const connection_ident_type ident = m_prev_connection_ident + 1;
    auto new_conn = std::make_unique<ClientImpl::Connection>(
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
        std::move(proxy_config), slot.reconnect_info); // Throws
    ClientImpl::Connection& conn = *new_conn;

    if (m_one_connection_per_session) {
        slot.alt_connections[ident] = std::move(new_conn); // Throws
    }
    else {
        slot.connection = std::move(new_conn);
    }

    // The identifier is only consumed once the connection has been stored.
    m_prev_connection_ident = ident;
    was_created = true;
    {
        util::CheckedLockGuard lk(m_drain_mutex);
        ++m_num_connections;
    }
    return conn;
}
617

618

619
// Destroys `conn`, which must be stored in the server slot of its endpoint.
// In multiplexed mode the connection's reconnect info is saved back into the
// slot first. Afterwards the connection count is decremented and, when it
// reaches zero, threads waiting on `m_drain_cv` are notified.
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
{
    const auto slot_it = m_server_slots.find(conn.get_server_endpoint());
    REALM_ASSERT(slot_it != m_server_slots.end()); // Must be found
    ServerSlot& slot = slot_it->second;

    if (m_one_connection_per_session) {
        REALM_ASSERT(!slot.connection);
        const auto conn_it = slot.alt_connections.find(conn.get_ident());
        REALM_ASSERT(conn_it != slot.alt_connections.end()); // Must be found
        REALM_ASSERT(&*conn_it->second == &conn);
        slot.alt_connections.erase(conn_it);
    }
    else {
        REALM_ASSERT(slot.alt_connections.empty());
        REALM_ASSERT(&*slot.connection == &conn);
        // Keep the reconnect state for whichever connection comes next.
        slot.reconnect_info = conn.get_reconnect_info();
        slot.connection.reset();
    }

    // Decide under the lock, notify after releasing it.
    bool was_last;
    {
        util::CheckedLockGuard lk(m_drain_mutex);
        REALM_ASSERT(m_num_connections);
        was_last = --m_num_connections <= 0;
    }
    if (was_last) {
        m_drain_cv.notify_all();
    }
}
650

651

652
// ################ SessionImpl ################
653

654
void SessionImpl::force_close()
655
{
102✔
656
    // Allow force_close() if session is active or hasn't been activated yet.
657
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
102!
658
        m_wrapper.force_close();
102✔
659
    }
102✔
660
}
102✔
661

662
void SessionImpl::on_connection_state_changed(ConnectionState state,
663
                                              const std::optional<SessionErrorInfo>& error_info)
664
{
11,978✔
665
    // Only used to report errors back to the SyncSession while the Session is active
666
    if (m_state == SessionImpl::Active) {
11,978✔
667
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
11,978✔
668
    }
11,978✔
669
}
11,978✔
670

671

672
// Returns the wrapper's `m_virt_path`.
const std::string& SessionImpl::get_virt_path() const noexcept
673
{
7,170✔
674
    // Can only be called if the session is active or being activated
675
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
7,170!
676
    return m_wrapper.m_virt_path;
7,170✔
677
}
7,170✔
678

679
// Returns the path reported by the wrapper's DB (`m_db->get_path()`).
const std::string& SessionImpl::get_realm_path() const noexcept
680
{
10,388✔
681
    // Can only be called if the session is active or being activated
682
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
10,388✔
683
    return m_wrapper.m_db->get_path();
10,388✔
684
}
10,388✔
685

686
// Returns a copy of the wrapper's DBRef.
DBRef SessionImpl::get_db() const noexcept
687
{
25,104✔
688
    // Can only be called if the session is active or being activated
689
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
25,104✔
690
    return m_wrapper.m_db;
25,104✔
691
}
25,104✔
692

693
// Returns the ClientReplication obtained from the wrapper's get_replication().
ClientReplication& SessionImpl::get_repl() const noexcept
694
{
120,260✔
695
    // Can only be called if the session is active or being activated
696
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
120,260✔
697
    return m_wrapper.get_replication();
120,260✔
698
}
120,260✔
699

700
// Returns the history of the replication object; goes through get_repl(), so
// the same session-state assertion applies.
ClientHistory& SessionImpl::get_history() const noexcept
701
{
118,236✔
702
    return get_repl().get_history();
118,236✔
703
}
118,236✔
704

705
// Returns a mutable reference to the wrapper's optional client reset config.
std::optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
706
{
105,386✔
707
    // Can only be called if the session is active or being activated
708
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
105,386✔
709
    return m_wrapper.m_client_reset_config;
105,386✔
710
}
105,386✔
711

712
// Returns the wrapper's `m_session_reason`.
SessionReason SessionImpl::get_session_reason() noexcept
713
{
1,514✔
714
    // Can only be called if the session is active or being activated
715
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,514!
716
    return m_wrapper.m_session_reason;
1,514✔
717
}
1,514✔
718

719
// Returns the wrapper's `m_schema_version`.
uint64_t SessionImpl::get_schema_version() noexcept
720
{
1,514✔
721
    // Can only be called if the session is active or being activated
722
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,514!
723
    return m_wrapper.m_schema_version;
1,514✔
724
}
1,514✔
725

726
bool SessionImpl::upload_messages_allowed() noexcept
727
{
69,980✔
728
    // Can only be called if the session is active or being activated
729
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
69,980✔
730
    return m_wrapper.m_allow_upload_messages;
69,980✔
731
}
69,980✔
732

733
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
734
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
735
{
44,306✔
736
    // Ignore the call if the session is not active
737
    if (m_state != State::Active) {
44,306✔
738
        return;
×
739
    }
×
740

741
    try {
44,306✔
742
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
44,306!
743
        if (simulate_integration_error) {
44,306✔
744
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
745
        }
×
746
        version_type client_version;
44,306✔
747
        if (REALM_LIKELY(!get_client().is_dry_run())) {
44,306✔
748
            VersionInfo version_info;
44,306✔
749
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
44,306✔
750
            client_version = version_info.realm_version;
44,306✔
751
        }
44,306✔
752
        else {
×
753
            // Fake it for "dry run" mode
754
            client_version = m_last_version_available + 1;
×
755
        }
×
756
        on_changesets_integrated(client_version, progress); // Throws
44,306✔
757
    }
44,306✔
758
    catch (const IntegrationException& e) {
44,306✔
759
        on_integration_failure(e);
24✔
760
    }
24✔
761
}
44,306✔
762

763

764
void SessionImpl::on_download_completion()
765
{
16,252✔
766
    // Ignore the call if the session is not active
767
    if (m_state == State::Active) {
16,252✔
768
        m_wrapper.on_download_completion(); // Throws
16,252✔
769
    }
16,252✔
770
}
16,252✔
771

772

773
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
774
{
676✔
775
    // Ignore the call if the session is not active
776
    if (m_state == State::Active) {
676✔
777
        m_wrapper.on_suspended(error_info); // Throws
676✔
778
    }
676✔
779
}
676✔
780

781

782
void SessionImpl::on_resumed()
783
{
60✔
784
    // Ignore the call if the session is not active
785
    if (m_state == State::Active) {
60✔
786
        m_wrapper.on_resumed(); // Throws
60✔
787
    }
60✔
788
}
60✔
789

790
void SessionImpl::handle_pending_client_reset_acknowledgement()
791
{
10,386✔
792
    // Ignore the call if the session is not active
793
    if (m_state == State::Active) {
10,386✔
794
        m_wrapper.handle_pending_client_reset_acknowledgement();
10,386✔
795
    }
10,386✔
796
}
10,386✔
797

798
void SessionImpl::update_subscription_version_info()
799
{
296✔
800
    // Ignore the call if the session is not active
801
    if (m_state == State::Active) {
296✔
802
        m_wrapper.update_subscription_version_info();
296✔
803
    }
296✔
804
}
296✔
805

806
bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
807
{
46,636✔
808
    // Not a bootstrap message if this isn't a FLX download
809
    if (!message.last_in_batch || !message.query_version) {
46,636✔
810
        return false;
42,926✔
811
    }
42,926✔
812

813
    REALM_ASSERT(m_is_flx_sync_session);
3,710✔
814

815
    // Not a bootstrap message if it's for the already active query version
816
    if (*message.last_in_batch && *message.query_version == m_wrapper.m_flx_active_version) {
3,710✔
817
        return false;
1,380✔
818
    }
1,380✔
819

820
    auto batch_state = *message.last_in_batch ? DownloadBatchState::LastInBatch : DownloadBatchState::MoreToCome;
2,330✔
821
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,330✔
822
    std::optional<SyncProgress> maybe_progress;
2,330✔
823
    if (batch_state == DownloadBatchState::LastInBatch) {
2,330✔
824
        maybe_progress = message.progress;
2,012✔
825
    }
2,012✔
826

827
    bool new_batch = false;
2,330✔
828
    try {
2,330✔
829
        bootstrap_store->add_batch(*message.query_version, std::move(maybe_progress), message.downloadable,
2,330✔
830
                                   message.changesets, &new_batch);
2,330✔
831
    }
2,330✔
832
    catch (const LogicError& ex) {
2,330✔
833
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
834
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
835
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
836
                                    ProtocolError::bad_changeset_size);
×
837
            on_integration_failure(ex);
×
838
            return true;
×
839
        }
×
840
        throw;
×
841
    }
×
842

843
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
844
    // bootstrapping.
845
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
2,328✔
846
        on_flx_sync_progress(*message.query_version, DownloadBatchState::MoreToCome);
76✔
847
    }
76✔
848

849
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, message.progress,
2,328✔
850
                                       *message.query_version, batch_state, message.changesets.size());
2,328✔
851
    if (hook_action == SyncClientHookAction::EarlyReturn) {
2,328✔
852
        return true;
12✔
853
    }
12✔
854
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
2,316✔
855

856
    if (batch_state == DownloadBatchState::MoreToCome) {
2,316✔
857
        return true;
312✔
858
    }
312✔
859

860
    try {
2,004✔
861
        process_pending_flx_bootstrap();
2,004✔
862
    }
2,004✔
863
    catch (const IntegrationException& e) {
2,004✔
864
        on_integration_failure(e);
12✔
865
    }
12✔
866
    catch (...) {
2,004✔
867
        on_integration_failure(IntegrationException(exception_to_status()));
×
868
    }
×
869

870
    return true;
2,004✔
871
}
2,004✔
872

873

874
void SessionImpl::process_pending_flx_bootstrap()
875
{
12,096✔
876
    // Ignore the call if not a flx session or session is not active
877
    if (!m_is_flx_sync_session || m_state != State::Active) {
12,096✔
878
        return;
8,550✔
879
    }
8,550✔
880
    // Should never be called if session is not active
881
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
3,546✔
882
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
3,546✔
883
    if (!bootstrap_store->has_pending()) {
3,546✔
884
        return;
1,522✔
885
    }
1,522✔
886

887
    auto pending_batch_stats = bootstrap_store->pending_stats();
2,024✔
888
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
2,024✔
889
                "changeset size: %3)",
2,024✔
890
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
2,024✔
891
                pending_batch_stats.pending_changeset_bytes);
2,024✔
892
    auto& history = get_repl().get_history();
2,024✔
893
    VersionInfo new_version;
2,024✔
894
    SyncProgress progress;
2,024✔
895
    int64_t query_version = -1;
2,024✔
896
    size_t changesets_processed = 0;
2,024✔
897

898
    // Used to commit each batch after it was transformed.
899
    TransactionRef transact = get_db()->start_write();
2,024✔
900
    while (bootstrap_store->has_pending()) {
4,324✔
901
        auto start_time = std::chrono::steady_clock::now();
2,312✔
902
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
2,312✔
903
        if (!pending_batch.progress) {
2,312✔
904
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
905
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
906
            // bootstrap store requires a write transaction itself.
907
            transact->close();
8✔
908
            bootstrap_store->clear();
8✔
909
            return;
8✔
910
        }
8✔
911

912
        auto batch_state =
2,304✔
913
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
2,304✔
914
        query_version = pending_batch.query_version;
2,304✔
915
        bool simulate_integration_error =
2,304✔
916
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
2,304✔
917
        if (simulate_integration_error) {
2,304✔
918
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
4✔
919
        }
4✔
920

921
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
2,300✔
922
                        batch_state, pending_batch.changesets.size());
2,300✔
923

924
        history.integrate_server_changesets(
2,300✔
925
            *pending_batch.progress, 1.0, pending_batch.changesets, new_version, batch_state, logger, transact,
2,300✔
926
            [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
2,300✔
927
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
2,288✔
928
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
2,288✔
929
            });
2,288✔
930
        progress = *pending_batch.progress;
2,300✔
931
        changesets_processed += pending_batch.changesets.size();
2,300✔
932
        auto duration = std::chrono::steady_clock::now() - start_time;
2,300✔
933

934
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
2,300✔
935
                                      batch_state, pending_batch.changesets.size());
2,300✔
936
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
2,300✔
937

938
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
2,300✔
939
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
2,300✔
940
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
2,300✔
941
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
2,300✔
942
                    pending_batch.remaining_changesets);
2,300✔
943
    }
2,300✔
944

945
    REALM_ASSERT_3(query_version, !=, -1);
2,012✔
946
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
2,012✔
947

948
    on_changesets_integrated(new_version.realm_version, progress);
2,012✔
949
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
2,012✔
950
                                  DownloadBatchState::LastInBatch, changesets_processed);
2,012✔
951
    // NoAction/EarlyReturn are both valid no-op actions to take here.
952
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
2,012✔
953
}
2,012✔
954

955
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
956
{
20✔
957
    // Ignore the call if the session is not active
958
    if (m_state == State::Active) {
20✔
959
        m_wrapper.on_flx_sync_error(version, err_msg);
20✔
960
    }
20✔
961
}
20✔
962

963
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
964
{
2,076✔
965
    // Ignore the call if the session is not active
966
    if (m_state == State::Active) {
2,076✔
967
        m_wrapper.on_flx_sync_progress(version, batch_state);
2,076✔
968
    }
2,076✔
969
}
2,076✔
970

971
// Returns the wrapper's FLX subscription store pointer.
SubscriptionStore* SessionImpl::get_flx_subscription_store()
{
    // Must only be called on an active session.
    REALM_ASSERT_EX(m_state == State::Active, m_state);
    SubscriptionStore* store = m_wrapper.get_flx_subscription_store();
    return store;
}
19,298✔
977

978
// Returns the wrapper's migration store pointer.
MigrationStore* SessionImpl::get_migration_store()
{
    // Must only be called on an active session.
    REALM_ASSERT_EX(m_state == State::Active, m_state);
    MigrationStore* store = m_wrapper.get_migration_store();
    return store;
}
67,670✔
984

985
void SessionImpl::on_flx_sync_version_complete(int64_t version)
986
{
300✔
987
    // Ignore the call if the session is not active
988
    if (m_state == State::Active) {
300✔
989
        m_wrapper.on_flx_sync_version_complete(version);
300✔
990
    }
300✔
991
}
300✔
992

993
// Invokes the wrapper's debug hook with the given data and carries out the
// action it requests.
//
// SuspendWithRetryableError feeds a non-fatal, transient error into
// receive_error_message(); TriggerReconnect voluntarily disconnects the
// connection. Both are reported to the caller as EarlyReturn. Any other action
// is returned unchanged. A call made while the hook is already running returns
// NoAction without invoking the hook again.
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
{
    // Must only be called on an active session.
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    // Guard against recursive invocation of the hook.
    if (m_wrapper.m_in_debug_hook) {
        return SyncClientHookAction::NoAction;
    }
    m_wrapper.m_in_debug_hook = true;
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
        m_wrapper.m_in_debug_hook = false;
    });

    const auto action = m_wrapper.m_debug_hook(data);

    if (action == realm::SyncClientHookAction::SuspendWithRetryableError) {
        SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
        err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;

        auto err_processing_err = receive_error_message(err_info);
        REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
        return SyncClientHookAction::EarlyReturn;
    }

    if (action == realm::SyncClientHookAction::TriggerReconnect) {
        get_connection().voluntary_disconnect();
        return SyncClientHookAction::EarlyReturn;
    }

    return action;
}
8,372✔
1025

1026
// Packs the given event details into a SyncClientHookData and dispatches it to
// the debug hook. Returns NoAction without building anything when no hook is
// installed or the session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
                                                  int64_t query_version, DownloadBatchState batch_state,
                                                  size_t num_changesets)
{
    // Common case: no hook installed.
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
        return SyncClientHookAction::NoAction;
    }
    if (REALM_UNLIKELY(m_state != State::Active)) {
        return SyncClientHookAction::NoAction;
    }

    SyncClientHookData hook_data;
    hook_data.event = event;
    hook_data.progress = progress;
    hook_data.query_version = query_version;
    hook_data.batch_state = batch_state;
    hook_data.num_changesets = num_changesets;

    return call_debug_hook(hook_data);
}
7,916✔
1046

1047
// Dispatches an error-related event to the debug hook. The hook data carries
// the session's current progress, a SteadyState batch state, zero changesets,
// query version 0 and a pointer to `error_info`. Returns NoAction when no hook
// is installed or the session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
{
    // Common case: no hook installed.
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
        return SyncClientHookAction::NoAction;
    }
    if (REALM_UNLIKELY(m_state != State::Active)) {
        return SyncClientHookAction::NoAction;
    }

    SyncClientHookData hook_data;
    hook_data.event = event;
    hook_data.error_info = &error_info;
    hook_data.progress = m_progress;
    hook_data.batch_state = DownloadBatchState::SteadyState;
    hook_data.query_version = 0;
    hook_data.num_changesets = 0;

    return call_debug_hook(hook_data);
}
484✔
1066

1067
// Dispatches an event to the debug hook using the session's current progress
// and last sent FLX query version, with a SteadyState batch state and no
// changesets.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event)
{
    const auto batch_state = DownloadBatchState::SteadyState;
    const size_t num_changesets = 0;
    return call_debug_hook(event, m_progress, m_last_sent_flx_query_version, batch_state, num_changesets);
}
78,166✔
1071

1072
// Returns true when a DOWNLOAD message needs no bootstrap handling: the batch
// state is SteadyState, the session is not an FLX session, or the message is
// the last in its batch for the query version that is already active.
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
{
    // Must only be called on an active session.
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    if (batch_state == DownloadBatchState::SteadyState || !m_is_flx_sync_session) {
        return true;
    }

    // A final message for the already active query version is an ordinary steady state DOWNLOAD.
    return batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version;
}
3,720✔
1091

1092
void SessionImpl::init_progress_handler()
1093
{
10,386✔
1094
    REALM_ASSERT_EX(m_state == State::Unactivated || m_state == State::Active, m_state);
10,386✔
1095
    m_wrapper.init_progress_handler();
10,386✔
1096
}
10,386✔
1097

1098
void SessionImpl::enable_progress_notifications()
1099
{
44,982✔
1100
    m_wrapper.m_reliable_download_progress = true;
44,982✔
1101
}
44,982✔
1102

1103
// Validates a JSON test command and queues it to be sent to the server.
//
// `body` must be a JSON object with a string "command" field and, optionally, an
// "args" field; no other fields are accepted. Returns a future that is already
// failed when the session is not active or the body is invalid. Otherwise the
// command is appended to the pending test commands from a callback posted to the
// client, and the returned future is tied to the promise stored with it. If the
// posted callback receives a non-OK status (including operation_aborted), the
// future is failed with that status.
util::Future<std::string> SessionImpl::send_test_command(std::string body)
{
    if (m_state != State::Active) {
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
    }

    try {
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
            return Status{ErrorCodes::LogicError,
                          "Must supply command name in \"command\" field of test command json object"};
        }
        // "command" is known to be present here, so the object may hold at most one
        // more field and that field must be "args". Checking only for the absence of
        // "args" would let objects with additional unknown fields through.
        const bool has_args = json_body.find("args") != json_body.end();
        const size_t max_fields = has_args ? 2 : 1;
        if (json_body.size() > max_fields) {
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
        }
    }
    catch (const nlohmann::json::parse_error& e) {
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
    }

    auto pf = util::make_promise_future<std::string>();
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
        // Includes operation_aborted
        if (!status.is_ok()) {
            promise.set_error(status);
            return;
        }

        auto id = ++m_last_pending_test_command_ident;
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
        ensure_enlisted_to_send();
    });

    // pf.future is a member of a local, so it has to be moved out explicitly.
    return std::move(pf.future);
}
60✔
1138

1139
// ################ SessionWrapper ################
1140

1141
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1142
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
1143
// the ClientImpl::Connection that owns the ClientImpl::Session.
1144
// Builds a wrapper from the client, database, optional FLX subscription store,
// migration store and session config. Most members are copied or moved straight
// out of `config`. The sync mode is FLX when `flx_sub_store` is non-null and PBS
// otherwise; the session reason is ClientReset when a client reset config is
// present or `config.fresh_realm_download` is set, and Sync otherwise; upload
// messages are disallowed for a fresh realm download. The constructor finishes
// by taking a reference on itself and registering with the client as an
// unactualized session wrapper.
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1145
                               std::shared_ptr<MigrationStore> migration_store, Session::Config&& config)
1146
    : m_client{client}
4,916✔
1147
    , m_db(std::move(db))
4,916✔
1148
    , m_replication(m_db->get_replication())
4,916✔
1149
    , m_protocol_envelope{config.protocol_envelope}
4,916✔
1150
    , m_server_address{std::move(config.server_address)}
4,916✔
1151
    , m_server_port{config.server_port}
4,916✔
1152
    , m_server_verified{config.server_verified}
4,916✔
1153
    , m_user_id(std::move(config.user_id))
4,916✔
1154
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
4,916✔
1155
    , m_authorization_header_name{config.authorization_header_name}
4,916✔
1156
    , m_custom_http_headers{std::move(config.custom_http_headers)}
4,916✔
1157
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
4,916✔
1158
    , m_simulate_integration_error{config.simulate_integration_error}
4,916✔
1159
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
4,916✔
1160
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
4,916✔
1161
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
4,916✔
1162
    , m_http_request_path_prefix{std::move(config.service_identifier)}
4,916✔
1163
    , m_virt_path{std::move(config.realm_identifier)}
4,916✔
1164
    , m_proxy_config{std::move(config.proxy_config)}
4,916✔
1165
    , m_signed_access_token{std::move(config.signed_user_token)}
4,916✔
1166
    , m_client_reset_config{std::move(config.client_reset_config)}
4,916✔
1167
    , m_progress_handler(std::move(config.progress_handler))
4,916✔
1168
    , m_connection_state_change_listener(std::move(config.connection_state_change_listener))
4,916✔
1169
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
4,916✔
1170
    , m_session_reason(m_client_reset_config || config.fresh_realm_download ? SessionReason::ClientReset
4,916✔
1171
                                                                            : SessionReason::Sync)
4,916✔
1172
    , m_allow_upload_messages(!config.fresh_realm_download)
4,916✔
1173
    , m_schema_version(config.schema_version)
4,916✔
1174
    , m_flx_subscription_store(std::move(flx_sub_store))
4,916✔
1175
    , m_migration_store(std::move(migration_store))
4,916✔
1176
{
10,174✔
1177
    // NOTE(review): the initializer list above has already dereferenced m_db
    // (for m_replication) by the time this null check runs.
    REALM_ASSERT(m_db);
10,174✔
1178
    REALM_ASSERT(m_db->get_replication());
10,174✔
1179
    // The replication object must be a ClientReplication.
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
10,174✔
1180

1181
    // SessionWrapper begins at +1 retain count because Client retains and
1182
    // releases it while performing async operations, and these need to not
1183
    // take it to 0 or it could be deleted before the caller can retain it.
1184
    bind_ptr();
10,174✔
1185
    m_client.register_unactualized_session_wrapper(this);
10,174✔
1186
}
10,174✔
1187

1188
// Verifies that the wrapper has gone through its whole life cycle before it is
// destroyed.
SessionWrapper::~SessionWrapper() noexcept
{
    // Actualization starts in the constructor, and the wrapper is not deleted
    // until the Client is finished with it and the Session has abandoned it.
    // By now it must therefore be actualized, abandoned, finalized and closed,
    // and must have let go of its DB reference.
    REALM_ASSERT(m_actualized);
    REALM_ASSERT(m_abandoned);
    REALM_ASSERT(m_finalized);
    REALM_ASSERT(m_closed);
    REALM_ASSERT(!m_db);
}
10,172✔
1199

1200

1201
// Returns the stored replication object viewed as a ClientReplication.
// Requires that the wrapper still holds its DB reference.
inline ClientReplication& SessionWrapper::get_replication() noexcept
{
    REALM_ASSERT(m_db);
    auto& repl = static_cast<ClientReplication&>(*m_replication);
    return repl;
}
120,262✔
1206

1207

1208
// Returns the client this wrapper was created with.
inline ClientImpl& SessionWrapper::get_client() noexcept
{
    ClientImpl& client = m_client;
    return client;
}
×
1212

1213
bool SessionWrapper::has_flx_subscription_store() const
1214
{
2,076✔
1215
    return static_cast<bool>(m_flx_subscription_store);
2,076✔
1216
}
2,076✔
1217

1218
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1219
{
20✔
1220
    REALM_ASSERT(!m_finalized);
20✔
1221
    get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
20✔
1222
}
20✔
1223

1224
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1225
{
2,300✔
1226
    REALM_ASSERT(!m_finalized);
2,300✔
1227
    m_flx_last_seen_version = version;
2,300✔
1228
    m_flx_active_version = version;
2,300✔
1229
}
2,300✔
1230

1231
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1232
{
2,076✔
1233
    if (!has_flx_subscription_store()) {
2,076✔
1234
        return;
×
1235
    }
×
1236
    REALM_ASSERT(!m_finalized);
2,076✔
1237
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
2,076✔
1238
    REALM_ASSERT(new_version >= m_flx_active_version);
2,076✔
1239
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
2,076✔
1240

1241
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
2,076✔
1242

1243
    switch (batch_state) {
2,076✔
1244
        case DownloadBatchState::SteadyState:
✔
1245
            // Cannot be called with this value.
1246
            REALM_UNREACHABLE();
1247
        case DownloadBatchState::LastInBatch:
2,000✔
1248
            if (m_flx_active_version == new_version) {
2,000✔
1249
                return;
×
1250
            }
×
1251
            on_flx_sync_version_complete(new_version);
2,000✔
1252
            if (new_version == 0) {
2,000✔
1253
                new_state = SubscriptionSet::State::Complete;
956✔
1254
            }
956✔
1255
            else {
1,044✔
1256
                new_state = SubscriptionSet::State::AwaitingMark;
1,044✔
1257
                m_flx_pending_mark_version = new_version;
1,044✔
1258
            }
1,044✔
1259
            break;
2,000✔
1260
        case DownloadBatchState::MoreToCome:
76✔
1261
            if (m_flx_last_seen_version == new_version) {
76✔
1262
                return;
×
1263
            }
×
1264

1265
            m_flx_last_seen_version = new_version;
76✔
1266
            new_state = SubscriptionSet::State::Bootstrapping;
76✔
1267
            break;
76✔
1268
    }
2,076✔
1269

1270
    get_flx_subscription_store()->update_state(new_version, new_state);
2,076✔
1271
}
2,076✔
1272

1273
// Returns a non-owning pointer to the FLX subscription store. Must not be
// called after finalization.
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
{
    REALM_ASSERT(!m_finalized);
    SubscriptionStore* store = m_flx_subscription_store.get();
    return store;
}
21,394✔
1278

1279
// Returns a non-owning pointer to the pending FLX bootstrap store. Must not be
// called after finalization.
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
{
    REALM_ASSERT(!m_finalized);
    PendingBootstrapStore* store = m_flx_pending_bootstrap_store.get();
    return store;
}
5,874✔
1284

1285
// Returns a non-owning pointer to the migration store. Must not be called
// after finalization.
MigrationStore* SessionWrapper::get_migration_store()
{
    REALM_ASSERT(!m_finalized);
    MigrationStore* store = m_migration_store.get();
    return store;
}
67,672✔
1290

1291
inline bool SessionWrapper::mark_abandoned()
1292
{
10,174✔
1293
    REALM_ASSERT(!m_abandoned);
10,174✔
1294
    m_abandoned = true;
10,174✔
1295
    return m_finalized;
10,174✔
1296
}
10,174✔
1297

1298

1299
// Posts a task to the client that tells the session about `new_version` and
// then re-checks progress. The task keeps the wrapper alive through a bind_ptr
// and does nothing if the session no longer exists when it runs.
void SessionWrapper::on_commit(version_type new_version)
{
    // Thread safety required
    m_client.post([self = util::bind_ptr{this}, new_version] {
        REALM_ASSERT(self->m_actualized);
        if (REALM_UNLIKELY(!self->m_sess)) {
            // Already finalized
            return;
        }
        self->m_sess->recognize_sync_version(new_version); // Throws
        self->check_progress();                            // Throws
    });
}
114,290✔
1311

1312

1313
void SessionWrapper::cancel_reconnect_delay()
1314
{
28✔
1315
    // Thread safety required
1316

1317
    m_client.post([self = util::bind_ptr{this}] {
28✔
1318
        REALM_ASSERT(self->m_actualized);
28✔
1319
        if (REALM_UNLIKELY(self->m_closed)) {
28✔
1320
            return;
×
1321
        }
×
1322

1323
        if (REALM_UNLIKELY(!self->m_sess))
28✔
1324
            return; // Already finalized
×
1325
        SessionImpl& sess = *self->m_sess;
28✔
1326
        sess.cancel_resumption_delay(); // Throws
28✔
1327
        ClientImpl::Connection& conn = sess.get_connection();
28✔
1328
        conn.cancel_reconnect_delay(); // Throws
28✔
1329
    });                                // Throws
28✔
1330
}
28✔
1331

1332
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1333
                                    WaitOperCompletionHandler handler)
1334
{
28,202✔
1335
    REALM_ASSERT(upload_completion || download_completion);
28,202✔
1336

1337
    m_client.post([self = util::bind_ptr{this}, handler = std::move(handler), upload_completion,
28,202✔
1338
                   download_completion](Status status) mutable {
28,202✔
1339
        REALM_ASSERT(self->m_actualized);
28,202✔
1340
        if (!status.is_ok()) {
28,202✔
1341
            handler(status); // Throws
×
1342
            return;
×
1343
        }
×
1344
        if (REALM_UNLIKELY(!self->m_sess)) {
28,202✔
1345
            // Already finalized
1346
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
188✔
1347
            return;
188✔
1348
        }
188✔
1349
        if (upload_completion) {
28,014✔
1350
            self->m_upload_completion_requested_version = self->m_db->get_version_of_latest_snapshot();
15,390✔
1351
            if (download_completion) {
15,390✔
1352
                // Wait for upload and download completion
1353
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
314✔
1354
            }
314✔
1355
            else {
15,076✔
1356
                // Wait for upload completion only
1357
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
15,076✔
1358
            }
15,076✔
1359
        }
15,390✔
1360
        else {
12,624✔
1361
            // Wait for download completion only
1362
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
12,624✔
1363
        }
12,624✔
1364
        SessionImpl& sess = *self->m_sess;
28,014✔
1365
        if (upload_completion)
28,014✔
1366
            self->check_progress();
15,390✔
1367
        if (download_completion)
28,014✔
1368
            sess.request_download_completion_notification(); // Throws
12,938✔
1369
    });                                                      // Throws
28,014✔
1370
}
28,202✔
1371

1372

1373
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1374
{
12,912✔
1375
    // Thread safety required
1376
    REALM_ASSERT(!m_abandoned);
12,912✔
1377

1378
    auto pf = util::make_promise_future<bool>();
12,912✔
1379
    async_wait_for(true, false, [promise = std::move(pf.promise)](Status status) mutable {
12,912✔
1380
        promise.emplace_value(status.is_ok());
12,912✔
1381
    });
12,912✔
1382
    return pf.future.get();
12,912✔
1383
}
12,912✔
1384

1385

1386
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1387
{
10,004✔
1388
    // Thread safety required
1389
    REALM_ASSERT(!m_abandoned);
10,004✔
1390

1391
    auto pf = util::make_promise_future<bool>();
10,004✔
1392
    async_wait_for(false, true, [promise = std::move(pf.promise)](Status status) mutable {
10,004✔
1393
        promise.emplace_value(status.is_ok());
10,004✔
1394
    });
10,004✔
1395
    return pf.future.get();
10,004✔
1396
}
10,004✔
1397

1398

1399
void SessionWrapper::refresh(std::string_view signed_access_token)
1400
{
216✔
1401
    // Thread safety required
1402
    REALM_ASSERT(!m_abandoned);
216✔
1403

1404
    m_client.post([self = util::bind_ptr{this}, token = std::string(signed_access_token)] {
216✔
1405
        REALM_ASSERT(self->m_actualized);
216✔
1406
        if (REALM_UNLIKELY(!self->m_sess))
216✔
1407
            return; // Already finalized
×
1408
        self->m_signed_access_token = std::move(token);
216✔
1409
        SessionImpl& sess = *self->m_sess;
216✔
1410
        ClientImpl::Connection& conn = sess.get_connection();
216✔
1411
        // FIXME: This only makes sense when each session uses a separate connection.
1412
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
216✔
1413
        sess.cancel_resumption_delay();                                                          // Throws
216✔
1414
        conn.cancel_reconnect_delay();                                                           // Throws
216✔
1415
    });
216✔
1416
}
216✔
1417

1418

1419
// Hands ownership of `wrapper` over to its client's list of abandoned session
// wrappers.
void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
{
    // The client reference has to be fetched before `wrapper` is moved from.
    ClientImpl& owning_client = wrapper->m_client;
    owning_client.register_abandoned_session_wrapper(std::move(wrapper));
}
1424

1425

1426
// Must be called from event loop thread
1427
void SessionWrapper::actualize()
1428
{
10,096✔
1429
    // actualize() can only ever be called once
1430
    REALM_ASSERT(!m_actualized);
10,096✔
1431
    REALM_ASSERT(!m_sess);
10,096✔
1432
    // The client should have removed this wrapper from those pending
1433
    // actualization if it called force_close() or finalize_before_actualize()
1434
    REALM_ASSERT(!m_finalized);
10,096✔
1435
    REALM_ASSERT(!m_closed);
10,096✔
1436

1437
    m_actualized = true;
10,096✔
1438

1439
    ScopeExitFail close_on_error([&]() noexcept {
10,096✔
1440
        m_closed = true;
4✔
1441
    });
4✔
1442

1443
    m_db->claim_sync_agent();
10,096✔
1444
    m_db->add_commit_listener(this);
10,096✔
1445
    ScopeExitFail remove_commit_listener([&]() noexcept {
10,096✔
1446
        m_db->remove_commit_listener(this);
×
1447
    });
×
1448

1449
    ServerEndpoint endpoint{m_protocol_envelope, m_server_address, m_server_port,
10,096✔
1450
                            m_user_id,           m_sync_mode,      m_server_verified};
10,096✔
1451
    bool was_created = false;
10,096✔
1452
    ClientImpl::Connection& conn = m_client.get_connection(
10,096✔
1453
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
10,096✔
1454
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
10,096✔
1455
        was_created); // Throws
10,096✔
1456
    ScopeExitFail remove_connection([&]() noexcept {
10,096✔
1457
        if (was_created)
×
1458
            m_client.remove_connection(conn);
×
1459
    });
×
1460

1461
    // FIXME: This only makes sense when each session uses a separate connection.
1462
    conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
10,096✔
1463
    std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
10,096✔
1464
    if (m_sync_mode == SyncServerMode::FLX) {
10,096✔
1465
        m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
1,542✔
1466
    }
1,542✔
1467

1468
    sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
10,096✔
1469
    m_sess = sess.get();
10,096✔
1470
    ScopeExitFail clear_sess([&]() noexcept {
10,096✔
1471
        m_sess = nullptr;
×
1472
    });
×
1473
    conn.activate_session(std::move(sess)); // Throws
10,096✔
1474

1475
    // Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
1476
    // session cannot change the state of the bootstrap store at the same time.
1477
    update_subscription_version_info();
10,096✔
1478

1479
    if (was_created)
10,096✔
1480
        conn.activate(); // Throws
2,796✔
1481

1482
    if (m_connection_state_change_listener) {
10,096✔
1483
        ConnectionState state = conn.get_state();
10,082✔
1484
        if (state != ConnectionState::disconnected) {
10,082✔
1485
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,116✔
1486
            if (state == ConnectionState::connected)
7,116✔
1487
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
7,008✔
1488
        }
7,116✔
1489
    }
10,082✔
1490

1491
    if (!m_client_reset_config)
10,096✔
1492
        check_progress(); // Throws
9,714✔
1493
}
10,096✔
1494

1495
void SessionWrapper::force_close()
1496
{
10,198✔
1497
    if (m_closed) {
10,198✔
1498
        return;
106✔
1499
    }
106✔
1500
    REALM_ASSERT(m_actualized);
10,092✔
1501
    REALM_ASSERT(m_sess);
10,092✔
1502
    m_closed = true;
10,092✔
1503

1504
    ClientImpl::Connection& conn = m_sess->get_connection();
10,092✔
1505
    conn.initiate_session_deactivation(m_sess); // Throws
10,092✔
1506

1507
    // We need to keep the DB open until finalization, but we no longer want to
1508
    // know when commits are made
1509
    m_db->remove_commit_listener(this);
10,092✔
1510

1511
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
1512
    m_flx_pending_bootstrap_store.reset();
10,092✔
1513
    // Clear the subscription and migration store refs since they are owned by SyncSession
1514
    m_flx_subscription_store.reset();
10,092✔
1515
    m_migration_store.reset();
10,092✔
1516
    m_sess = nullptr;
10,092✔
1517
    // Everything is being torn down, no need to report connection state anymore
1518
    m_connection_state_change_listener = {};
10,092✔
1519

1520
    // All outstanding wait operations must be canceled
1521
    while (!m_upload_completion_handlers.empty()) {
10,452✔
1522
        auto handler = std::move(m_upload_completion_handlers.back());
360✔
1523
        m_upload_completion_handlers.pop_back();
360✔
1524
        handler({ErrorCodes::OperationAborted, "Sync session is being closed before upload was complete"}); // Throws
360✔
1525
    }
360✔
1526
    while (!m_download_completion_handlers.empty()) {
10,474✔
1527
        auto handler = std::move(m_download_completion_handlers.back());
382✔
1528
        m_download_completion_handlers.pop_back();
382✔
1529
        handler(
382✔
1530
            {ErrorCodes::OperationAborted, "Sync session is being closed before download was complete"}); // Throws
382✔
1531
    }
382✔
1532
    while (!m_sync_completion_handlers.empty()) {
10,104✔
1533
        auto handler = std::move(m_sync_completion_handlers.back());
12✔
1534
        m_sync_completion_handlers.pop_back();
12✔
1535
        handler({ErrorCodes::OperationAborted, "Sync session is being closed before sync was complete"}); // Throws
12✔
1536
    }
12✔
1537
}
10,092✔
1538

1539
// Must be called from event loop thread
1540
//
1541
// `m_client.m_mutex` is not held while this is called, but it is guaranteed to
1542
// have been acquired at some point in between the final read or write ever made
1543
// from a different thread and when this is called.
1544
void SessionWrapper::finalize()
1545
{
10,096✔
1546
    REALM_ASSERT(m_actualized);
10,096✔
1547
    REALM_ASSERT(m_abandoned);
10,096✔
1548
    REALM_ASSERT(!m_finalized);
10,096✔
1549

1550
    force_close();
10,096✔
1551

1552
    m_finalized = true;
10,096✔
1553

1554
    // The Realm file can be closed now, as no access to the Realm file is
1555
    // supposed to happen on behalf of a session after initiation of
1556
    // deactivation.
1557
    m_db->release_sync_agent();
10,096✔
1558
    m_db = nullptr;
10,096✔
1559
}
10,096✔
1560

1561

1562
// Must be called only when an unactualized session wrapper becomes abandoned.
1563
//
1564
// Called with a lock on `m_client.m_mutex`.
1565
inline void SessionWrapper::finalize_before_actualization() noexcept
1566
{
78✔
1567
    REALM_ASSERT(!m_finalized);
78✔
1568
    REALM_ASSERT(!m_sess);
78✔
1569
    m_actualized = true;
78✔
1570
    m_finalized = true;
78✔
1571
    m_closed = true;
78✔
1572
    m_db->remove_commit_listener(this);
78✔
1573
    m_db->release_sync_agent();
78✔
1574
    m_db = nullptr;
78✔
1575
}
78✔
1576

1577
void SessionWrapper::on_download_completion()
1578
{
16,252✔
1579
    // Ensure that progress handlers get called before completion handlers. The
1580
    // download completing performed a commit and will trigger progress
1581
    // notifications asynchronously, but they would arrive after the download
1582
    // completion without this.
1583
    check_progress();
16,252✔
1584

1585
    while (!m_download_completion_handlers.empty()) {
28,702✔
1586
        auto handler = std::move(m_download_completion_handlers.back());
12,450✔
1587
        m_download_completion_handlers.pop_back();
12,450✔
1588
        handler(Status::OK()); // Throws
12,450✔
1589
    }
12,450✔
1590
    while (!m_sync_completion_handlers.empty()) {
16,346✔
1591
        auto handler = std::move(m_sync_completion_handlers.back());
94✔
1592
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
94✔
1593
        m_sync_completion_handlers.pop_back();
94✔
1594
    }
94✔
1595

1596
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
16,252✔
1597
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
936✔
1598
                             m_flx_pending_mark_version);
936✔
1599
        m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
936✔
1600
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
936✔
1601
    }
936✔
1602
}
16,252✔
1603

1604

1605
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1606
{
676✔
1607
    REALM_ASSERT(!m_finalized);
676✔
1608
    m_suspended = true;
676✔
1609
    if (m_connection_state_change_listener) {
676✔
1610
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
676✔
1611
    }
676✔
1612
}
676✔
1613

1614

1615
void SessionWrapper::on_resumed()
1616
{
60✔
1617
    REALM_ASSERT(!m_finalized);
60✔
1618
    m_suspended = false;
60✔
1619
    if (m_connection_state_change_listener) {
60✔
1620
        ClientImpl::Connection& conn = m_sess->get_connection();
60✔
1621
        if (conn.get_state() != ConnectionState::disconnected) {
60✔
1622
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
54✔
1623
            if (conn.get_state() == ConnectionState::connected)
54✔
1624
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
46✔
1625
        }
54✔
1626
    }
60✔
1627
}
60✔
1628

1629

1630
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1631
                                                 const std::optional<SessionErrorInfo>& error_info)
1632
{
11,978✔
1633
    if (m_connection_state_change_listener && !m_suspended) {
11,978✔
1634
        m_connection_state_change_listener(state, error_info); // Throws
11,942✔
1635
    }
11,942✔
1636
}
11,978✔
1637

1638
void SessionWrapper::init_progress_handler()
1639
{
10,386✔
1640
    ClientHistory::get_upload_download_state(m_db.get(), m_final_downloaded, m_final_uploaded);
10,386✔
1641
}
10,386✔
1642

1643
void SessionWrapper::check_progress()
1644
{
155,176✔
1645
    REALM_ASSERT(!m_finalized);
155,176✔
1646
    REALM_ASSERT(m_sess);
155,176✔
1647

1648
    if (!m_progress_handler && m_upload_completion_handlers.empty() && m_sync_completion_handlers.empty())
155,176✔
1649
        return;
72,842✔
1650

1651
    version_type uploaded_version;
82,334✔
1652
    ReportedProgress p;
82,334✔
1653
    DownloadableProgress downloadable;
82,334✔
1654
    ClientHistory::get_upload_download_state(*m_db, p.downloaded, downloadable, p.uploaded, p.uploadable, p.snapshot,
82,334✔
1655
                                             uploaded_version);
82,334✔
1656
    p.query_version = m_flx_last_seen_version;
82,334✔
1657

1658
    report_progress(p, downloadable);
82,334✔
1659
    report_upload_completion(uploaded_version);
82,334✔
1660
}
82,334✔
1661

1662
// Once `uploaded_version` has reached the requested upload-completion version:
// moves every sync-completion handler to the download-completion list and
// resolves all upload-completion handlers with OK. Does nothing before that
// version is reached.
void SessionWrapper::report_upload_completion(version_type uploaded_version)
{
    if (uploaded_version < m_upload_completion_requested_version)
        return;

    // The upload half of each sync wait is satisfied; they now only wait for
    // download completion.
    for (auto& sync_handler : m_sync_completion_handlers) {
        m_download_completion_handlers.push_back(std::move(sync_handler));
    }
    m_sync_completion_handlers.clear();

    // Each handler is detached from the list before being invoked.
    while (!m_upload_completion_handlers.empty()) {
        auto completion = std::move(m_upload_completion_handlers.back());
        m_upload_completion_handlers.pop_back();
        completion(Status::OK()); // Throws
    }
}
1677

1678
// Derives upload/download estimates from `p` and `downloadable`, fills in the
// remaining fields of `p`, and invokes the progress handler -- but only when
// the resulting progress differs from the last one reported.
//
// `p` is modified in place (downloaded, downloadable and download_estimate).
void SessionWrapper::report_progress(ReportedProgress& p, DownloadableProgress downloadable)
{
    // Nothing to do without a handler. Also ignore progress messages from
    // before we first receive a DOWNLOAD message.
    if (!m_progress_handler || !m_reliable_download_progress)
        return;

    auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) {
        REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable);
        REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable);

        // The effect of this calculation is that if new bytes are added for download/upload,
        // the progress estimate doesn't go back to zero, but it goes back to some non-zero percentage.
        // This calculation allows a clean progression from 0 to 1.0 even if the new data is added for the sync
        // before progress has reached 1.0.
        // Then once it is at 1.0 the next batch of changes will restart the estimate at 0.
        // Example for upload progress reported:
        // 0 -> 1.0 -> new data added -> 0.0 -> 0.1 ...sync... -> 0.4 -> new data added -> 0.3 ...sync.. -> 1.0

        const bool still_transferring = final_transferred < transferable && transferred < transferable;
        if (!still_transferring)
            return 1.0;
        return (transferred - final_transferred) / double(transferable - final_transferred);
    };

    const bool upload_completed = (p.uploaded == p.uploadable);
    const double upload_estimate =
        upload_completed ? 1.0 : calculate_progress(p.uploaded, p.uploadable, m_final_uploaded);

    // NOTE(review): outside of FLX this flag is never refined below, so it is
    // only true while nothing at all has been downloaded -- confirm that this
    // is intended.
    bool download_completed = (p.downloaded == 0);
    p.download_estimate = 1.00;
    if (m_flx_pending_bootstrap_store) {
        p.download_estimate = downloadable.as_estimate();
        if (m_flx_pending_bootstrap_store->has_pending()) {
            // Count the bytes of the bootstrap that is pending in the store as downloaded.
            p.downloaded += m_flx_pending_bootstrap_store->pending_stats().pending_changeset_bytes;
        }
        download_completed = p.download_estimate >= 1.0;

        // for flx with download estimate these bytes are not known
        // provide some sensible value for non-streaming version of object-store callbacks
        // until these field are completely removed from the api after pbs deprecation
        p.downloadable = p.downloaded;
        const bool can_extrapolate =
            p.download_estimate > 0 && p.download_estimate < 1.0 && p.downloaded > m_final_downloaded;
        if (can_extrapolate)
            p.downloadable = m_final_downloaded + uint64_t((p.downloaded - m_final_downloaded) / p.download_estimate);
    }
    else {
        // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
        // is only the remaining to download. This is confusing, so make them use
        // the same units.
        p.downloadable = downloadable.as_bytes() + p.downloaded;
        if (!download_completed)
            p.download_estimate = calculate_progress(p.downloaded, p.downloadable, m_final_downloaded);
    }

    // Remember the byte counts at which a transfer last completed; they are
    // the baseline for the next round of estimates.
    if (download_completed)
        m_final_downloaded = p.downloaded;
    if (upload_completed)
        m_final_uploaded = p.uploaded;

    // Suppress notifications that would repeat the previous one.
    if (p == m_reported_progress)
        return;

    m_reported_progress = p;

    if (m_sess->logger.would_log(Logger::Level::debug)) {
        auto format_estimate = [](double value) {
            std::ostringstream out;
            // progress estimate string in the DOWNLOAD message isn't expected to have more than 4 digits of precision
            out << std::fixed << std::setprecision(4) << value;
            return out.str();
        };
        m_sess->logger.debug(
            "Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
            "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
            p.downloaded, p.downloadable, format_estimate(p.download_estimate), p.uploaded, p.uploadable,
            format_estimate(upload_estimate), p.snapshot, p.query_version);
    }

    m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, p.download_estimate,
                       upload_estimate, p.query_version);
}
1762

1763
// Passes a test command on to the underlying session. When there is no
// session, the returned future carries a RuntimeError status instead.
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
{
    if (m_sess) {
        return m_sess->send_test_command(std::move(body));
    }

    return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
}
1771

1772
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1773
{
10,386✔
1774
    REALM_ASSERT(!m_finalized);
10,386✔
1775

1776
    auto has_pending_reset = PendingResetStore::has_pending_reset(m_db->start_frozen());
10,386✔
1777
    if (!has_pending_reset) {
10,386✔
1778
        return; // nothing to do
10,062✔
1779
    }
10,062✔
1780

1781
    m_sess->logger.info(util::LogCategory::reset, "Tracking %1", *has_pending_reset);
324✔
1782

1783
    // Now that the client reset merge is complete, wait for the changes to synchronize with the server
1784
    async_wait_for(
324✔
1785
        true, true, [self = util::bind_ptr(this), pending_reset = std::move(*has_pending_reset)](Status status) {
324✔
1786
            if (status == ErrorCodes::OperationAborted) {
322✔
1787
                return;
160✔
1788
            }
160✔
1789
            auto& logger = self->m_sess->logger;
162✔
1790
            if (!status.is_ok()) {
162✔
1791
                logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1",
×
1792
                             status);
×
1793
                return;
×
1794
            }
×
1795

1796
            logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset);
162✔
1797

1798
            auto tr = self->m_db->start_write();
162✔
1799
            auto cur_pending_reset = PendingResetStore::has_pending_reset(tr);
162✔
1800
            if (!cur_pending_reset) {
162✔
1801
                logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed.");
4✔
1802
                return;
4✔
1803
            }
4✔
1804
            if (*cur_pending_reset == pending_reset) {
158✔
1805
                logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker.");
158✔
1806
            }
158✔
1807
            else {
×
1808
                logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset);
×
1809
            }
×
1810
            PendingResetStore::clear_pending_reset(tr);
158✔
1811
            tr->commit();
158✔
1812
        });
158✔
1813
}
324✔
1814

1815
void SessionWrapper::update_subscription_version_info()
1816
{
10,388✔
1817
    if (!m_flx_subscription_store)
10,388✔
1818
        return;
8,732✔
1819
    auto versions_info = m_flx_subscription_store->get_version_info();
1,656✔
1820
    m_flx_active_version = versions_info.active;
1,656✔
1821
    m_flx_pending_mark_version = versions_info.pending_mark;
1,656✔
1822
}
1,656✔
1823

1824
std::string SessionWrapper::get_appservices_connection_id()
1825
{
72✔
1826
    auto pf = util::make_promise_future<std::string>();
72✔
1827

1828
    m_client.post([self = util::bind_ptr{this}, promise = std::move(pf.promise)](Status status) mutable {
72✔
1829
        if (!status.is_ok()) {
72✔
1830
            promise.set_error(status);
×
1831
            return;
×
1832
        }
×
1833

1834
        if (!self->m_sess) {
72✔
1835
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
1836
            return;
×
1837
        }
×
1838

1839
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
72✔
1840
    });
72✔
1841

1842
    return pf.future.get();
72✔
1843
}
72✔
1844

1845
// ################ ClientImpl::Connection ################
1846

1847
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1848
                                   const std::string& authorization_header_name,
1849
                                   const std::map<std::string, std::string>& custom_http_headers,
1850
                                   bool verify_servers_ssl_certificate,
1851
                                   Optional<std::string> ssl_trust_certificate_path,
1852
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1853
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1854
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::session, make_logger_prefix(ident),
1,334✔
1855
                                                      client.logger_ptr)} // Throws
1,334✔
1856
    , logger{*logger_ptr}
1,334✔
1857
    , m_client{client}
1,334✔
1858
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1,334✔
1859
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1,334✔
1860
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1,334✔
1861
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1,334✔
1862
    , m_reconnect_info{reconnect_info}
1,334✔
1863
    , m_ident{ident}
1,334✔
1864
    , m_server_endpoint{std::move(endpoint)}
1,334✔
1865
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1,334✔
1866
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1,334✔
1867
{
2,800✔
1868
    m_on_idle = m_client.create_trigger([this](Status status) {
2,810✔
1869
        if (status == ErrorCodes::OperationAborted)
2,810✔
1870
            return;
×
1871
        else if (!status.is_ok())
2,810✔
1872
            throw Exception(status);
×
1873

1874
        REALM_ASSERT(m_activated);
2,810✔
1875
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,810✔
1876
            on_idle(); // Throws
2,798✔
1877
            // Connection object may be destroyed now.
1878
        }
2,798✔
1879
    });
2,810✔
1880
}
2,800✔
1881

1882
// Returns the identifier this connection was constructed with.
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
{
    return m_ident;
}
1886

1887

1888
// Returns a reference to the server endpoint stored in this connection.
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
{
    return m_server_endpoint;
}
1892

1893
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1894
                                                        const std::string& signed_access_token)
1895
{
10,308✔
1896
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
10,308✔
1897
    m_signed_access_token = signed_access_token;           // Throws (copy)
10,308✔
1898
}
10,308✔
1899

1900

1901
void ClientImpl::Connection::resume_active_sessions()
1902
{
1,904✔
1903
    auto handler = [=](ClientImpl::Session& sess) {
3,804✔
1904
        sess.cancel_resumption_delay(); // Throws
3,804✔
1905
    };
3,804✔
1906
    for_each_active_session(std::move(handler)); // Throws
1,904✔
1907
}
1,904✔
1908

1909
void ClientImpl::Connection::on_idle()
1910
{
2,800✔
1911
    logger.debug(util::LogCategory::session, "Destroying connection object");
2,800✔
1912
    ClientImpl& client = get_client();
2,800✔
1913
    client.remove_connection(*this);
2,800✔
1914
    // NOTE: This connection object is now destroyed!
1915
}
2,800✔
1916

1917

1918
std::string ClientImpl::Connection::get_http_request_path() const
1919
{
3,778✔
1920
    using namespace std::string_view_literals;
3,778✔
1921
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
2,147,485,613✔
1922

1923
    std::string path;
3,778✔
1924
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,778✔
1925
    path += m_http_request_path_prefix;
3,778✔
1926
    path += param;
3,778✔
1927
    path += m_signed_access_token;
3,778✔
1928

1929
    return path;
3,778✔
1930
}
3,778✔
1931

1932

1933
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
1934
{
2,800✔
1935
    return util::format("Connection[%1] ", ident);
2,800✔
1936
}
2,800✔
1937

1938

1939
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
1940
                                                            std::optional<SessionErrorInfo> error_info)
1941
{
11,128✔
1942
    if (m_force_closed) {
11,128✔
1943
        return;
2,430✔
1944
    }
2,430✔
1945
    auto handler = [=](ClientImpl::Session& sess) {
11,828✔
1946
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
11,828✔
1947
        sess_2.on_connection_state_changed(state, error_info); // Throws
11,828✔
1948
    };
11,828✔
1949
    for_each_active_session(std::move(handler)); // Throws
8,698✔
1950
}
8,698✔
1951

1952

1953
Client::Client(Config config)
1954
    : m_impl{new ClientImpl{std::move(config)}} // Throws
4,900✔
1955
{
9,938✔
1956
}
9,938✔
1957

1958

1959
Client::Client(Client&& client) noexcept
1960
    : m_impl{std::move(client.m_impl)}
1961
{
×
1962
}
×
1963

1964

1965
// Defined out of line, in the translation unit where ClientImpl is a complete
// type.
Client::~Client() noexcept = default;
1966

1967

1968
void Client::shutdown() noexcept
1969
{
10,020✔
1970
    m_impl->shutdown();
10,020✔
1971
}
10,020✔
1972

1973
void Client::shutdown_and_wait()
1974
{
772✔
1975
    m_impl->shutdown_and_wait();
772✔
1976
}
772✔
1977

1978
void Client::cancel_reconnect_delay()
1979
{
1,908✔
1980
    m_impl->cancel_reconnect_delay();
1,908✔
1981
}
1,908✔
1982

1983
void Client::voluntary_disconnect_all_connections()
1984
{
12✔
1985
    m_impl->voluntary_disconnect_all_connections();
12✔
1986
}
12✔
1987

1988
bool Client::wait_for_session_terminations_or_client_stopped()
1989
{
9,572✔
1990
    return m_impl->wait_for_session_terminations_or_client_stopped();
9,572✔
1991
}
9,572✔
1992

1993
// Forwards to ClientImpl::notify_session_terminated() and returns the future
// it produces.
util::Future<void> Client::notify_session_terminated()
{
    return m_impl->notify_session_terminated();
}
1997

1998
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
1999
                                  port_type& port, std::string& path) const
2000
{
4,074✔
2001
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
4,074✔
2002
}
4,074✔
2003

2004

2005
// Creates the SessionWrapper backing this session. The wrapper is held through
// a naked pointer; Session::abandon() later re-adopts it into a bind_ptr.
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
{
    ClientImpl& client_impl = *client.m_impl;
    m_impl = new SessionWrapper{client_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
                                std::move(config)}; // Throws
}
2011

2012

2013
// Informs the session wrapper of a commit by forwarding `new_version` to
// SessionWrapper::on_commit().
void Session::nonsync_transact_notify(version_type new_version)
{
    m_impl->on_commit(new_version); // Throws
}
2017

2018

2019
void Session::cancel_reconnect_delay()
2020
{
28✔
2021
    m_impl->cancel_reconnect_delay(); // Throws
28✔
2022
}
28✔
2023

2024

2025
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2026
{
4,964✔
2027
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
4,964✔
2028
}
4,964✔
2029

2030

2031
bool Session::wait_for_upload_complete_or_client_stopped()
2032
{
12,912✔
2033
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
12,912✔
2034
}
12,912✔
2035

2036

2037
bool Session::wait_for_download_complete_or_client_stopped()
2038
{
10,004✔
2039
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
10,004✔
2040
}
10,004✔
2041

2042

2043
void Session::refresh(std::string_view signed_access_token)
2044
{
216✔
2045
    m_impl->refresh(signed_access_token); // Throws
216✔
2046
}
216✔
2047

2048

2049
void Session::abandon() noexcept
2050
{
10,174✔
2051
    REALM_ASSERT(m_impl);
10,174✔
2052
    // Reabsorb the ownership assigned to the applications naked pointer by
2053
    // Session constructor
2054
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
10,174✔
2055
    SessionWrapper::abandon(std::move(wrapper));
10,174✔
2056
}
10,174✔
2057

2058
// Forwards to SessionWrapper::send_test_command() and returns the future it
// produces.
util::Future<std::string> Session::send_test_command(std::string body)
{
    return m_impl->send_test_command(std::move(body));
}
2062

2063
std::string Session::get_appservices_connection_id()
2064
{
72✔
2065
    return m_impl->get_appservices_connection_id();
72✔
2066
}
72✔
2067

2068
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2069
{
×
2070
    switch (proxyType) {
×
2071
        case ProxyConfig::Type::HTTP:
×
2072
            return os << "HTTP";
×
2073
        case ProxyConfig::Type::HTTPS:
×
2074
            return os << "HTTPS";
×
2075
    }
×
2076
    REALM_TERMINATE("Invalid Proxy Type object.");
2077
}
×
2078

2079
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc