realm / realm-core / jorgen.edelbo_402

21 Aug 2024 11:10AM UTC coverage: 91.054% (-0.03%) from 91.085%
jorgen.edelbo_402

Pull #7803

Evergreen

jedelbo
Small fix to Table::typed_write

When writing the realm to a new file from a write transaction,
the Table may be COW so that the top ref is changed. So don't
use the ref that is present in the group when the operation starts.
Pull Request #7803: Feature/string compression

103494 of 181580 branches covered (57.0%)

1929 of 1999 new or added lines in 46 files covered. (96.5%)

695 existing lines in 51 files now uncovered.

220142 of 241772 relevant lines covered (91.05%)

7344461.76 hits per line

Source File

90.96
/src/realm/sync/client.cpp
1
#include <realm/sync/client.hpp>
2

3
#include <realm/sync/config.hpp>
4
#include <realm/sync/noinst/client_impl_base.hpp>
5
#include <realm/sync/noinst/client_reset.hpp>
6
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
7
#include <realm/sync/noinst/pending_reset_store.hpp>
8
#include <realm/sync/protocol.hpp>
9
#include <realm/sync/subscriptions.hpp>
10
#include <realm/util/bind_ptr.hpp>
11

12
namespace realm::sync {
13
namespace {
14
using namespace realm::util;
15

16

17
// clang-format off
18
using SessionImpl                     = ClientImpl::Session;
19
using SyncTransactCallback            = Session::SyncTransactCallback;
20
using ProgressHandler                 = Session::ProgressHandler;
21
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
22
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
23
using port_type                       = Session::port_type;
24
using connection_ident_type           = std::int_fast64_t;
25
using ProxyConfig                     = SyncConfig::ProxyConfig;
26
// clang-format on
27

28
} // unnamed namespace
29

30

31
// Life cycle states of a session wrapper:
32
//
33
// The session wrapper begins life with an associated Client, but no underlying
34
// SessionImpl. On construction, it begins the actualization process by posting
35
// a job to the client's event loop. That job will set `m_sess` to a session impl
36
// and then set `m_actualized = true`. Once this happens `m_actualized` will
37
// never change again.
38
//
39
// When the external reference to the session (`sync::Session`, which in
40
// non-test code is always owned by a `SyncSession`) is destroyed, the wrapper
41
// begins finalization. If the wrapper has not yet been actualized this takes
42
// place immediately and `m_finalized = true` is set directly on the calling
43
// thread. If it has been actualized, a job is posted to the client's event loop
44
// which will tear down the session and then set `m_finalized = true`. Regardless
45
// of whether or not the session has been actualized, `m_abandoned = true` is
46
// immediately set when the external reference is released.
47
//
48
// When the associated Client is destroyed it calls force_close() on all
49
// actualized wrappers from its event loop. This causes the wrapper to tear down
50
// the session, but does not make it proceed to the finalized state. In normal
51
// usage the client will outlive all sessions, but in tests getting the teardown
52
// correct and race-free can be tricky so we permit either order.
53
//
54
// The wrapper will exist with `m_abandoned = true` and `m_finalized = false`
55
// only while waiting for finalization to happen. It will exist with
56
// `m_finalized = true` only while there are pending post handlers yet to be
57
// executed.
58
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
59
public:
60
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
61
                   Session::Config&&);
62
    ~SessionWrapper() noexcept;
63

64
    ClientReplication& get_replication() noexcept;
65
    ClientImpl& get_client() noexcept;
66

67
    bool has_flx_subscription_store() const;
68
    SubscriptionStore* get_flx_subscription_store();
69
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
70

71
    MigrationStore* get_migration_store();
72

73
    // Immediately initiate deactivation of the wrapped session. Sets m_closed
74
    // but *not* m_finalized.
75
    // Must be called from event loop thread.
76
    void force_close();
77

78
    // Can be called from any thread.
79
    void on_commit(version_type new_version) override;
80
    // Can be called from any thread.
81
    void cancel_reconnect_delay();
82

83
    // Can be called from any thread.
84
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
85
    // Can be called from any thread.
86
    bool wait_for_upload_complete_or_client_stopped();
87
    // Can be called from any thread.
88
    bool wait_for_download_complete_or_client_stopped();
89

90
    // Can be called from any thread.
91
    void refresh(std::string_view signed_access_token);
92

93
    // Can be called from any thread.
94
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
95

96
    // These are called from ClientImpl
97
    // Must be called from event loop thread.
98
    void actualize();
99
    void finalize();
100
    void finalize_before_actualization() noexcept;
101

102
    // Can be called from any thread.
103
    util::Future<std::string> send_test_command(std::string body);
104

105
    void handle_pending_client_reset_acknowledgement();
106

107
    // Can be called from any thread.
108
    std::string get_appservices_connection_id();
109

110
    // Can be called from any thread, but inherently cannot be called
111
    // concurrently with calls to any of the other non-confined functions.
112
    bool mark_abandoned();
113

114
private:
115
    ClientImpl& m_client;
116
    DBRef m_db;
117
    Replication* m_replication;
118

119
    const ProtocolEnvelope m_protocol_envelope;
120
    const std::string m_server_address;
121
    const port_type m_server_port;
122
    const bool m_server_verified;
123
    const std::string m_user_id;
124
    const SyncServerMode m_sync_mode;
125
    const std::string m_authorization_header_name;
126
    const std::map<std::string, std::string> m_custom_http_headers;
127
    const bool m_verify_servers_ssl_certificate;
128
    const bool m_simulate_integration_error;
129
    const std::optional<std::string> m_ssl_trust_certificate_path;
130
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
131
    const size_t m_flx_bootstrap_batch_size_bytes;
132
    const std::string m_http_request_path_prefix;
133
    const std::string m_virt_path;
134
    const std::optional<ProxyConfig> m_proxy_config;
135

136
    // This one is different from null when, and only when, the session wrapper
137
    // is in ClientImpl::m_abandoned_session_wrappers.
138
    SessionWrapper* m_next = nullptr;
139

140
    // These may only be accessed by the event loop thread.
141
    std::string m_signed_access_token;
142
    std::optional<ClientReset> m_client_reset_config;
143

144
    struct ReportedProgress {
145
        uint64_t snapshot;
146
        uint64_t uploaded;
147
        uint64_t uploadable;
148
        uint64_t downloaded;
149
        uint64_t downloadable;
150
        int64_t query_version = 0;
151
        double download_estimate;
152

153
        // Does not check snapshot
154
        bool operator==(const ReportedProgress& p) const noexcept
155
        {
24,632✔
156
            return uploaded == p.uploaded && uploadable == p.uploadable && downloaded == p.downloaded &&
24,632✔
157
                   downloadable == p.downloadable && query_version == p.query_version &&
24,632✔
158
                   download_estimate == p.download_estimate;
24,632✔
159
        }
24,632✔
160
    };
161
    std::optional<ReportedProgress> m_reported_progress;
162
    uint64_t m_final_uploaded = 0;
163
    uint64_t m_final_downloaded = 0;
164

165
    const util::UniqueFunction<ProgressHandler> m_progress_handler;
166
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
167

168
    const util::UniqueFunction<SyncClientHookAction(SyncClientHookData const&)> m_debug_hook;
169
    bool m_in_debug_hook = false;
170

171
    const SessionReason m_session_reason;
172

173
    // If false, QUERY and MARK messages are allowed but UPLOAD messages will not
174
    // be sent to the server.
175
    const bool m_allow_upload_messages;
176

177
    const uint64_t m_schema_version;
178

179
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
180
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
181
    std::shared_ptr<MigrationStore> m_migration_store;
182

183
    // Set to true when this session wrapper is actualized (i.e. the wrapped
184
    // session is created), or when the wrapper is finalized before actualization.
185
    // It is then never modified again.
186
    //
187
    // Actualization is scheduled during the construction of SessionWrapper, and
188
    // so a session specific post handler will always find that `m_actualized`
189
    // is true as the handler will always be run after the actualization job.
190
    // This holds even if the wrapper is finalized or closed before actualization.
191
    bool m_actualized = false;
192

193
    // Set to true when session deactivation is begun, either via force_close()
194
    // or finalize().
195
    bool m_closed = false;
196

197
    // Set to true in on_suspended() and then false in on_resumed(). Used to
198
    // suppress spurious connection state and error reporting while the session
199
    // is already in an error state.
200
    bool m_suspended = false;
201

202
    // Set when the session has been abandoned. After this point none of the
203
    // public API functions should be called again.
204
    bool m_abandoned = false;
205
    // Has the SessionWrapper been finalized?
206
    bool m_finalized = false;
207

208
    // Set to true when the first DOWNLOAD message is received to indicate that
209
    // the byte-level download progress parameters can be considered reasonable
210
    // reliable. Before that, a lot of time may have passed, so our record of
211
    // the download progress is likely completely out of date.
212
    bool m_reliable_download_progress = false;
213

214
    // Set to point to an activated session object during actualization of the
215
    // session wrapper. Set to null during finalization of the session
216
    // wrapper. Both modifications are guaranteed to be performed by the event
217
    // loop thread.
218
    //
219
    // If a session specific post handler, that is submitted after the
220
    // initiation of the session wrapper, sees that `m_sess` is null, it can
221
    // conclude that the session wrapper has either been force closed or has
222
    // been both abandoned and finalized.
223
    //
224
    // Must only be accessed from the event loop thread.
225
    SessionImpl* m_sess = nullptr;
226

227
    // These must only be accessed from the event loop thread.
228
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
229
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
230
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
231

232
    version_type m_upload_completion_requested_version = -1;
233

234
    void on_download_completion();
235
    void on_suspended(const SessionErrorInfo& error_info);
236
    void on_resumed();
237
    void on_connection_state_changed(ConnectionState, const std::optional<SessionErrorInfo>&);
238
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
239

240
    void init_progress_handler();
241
    void check_progress();
242
    void report_progress(ReportedProgress& p, DownloadableProgress downloadable);
243
    void report_upload_completion(version_type);
244

245
    friend class SessionWrapperStack;
246
    friend class ClientImpl::Session;
247
};
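
The life-cycle comment at the top of the class can be read as a small state machine over three flags. The following is a minimal sketch of that reading, not the actual implementation: `WrapperLifecycle` and its member functions are hypothetical names, the model is single-threaded, and it leaves out the event loop, the mutex, and `force_close()`.

```cpp
#include <cassert>

// Hypothetical model of the flags described in the life-cycle comment.
// `has_session` stands in for `m_sess != nullptr`.
struct WrapperLifecycle {
    bool actualized = false;
    bool abandoned = false;
    bool finalized = false;
    bool has_session = false;

    // Models the job posted at construction; runs on the event loop.
    void on_actualize_job()
    {
        assert(!actualized);
        has_session = true;
        actualized = true;
    }

    // Models releasing the external reference (any thread).
    // Returns true if a finalization job still has to run on the event loop.
    bool abandon()
    {
        abandoned = true;
        if (!actualized) {
            // Never actualized: finalize directly on the calling thread.
            // Per the member comment, m_actualized is also set in this case.
            actualized = true;
            finalized = true;
            return false;
        }
        return true;
    }

    // Models the finalization job; runs on the event loop.
    void on_finalize_job()
    {
        assert(actualized && abandoned && !finalized);
        has_session = false;
        finalized = true;
    }
};
```

In this model a wrapper that is abandoned before actualization never owns a session, while an actualized wrapper sits in the `abandoned && !finalized` state until its finalization job runs.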
248

249

250
// ################ SessionWrapperStack ################
251

252
inline bool SessionWrapperStack::empty() const noexcept
253
{
19,932✔
254
    return !m_back;
19,932✔
255
}
19,932✔
256

257

258
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
259
{
20,888✔
260
    REALM_ASSERT(!w->m_next);
20,888✔
261
    w->m_next = m_back;
20,888✔
262
    m_back = w.release();
20,888✔
263
}
20,888✔
264

265

266
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
267
{
61,312✔
268
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
61,312✔
269
    if (m_back) {
61,312✔
270
        m_back = m_back->m_next;
20,816✔
271
        w->m_next = nullptr;
20,816✔
272
    }
20,816✔
273
    return w;
61,312✔
274
}
61,312✔
275

276

277
inline void SessionWrapperStack::clear() noexcept
278
{
19,932✔
279
    while (m_back) {
19,932✔
280
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
281
        m_back = w->m_next;
×
282
    }
×
283
}
19,932✔
284

285

286
inline bool SessionWrapperStack::erase(SessionWrapper* w) noexcept
287
{
10,478✔
288
    SessionWrapper** p = &m_back;
10,478✔
289
    while (*p && *p != w) {
10,660✔
290
        p = &(*p)->m_next;
182✔
291
    }
182✔
292
    if (!*p) {
10,478✔
293
        return false;
10,406✔
294
    }
10,406✔
295
    *p = w->m_next;
72✔
296
    util::bind_ptr<SessionWrapper>{w, util::bind_ptr_base::adopt_tag{}};
72✔
297
    return true;
72✔
298
}
10,478✔
299

300

301
SessionWrapperStack::~SessionWrapperStack()
302
{
19,932✔
303
    clear();
19,932✔
304
}
19,932✔
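
The `SessionWrapperStack` functions above implement an intrusive LIFO stack threaded through `m_next`, with `erase()` walking a pointer-to-pointer so that removing the head and removing an interior element share one code path. A minimal sketch of the same technique follows. `Node` and `IntrusiveStack` are hypothetical names, and the sketch is non-owning: it does not reproduce the `bind_ptr` reference adoption and release that the listing performs.

```cpp
#include <cassert>

// Hypothetical node type; `next` plays the role of SessionWrapper::m_next.
struct Node {
    int id = 0;
    Node* next = nullptr;
};

// Non-owning intrusive stack. Callers keep ownership of the nodes.
class IntrusiveStack {
public:
    bool empty() const noexcept
    {
        return !m_back;
    }

    void push(Node* n) noexcept
    {
        assert(n && !n->next); // mirrors REALM_ASSERT(!w->m_next)
        n->next = m_back;
        m_back = n;
    }

    // Returns nullptr when the stack is empty.
    Node* pop() noexcept
    {
        Node* n = m_back;
        if (n) {
            m_back = n->next;
            n->next = nullptr;
        }
        return n;
    }

    // Unlinks `n` if it is on the stack. `p` always points at the link that
    // would have to change, whether that is m_back or some node's `next`.
    bool erase(Node* n) noexcept
    {
        Node** p = &m_back;
        while (*p && *p != n)
            p = &(*p)->next;
        if (!*p)
            return false;
        *p = n->next;
        n->next = nullptr; // the sketch clears the link; the listing does not
        return true;
    }

private:
    Node* m_back = nullptr;
};
```

Pushing `a`, `b`, `c` and then erasing `b` leaves `c` on top of `a`, so two pops return `c` and then `a`.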
305

306

307
// ################ ClientImpl ################
308

309
ClientImpl::~ClientImpl()
310
{
9,966✔
311
    // Since no other thread is allowed to be accessing this client or any of
312
    // its subobjects at this time, no mutex locking is necessary.
313

314
    shutdown_and_wait();
9,966✔
315
    // Session wrappers are removed from m_unactualized_session_wrappers as they
316
    // are abandoned.
317
    REALM_ASSERT(m_stopped);
9,966✔
318
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
9,966✔
319
    REALM_ASSERT(m_abandoned_session_wrappers.empty());
9,966✔
320
}
9,966✔
321

322

323
void ClientImpl::cancel_reconnect_delay()
324
{
1,896✔
325
    // Thread safety required
326
    post([this] {
1,896✔
327
        for (auto& p : m_server_slots) {
1,896✔
328
            ServerSlot& slot = p.second;
1,896✔
329
            if (m_one_connection_per_session) {
1,896✔
330
                REALM_ASSERT(!slot.connection);
×
331
                for (const auto& p : slot.alt_connections) {
×
332
                    ClientImpl::Connection& conn = *p.second;
×
333
                    conn.resume_active_sessions(); // Throws
×
334
                    conn.cancel_reconnect_delay(); // Throws
×
335
                }
×
336
            }
×
337
            else {
1,896✔
338
                REALM_ASSERT(slot.alt_connections.empty());
1,896✔
339
                if (slot.connection) {
1,896✔
340
                    ClientImpl::Connection& conn = *slot.connection;
1,892✔
341
                    conn.resume_active_sessions(); // Throws
1,892✔
342
                    conn.cancel_reconnect_delay(); // Throws
1,892✔
343
                }
1,892✔
344
                else {
4✔
345
                    slot.reconnect_info.reset();
4✔
346
                }
4✔
347
            }
1,896✔
348
        }
1,896✔
349
    }); // Throws
1,896✔
350
}
1,896✔
351

352

353
void ClientImpl::voluntary_disconnect_all_connections()
354
{
12✔
355
    auto done_pf = util::make_promise_future<void>();
12✔
356
    post([this, promise = std::move(done_pf.promise)]() mutable {
12✔
357
        try {
12✔
358
            for (auto& p : m_server_slots) {
12✔
359
                ServerSlot& slot = p.second;
12✔
360
                if (m_one_connection_per_session) {
12✔
361
                    REALM_ASSERT(!slot.connection);
×
362
                    for (const auto& [_, conn] : slot.alt_connections) {
×
363
                        conn->voluntary_disconnect();
×
364
                    }
×
365
                }
×
366
                else {
12✔
367
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
368
                    if (slot.connection) {
12✔
369
                        slot.connection->voluntary_disconnect();
12✔
370
                    }
12✔
371
                }
12✔
372
            }
12✔
373
        }
12✔
374
        catch (...) {
12✔
375
            promise.set_error(exception_to_status());
×
376
            return;
×
377
        }
×
378
        promise.emplace_value();
12✔
379
    });
12✔
380
    done_pf.future.get();
12✔
381
}
12✔
382

383

384
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
385
{
9,636✔
386
    // Thread safety required
387

388
    {
9,636✔
389
        util::CheckedLockGuard lock{m_mutex};
9,636✔
390
        m_sessions_terminated = false;
9,636✔
391
    }
9,636✔
392

393
    // The technique employed here relies on the fact that
394
    // actualize_and_finalize_session_wrappers() must get to execute at least
395
    // once before the post handler submitted below gets to execute, but still
396
    // at a time where all session wrappers, that are abandoned prior to the
397
    // execution of wait_for_session_terminations_or_client_stopped(), have been
398
    // added to `m_abandoned_session_wrappers`.
399
    //
400
    // To see that this is the case, consider a session wrapper that was
401
    // abandoned before wait_for_session_terminations_or_client_stopped() was
402
    // invoked. Then the session wrapper will have been added to
403
    // `m_abandoned_session_wrappers`, and an invocation of
404
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
405
    // guarantees mentioned in the documentation of Trigger then ensure
406
    // that at least one execution of actualize_and_finalize_session_wrappers()
407
    // will happen after the session wrapper has been added to
408
    // `m_abandoned_session_wrappers`, but before the post handler submitted
409
    // below gets to execute.
410
    post([this] {
9,636✔
411
        {
9,636✔
412
            util::CheckedLockGuard lock{m_mutex};
9,636✔
413
            m_sessions_terminated = true;
9,636✔
414
        }
9,636✔
415
        m_wait_or_client_stopped_cond.notify_all();
9,636✔
416
    }); // Throws
9,636✔
417

418
    bool completion_condition_was_satisfied;
9,636✔
419
    {
9,636✔
420
        util::CheckedUniqueLock lock{m_mutex};
9,636✔
421
        m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_mutex) {
19,272✔
422
            return m_sessions_terminated || m_stopped;
19,272✔
423
        });
19,272✔
424
        completion_condition_was_satisfied = !m_stopped;
9,636✔
425
    }
9,636✔
426
    return completion_condition_was_satisfied;
9,636✔
427
}
9,636✔
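
The ordering argument in the comment above can be illustrated with a toy FIFO loop. This is a simplified sketch under stated assumptions: `MiniLoop` and `finalization_precedes_wait_handler` are hypothetical, everything runs on one thread, and the trigger is modeled as a plain `post`, whereas the real `Trigger` has its own documented guarantees that this sketch does not reproduce.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical single-threaded FIFO loop; post() stands in for ClientImpl::post.
class MiniLoop {
public:
    void post(std::function<void()> fn)
    {
        m_queue.push_back(std::move(fn));
    }

    // Runs handlers in submission order until the queue is empty.
    void run()
    {
        while (!m_queue.empty()) {
            auto fn = std::move(m_queue.front());
            m_queue.pop_front();
            fn();
        }
    }

private:
    std::deque<std::function<void()>> m_queue;
};

// Returns true if every wrapper abandoned before the "wait" handler was
// posted had already been finalized by the time that handler ran.
bool finalization_precedes_wait_handler(int num_abandoned)
{
    MiniLoop loop;
    std::vector<int> abandoned; // models m_abandoned_session_wrappers
    int finalized = 0;
    bool ok = false;

    for (int i = 0; i < num_abandoned; ++i) {
        abandoned.push_back(i);
        // Models trigger(): the job finalizes everything pending at run time.
        loop.post([&] {
            finalized += static_cast<int>(abandoned.size());
            abandoned.clear();
        });
    }
    // Models the post() issued by the waiting function.
    loop.post([&] {
        ok = abandoned.empty() && finalized == num_abandoned;
    });
    loop.run();
    return ok;
}
```

Because the wait handler is queued after every finalization job that was scheduled before it, it can only observe an empty abandoned list in this model.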
428

429

430
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
431
util::Future<void> ClientImpl::notify_session_terminated()
432
{
56✔
433
    auto pf = util::make_promise_future<void>();
56✔
434
    post([promise = std::move(pf.promise)](Status status) mutable {
56✔
435
        // Includes operation_aborted
436
        if (!status.is_ok()) {
56✔
437
            promise.set_error(status);
×
438
            return;
×
439
        }
×
440

441
        promise.emplace_value();
56✔
442
    });
56✔
443

444
    return std::move(pf.future);
56✔
445
}
56✔
446

447
void ClientImpl::drain_connections_on_loop()
448
{
9,966✔
449
    post([this](Status status) {
9,966✔
450
        REALM_ASSERT(status.is_ok());
9,966✔
451
        drain_connections();
9,966✔
452
    });
9,966✔
453
}
9,966✔
454

455
void ClientImpl::shutdown_and_wait()
456
{
10,738✔
457
    shutdown();
10,738✔
458
    util::CheckedUniqueLock lock{m_drain_mutex};
10,738✔
459
    if (m_drained) {
10,738✔
460
        return;
772✔
461
    }
772✔
462

463
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
9,966✔
464
    m_drain_cv.wait(lock.native_handle(), [&]() REQUIRES(m_drain_mutex) {
15,902✔
465
        return m_num_connections == 0 && m_outstanding_posts == 0;
15,902✔
466
    });
15,902✔
467

468
    m_drained = true;
9,966✔
469
}
9,966✔
470

471
void ClientImpl::shutdown() noexcept
472
{
20,786✔
473
    {
20,786✔
474
        util::CheckedLockGuard lock{m_mutex};
20,786✔
475
        if (m_stopped)
20,786✔
476
            return;
10,820✔
477
        m_stopped = true;
9,966✔
478
    }
9,966✔
479
    m_wait_or_client_stopped_cond.notify_all();
×
480

481
    drain_connections_on_loop();
9,966✔
482
}
9,966✔
483

484

485
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
486
{
10,482✔
487
    // Thread safety required.
488
    {
10,482✔
489
        util::CheckedLockGuard lock{m_mutex};
10,482✔
490
        // We can't actualize the session if we've already been stopped, so
491
        // just finalize it immediately.
492
        if (m_stopped) {
10,482✔
493
            wrapper->finalize_before_actualization();
×
494
            return;
×
495
        }
×
496

497
        REALM_ASSERT(m_actualize_and_finalize);
10,482✔
498
        m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
10,482✔
499
    }
10,482✔
500
    m_actualize_and_finalize->trigger();
×
501
}
10,482✔
502

503

504
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
505
{
10,480✔
506
    // Thread safety required.
507
    {
10,480✔
508
        util::CheckedLockGuard lock{m_mutex};
10,480✔
509
        REALM_ASSERT(m_actualize_and_finalize);
10,480✔
510
        // The wrapper may have already been finalized before being abandoned
511
        // if we were stopped when it was created.
512
        if (wrapper->mark_abandoned())
10,480✔
513
            return;
4✔
514

515
        // If the session wrapper has not yet been actualized (on the event loop
516
        // thread), it can be immediately finalized. This ensures that we will
517
        // generally not actualize a session wrapper that has already been
518
        // abandoned.
519
        if (m_unactualized_session_wrappers.erase(wrapper.get())) {
10,476✔
520
            wrapper->finalize_before_actualization();
72✔
521
            return;
72✔
522
        }
72✔
523
        m_abandoned_session_wrappers.push(std::move(wrapper));
10,404✔
524
    }
10,404✔
525
    m_actualize_and_finalize->trigger();
×
526
}
10,404✔
527

528

529
// Must be called from the event loop thread.
530
void ClientImpl::actualize_and_finalize_session_wrappers()
531
{
15,052✔
532
    // We need to pop from the wrapper stacks while holding the lock to ensure
533
    // that all updates to `SessionWrapper::m_next` are thread-safe, but then
534
    // release the lock before finalizing or actualizing because those functions
535
    // invoke user callbacks which may try to access the client and reacquire
536
    // the lock.
537
    //
538
    // Finalization must always happen before actualization because we may be
539
    // finalizing and actualizing sessions for the same Realm file, and
540
    // actualizing first would result in overlapping sessions. Because we're
541
    // releasing the lock, new sessions may come in as we're looping, so we need
542
    // a single loop that checks both fields.
543
    while (true) {
35,866✔
544
        bool finalize = true;
35,862✔
545
        bool stopped;
35,862✔
546
        util::bind_ptr<SessionWrapper> wrapper;
35,862✔
547
        {
35,862✔
548
            util::CheckedLockGuard lock{m_mutex};
35,862✔
549
            wrapper = m_abandoned_session_wrappers.pop();
35,862✔
550
            if (!wrapper) {
35,862✔
551
                wrapper = m_unactualized_session_wrappers.pop();
25,456✔
552
                finalize = false;
25,456✔
553
            }
25,456✔
554
            stopped = m_stopped;
35,862✔
555
        }
35,862✔
556
        if (!wrapper)
35,862✔
557
            break;
15,048✔
558
        if (finalize)
20,814✔
559
            wrapper->finalize(); // Throws
10,406✔
560
        else if (stopped)
10,408✔
561
            wrapper->finalize_before_actualization();
4✔
562
        else
10,404✔
563
            wrapper->actualize(); // Throws
10,404✔
564
    }
20,814✔
565
}
15,052✔
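
The loop above pops one item at a time under the lock, prefers finalization over actualization, and runs the resulting action with the lock released. A minimal sketch of that shape follows, assuming hypothetical `PendingWork` and `drain` names and using integer ids and log strings in place of real session wrappers and callbacks.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for the client's two wrapper stacks and its mutex.
struct PendingWork {
    std::mutex mutex;
    std::vector<int> abandoned;    // ids waiting to be finalized
    std::vector<int> unactualized; // ids waiting to be actualized
    bool stopped = false;
};

// Drains both containers, always taking abandoned ids first, and returns a
// log of the actions in the order they were performed.
std::vector<std::string> drain(PendingWork& work)
{
    std::vector<std::string> log;
    while (true) {
        int id = -1;
        bool finalize = true;
        bool stopped;
        {
            std::lock_guard<std::mutex> lock{work.mutex};
            if (!work.abandoned.empty()) {
                id = work.abandoned.back();
                work.abandoned.pop_back();
            }
            else if (!work.unactualized.empty()) {
                id = work.unactualized.back();
                work.unactualized.pop_back();
                finalize = false;
            }
            stopped = work.stopped;
        }
        if (id < 0)
            break;
        // The lock is released here, so an action could lock work.mutex again.
        if (finalize)
            log.push_back("finalize " + std::to_string(id));
        else if (stopped)
            log.push_back("finalize-before-actualize " + std::to_string(id));
        else
            log.push_back("actualize " + std::to_string(id));
    }
    return log;
}
```

Re-checking both containers on every iteration is what lets work that arrives while the lock is released still be handled in finalize-first order.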
566

567

568
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
569
                                                   const std::string& authorization_header_name,
570
                                                   const std::map<std::string, std::string>& custom_http_headers,
571
                                                   bool verify_servers_ssl_certificate,
572
                                                   Optional<std::string> ssl_trust_certificate_path,
573
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
574
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
575
{
10,398✔
576
    auto&& [server_slot_it, inserted] =
10,398✔
577
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
10,398✔
578
    ServerSlot& server_slot = server_slot_it->second; // Throws
10,398✔
579

580
    // TODO: enable multiplexing with proxies
581
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
10,398✔
582
        // Use preexisting connection
583
        REALM_ASSERT(server_slot.alt_connections.empty());
7,564✔
584
        return *server_slot.connection;
7,564✔
585
    }
7,564✔
586

587
    // Create a new connection
588
    REALM_ASSERT(!server_slot.connection);
2,834✔
589
    connection_ident_type ident = m_prev_connection_ident + 1;
2,834✔
590
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
2,834✔
591
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
2,834✔
592
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
2,834✔
593
        std::move(proxy_config), server_slot.reconnect_info); // Throws
2,834✔
594
    ClientImpl::Connection& conn = *conn_2;
2,834✔
595
    if (!m_one_connection_per_session) {
2,834✔
596
        server_slot.connection = std::move(conn_2);
2,824✔
597
    }
2,824✔
598
    else {
10✔
599
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
10✔
600
    }
10✔
601
    m_prev_connection_ident = ident;
2,834✔
602
    was_created = true;
2,834✔
603
    {
2,834✔
604
        util::CheckedLockGuard lk(m_drain_mutex);
2,834✔
605
        ++m_num_connections;
2,834✔
606
    }
2,834✔
607
    return conn;
2,834✔
608
}
10,398✔
609

610

611
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
612
{
2,836✔
613
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
2,836✔
614
    auto i = m_server_slots.find(endpoint);
2,836✔
615
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
2,836✔
616
    ServerSlot& server_slot = i->second;
2,836✔
617
    if (!m_one_connection_per_session) {
2,836✔
618
        REALM_ASSERT(server_slot.alt_connections.empty());
2,826✔
619
        REALM_ASSERT(&*server_slot.connection == &conn);
2,826✔
620
        server_slot.reconnect_info = conn.get_reconnect_info();
2,826✔
621
        server_slot.connection.reset();
2,826✔
622
    }
2,826✔
623
    else {
10✔
624
        REALM_ASSERT(!server_slot.connection);
10✔
625
        connection_ident_type ident = conn.get_ident();
10✔
626
        auto j = server_slot.alt_connections.find(ident);
10✔
627
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
10✔
628
        REALM_ASSERT(&*j->second == &conn);
10✔
629
        server_slot.alt_connections.erase(j);
10✔
630
    }
10✔
631

632
    bool notify;
2,836✔
633
    {
2,836✔
634
        util::CheckedLockGuard lk(m_drain_mutex);
2,836✔
635
        REALM_ASSERT(m_num_connections);
2,836✔
636
        notify = --m_num_connections <= 0;
2,836✔
637
    }
2,836✔
638
    if (notify) {
2,836✔
639
        m_drain_cv.notify_all();
2,214✔
640
    }
2,214✔
641
}
2,836✔
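
`get_connection()` and `remove_connection()` keep one slot per endpoint and either share a single connection across sessions or store one connection per session, keyed by ident. The sketch below shows the lookup side of that bookkeeping only. `Conn`, `Slot`, and `ConnPool` are hypothetical names, the endpoint is reduced to a string, reconnect info and proxy handling are omitted, and `std::map::try_emplace` requires C++17.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

struct Conn {
    long ident = 0;
};

struct Slot {
    std::unique_ptr<Conn> connection;                      // shared by all sessions
    std::map<long, std::unique_ptr<Conn>> alt_connections; // one per session
};

class ConnPool {
public:
    explicit ConnPool(bool one_per_session)
        : m_one_per_session(one_per_session)
    {
    }

    // Returns the connection for `endpoint`, creating one if needed.
    // Unlike the listing, the sketch writes `was_created` in both branches.
    Conn& get(const std::string& endpoint, bool& was_created)
    {
        // try_emplace creates an empty slot on first use and is a no-op after.
        Slot& slot = m_slots.try_emplace(endpoint).first->second;
        if (slot.connection && !m_one_per_session) {
            was_created = false;
            return *slot.connection;
        }
        auto conn = std::make_unique<Conn>();
        conn->ident = ++m_prev_ident;
        Conn& ref = *conn;
        if (m_one_per_session)
            slot.alt_connections[ref.ident] = std::move(conn);
        else
            slot.connection = std::move(conn);
        ++m_num_connections;
        was_created = true;
        return ref;
    }

    int num_connections() const
    {
        return m_num_connections;
    }

private:
    bool m_one_per_session;
    long m_prev_ident = 0;
    int m_num_connections = 0;
    std::map<std::string, Slot> m_slots;
};
```

With sharing enabled, two lookups for the same endpoint return the same object and count as one connection; with one connection per session, each lookup creates a new entry in `alt_connections`.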
642

643

644
// ################ SessionImpl ################
645

646
void SessionImpl::force_close()
647
{
102✔
648
    // Allow force_close() if session is active or hasn't been activated yet.
649
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
102!
650
        m_wrapper.force_close();
102✔
651
    }
102✔
652
}
102✔
653

654
void SessionImpl::on_connection_state_changed(ConnectionState state,
655
                                              const std::optional<SessionErrorInfo>& error_info)
656
{
12,074✔
657
    // Only used to report errors back to the SyncSession while the Session is active
658
    if (m_state == SessionImpl::Active) {
12,074✔
659
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
12,074✔
660
    }
12,074✔
661
}
12,074✔
662

663

664
const std::string& SessionImpl::get_virt_path() const noexcept
665
{
7,326✔
666
    // Can only be called if the session is active or being activated
667
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
7,326!
668
    return m_wrapper.m_virt_path;
7,326✔
669
}
7,326✔
670

671
const std::string& SessionImpl::get_realm_path() const noexcept
672
{
10,746✔
673
    // Can only be called if the session is active or being activated
674
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
10,746✔
675
    return m_wrapper.m_db->get_path();
10,746✔
676
}
10,746✔
677

678
DBRef SessionImpl::get_db() const noexcept
679
{
25,588✔
680
    // Can only be called if the session is active or being activated
681
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
25,588!
682
    return m_wrapper.m_db;
25,588✔
683
}
25,588✔
684

685
ClientReplication& SessionImpl::get_repl() const noexcept
686
{
123,324✔
687
    // Can only be called if the session is active or being activated
688
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
123,324✔
689
    return m_wrapper.get_replication();
123,324✔
690
}
123,324✔
691

692
ClientHistory& SessionImpl::get_history() const noexcept
693
{
120,900✔
694
    return get_repl().get_history();
120,900✔
695
}
120,900✔
696

697
std::optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
698
{
105,340✔
699
    // Can only be called if the session is active or being activated
700
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
105,340✔
701
    return m_wrapper.m_client_reset_config;
105,340✔
702
}
105,340✔
703

704
SessionReason SessionImpl::get_session_reason() noexcept
705
{
1,888✔
706
    // Can only be called if the session is active or being activated
707
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,888!
708
    return m_wrapper.m_session_reason;
1,888✔
709
}
1,888✔
710

711
uint64_t SessionImpl::get_schema_version() noexcept
712
{
1,888✔
713
    // Can only be called if the session is active or being activated
714
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,888!
715
    return m_wrapper.m_schema_version;
1,888✔
716
}
1,888✔
717

718
bool SessionImpl::upload_messages_allowed() noexcept
719
{
67,298✔
720
    // Can only be called if the session is active or being activated
721
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
67,298✔
722
    return m_wrapper.m_allow_upload_messages;
67,298✔
723
}
67,298✔
724

725
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
726
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
727
{
44,136✔
728
    // Ignore the call if the session is not active
729
    if (m_state != State::Active) {
44,136✔
730
        return;
×
731
    }
×
732

733
    try {
44,136✔
734
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
44,136!
735
        if (simulate_integration_error) {
44,136✔
736
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
737
        }
×
738
        version_type client_version;
44,136✔
739
        if (REALM_LIKELY(!get_client().is_dry_run())) {
44,136✔
740
            VersionInfo version_info;
44,136✔
741
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
44,136✔
742
            client_version = version_info.realm_version;
44,136✔
743
        }
44,136✔
744
        else {
×
745
            // Fake it for "dry run" mode
746
            client_version = m_last_version_available + 1;
×
747
        }
×
748
        on_changesets_integrated(client_version, progress); // Throws
44,136✔
749
    }
44,136✔
750
    catch (const IntegrationException& e) {
44,136✔
751
        on_integration_failure(e);
24✔
752
    }
24✔
753
}
44,136✔
754

755

756
void SessionImpl::on_download_completion()
757
{
16,834✔
758
    // Ignore the call if the session is not active
759
    if (m_state == State::Active) {
16,834✔
760
        m_wrapper.on_download_completion(); // Throws
16,834✔
761
    }
16,834✔
762
}
16,834✔
763

764

765
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
766
{
830✔
767
    // Ignore the call if the session is not active
768
    if (m_state == State::Active) {
830✔
769
        m_wrapper.on_suspended(error_info); // Throws
830✔
770
    }
830✔
771
}
830✔
772

773

774
void SessionImpl::on_resumed()
775
{
170✔
776
    // Ignore the call if the session is not active
777
    if (m_state == State::Active) {
170✔
778
        m_wrapper.on_resumed(); // Throws
170✔
779
    }
170✔
780
}
170✔
781

782
void SessionImpl::handle_pending_client_reset_acknowledgement()
783
{
10,744✔
784
    // Ignore the call if the session is not active
785
    if (m_state == State::Active) {
10,744✔
786
        m_wrapper.handle_pending_client_reset_acknowledgement();
10,744✔
787
    }
10,744✔
788
}
10,744✔
789

790
bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
791
{
48,090✔
792
    // Ignore the message if the session is not active or if this is a steady-state message
793
    if (m_state != State::Active || message.batch_state == DownloadBatchState::SteadyState) {
48,090✔
794
        return false;
44,136✔
795
    }
44,136✔
796

797
    REALM_ASSERT(m_is_flx_sync_session);
3,954✔
798

799
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
3,954✔
800
    std::optional<SyncProgress> maybe_progress;
3,954✔
801
    if (message.batch_state == DownloadBatchState::LastInBatch) {
3,954✔
802
        maybe_progress = message.progress;
2,404✔
803
    }
2,404✔
804

805
    try {
3,954✔
806
        bootstrap_store->add_batch(*message.query_version, maybe_progress, message.downloadable, message.changesets);
3,954✔
807
    }
3,954✔
808
    catch (const LogicError& ex) {
3,954✔
809
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
810
            // Use a distinct name so the caught LogicError `ex` is not shadowed
            IntegrationException integration_ex(
×
811
                ErrorCodes::LimitExceeded, "bootstrap changeset too large to store in pending bootstrap store",
×
812
                ProtocolError::bad_changeset_size);
×
813
            on_integration_failure(integration_ex);
×
814
            return true;
×
815
        }
×
816
        throw;
×
817
    }
×
818

819
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, message.progress,
3,954✔
820
                                       *message.query_version, message.batch_state, message.changesets.size());
3,954✔
821
    if (hook_action == SyncClientHookAction::EarlyReturn) {
3,954✔
822
        return true;
12✔
823
    }
12✔
824
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
3,942✔
825

826
    if (message.batch_state == DownloadBatchState::MoreToCome) {
3,942✔
827
        return true;
1,546✔
828
    }
1,546✔
829

830
    try {
2,396✔
831
        process_pending_flx_bootstrap(); // Throws
2,396✔
832
    }
2,396✔
833
    catch (const IntegrationException& e) {
2,396✔
834
        on_integration_failure(e);
12✔
835
    }
12✔
836
    catch (...) {
2,396✔
837
        on_integration_failure(IntegrationException(exception_to_status()));
×
838
    }
×
839

840
    return true;
2,396✔
841
}
2,396✔
842

843

844
void SessionImpl::process_pending_flx_bootstrap()
845
{
12,966✔
846
    // Ignore the call if this is not an FLX session or the session is not active
847
    if (!m_is_flx_sync_session || m_state != State::Active) {
12,966✔
848
        return;
8,608✔
849
    }
8,608✔
850
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
4,358✔
851
    if (!bootstrap_store->has_pending()) {
4,358✔
852
        return;
1,928✔
853
    }
1,928✔
854

855
    auto pending_batch_stats = bootstrap_store->pending_stats();
2,430✔
856
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
2,430✔
857
                "changeset size: %3)",
2,430✔
858
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
2,430✔
859
                pending_batch_stats.pending_changeset_bytes);
2,430✔
860
    auto& history = get_repl().get_history();
2,430✔
861
    VersionInfo new_version;
2,430✔
862
    SyncProgress progress;
2,430✔
863
    int64_t query_version = -1;
2,430✔
864
    size_t changesets_processed = 0;
2,430✔
865

866
    // Used to commit each batch after it was transformed.
867
    TransactionRef transact = get_db()->start_write();
2,430✔
868
    while (bootstrap_store->has_pending()) {
5,142✔
869
        auto start_time = std::chrono::steady_clock::now();
2,736✔
870
        auto pending_batch = bootstrap_store->peek_pending(*transact, m_wrapper.m_flx_bootstrap_batch_size_bytes);
2,736✔
871
        if (!pending_batch.progress) {
2,736✔
872
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
20✔
873
            bootstrap_store->clear(*transact, pending_batch.query_version);
20✔
874
            transact->commit();
20✔
875
            return;
20✔
876
        }
20✔
877

878
        auto batch_state =
2,716✔
879
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
2,716✔
880
        query_version = pending_batch.query_version;
2,716✔
881
        bool simulate_integration_error =
2,716✔
882
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
2,716✔
883
        if (simulate_integration_error) {
2,716✔
884
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
4✔
885
        }
4✔
886

887
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
2,712✔
888
                        batch_state, pending_batch.changesets.size());
2,712✔
889

890
        history.integrate_server_changesets(
2,712✔
891
            *pending_batch.progress, 1.0, pending_batch.changesets, new_version, batch_state, logger, transact,
2,712✔
892
            [&](const Transaction& tr, util::Span<Changeset> changesets_applied) {
2,712✔
893
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
2,700✔
894
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
2,700✔
895
            });
2,700✔
896
        progress = *pending_batch.progress;
2,712✔
897
        changesets_processed += pending_batch.changesets.size();
2,712✔
898
        auto duration = std::chrono::steady_clock::now() - start_time;
2,712✔
899

900
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
2,712✔
901
                                      batch_state, pending_batch.changesets.size());
2,712✔
902
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
2,712✔
903

904
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
2,712✔
905
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
2,712✔
906
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
2,712✔
907
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
2,712✔
908
                    pending_batch.remaining_changesets);
2,712✔
909
    }
2,712✔
910

911
    REALM_ASSERT_3(query_version, !=, -1);
2,406✔
912

913
    on_changesets_integrated(new_version.realm_version, progress);
2,406✔
914
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
2,406✔
915
                                  DownloadBatchState::LastInBatch, changesets_processed);
2,406✔
916
    // NoAction/EarlyReturn are both valid no-op actions to take here.
917
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
2,406✔
918
}
2,406✔
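The loop in process_pending_flx_bootstrap() repeatedly peeks a batch bounded by a byte limit, integrates it, and pops only what was applied. The following is a minimal sketch of that drain pattern, not the realm-core implementation: `PendingStore`, `peek`, `pop_front`, and `drain_in_batches` are hypothetical names, each entry is just a changeset size in bytes, and the rule that a batch always contains at least one changeset is an assumption made so the loop always makes progress.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for a pending bootstrap store.
// Each entry is the size in bytes of one stored changeset.
class PendingStore {
public:
    explicit PendingStore(std::deque<std::size_t> sizes)
        : m_sizes(std::move(sizes))
    {
    }

    bool has_pending() const
    {
        return !m_sizes.empty();
    }

    // Returns how many changesets from the front fit within `limit_bytes`.
    // Assumption: at least one changeset is returned even if it exceeds the
    // limit on its own, so the caller's loop cannot stall.
    std::size_t peek(std::size_t limit_bytes) const
    {
        std::size_t count = 0;
        std::size_t total = 0;
        for (std::size_t size : m_sizes) {
            if (count > 0 && total + size > limit_bytes)
                break;
            total += size;
            ++count;
        }
        return count;
    }

    // Removes the first `count` changesets after they were applied.
    void pop_front(std::size_t count)
    {
        assert(count <= m_sizes.size());
        m_sizes.erase(m_sizes.begin(), m_sizes.begin() + static_cast<std::ptrdiff_t>(count));
    }

private:
    std::deque<std::size_t> m_sizes;
};

// Drains the store batch by batch and returns the number of changesets in each batch.
inline std::vector<std::size_t> drain_in_batches(PendingStore& store, std::size_t limit_bytes)
{
    std::vector<std::size_t> batch_sizes;
    while (store.has_pending()) {
        std::size_t n = store.peek(limit_bytes);
        // A real client would integrate the batch here before popping it.
        store.pop_front(n);
        batch_sizes.push_back(n);
    }
    return batch_sizes;
}
```

In this sketch the pop happens unconditionally after the peek; the code above instead pops from inside the integration callback, so only changesets that were actually applied are removed.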
919

920
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
921
{
20✔
922
    // Ignore the call if the session is not active
923
    if (m_state == State::Active) {
20✔
924
        get_flx_subscription_store()->set_error(version, err_msg);
20✔
925
    }
20✔
926
}
20✔
927

928
SubscriptionStore* SessionImpl::get_flx_subscription_store()
929
{
25,612✔
930
    // Should never be called if session is not active
931
    REALM_ASSERT_EX(m_state == State::Active, m_state);
25,612✔
932
    return m_wrapper.get_flx_subscription_store();
25,612✔
933
}
25,612✔
934

935
MigrationStore* SessionImpl::get_migration_store()
936
{
73,650✔
937
    // Should never be called if session is not active
938
    REALM_ASSERT_EX(m_state == State::Active, m_state);
73,650✔
939
    return m_wrapper.get_migration_store();
73,650✔
940
}
73,650✔
941

942
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
943
{
15,642✔
944
    // Should never be called if session is not active
945
    REALM_ASSERT_EX(m_state == State::Active, m_state);
15,642✔
946

947
    // Make sure we don't call the debug hook recursively.
948
    if (m_wrapper.m_in_debug_hook) {
15,642✔
949
        return SyncClientHookAction::NoAction;
24✔
950
    }
24✔
951
    m_wrapper.m_in_debug_hook = true;
15,618✔
952
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
15,618✔
953
        m_wrapper.m_in_debug_hook = false;
15,618✔
954
    });
15,618✔
955

956
    auto action = m_wrapper.m_debug_hook(data);
15,618✔
957
    switch (action) {
15,618✔
958
        case realm::SyncClientHookAction::SuspendWithRetryableError: {
12✔
959
            SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
12✔
960
            err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
12✔
961

962
            auto err_processing_err = receive_error_message(err_info);
12✔
963
            REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
12✔
964
            return SyncClientHookAction::EarlyReturn;
12✔
965
        }
×
966
        case realm::SyncClientHookAction::TriggerReconnect: {
24✔
967
            get_connection().voluntary_disconnect();
24✔
968
            return SyncClientHookAction::EarlyReturn;
24✔
969
        }
×
970
        default:
15,574✔
971
            return action;
15,574✔
972
    }
15,618✔
973
}
15,618✔
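The recursion guard in call_debug_hook() combines a boolean flag with a scope-exit helper so the flag is cleared on every exit path. Below is a minimal, self-contained sketch of that pattern. `ScopeExit`, `HookRunner`, and `HookAction` are hypothetical stand-ins written for illustration; they are not the realm-core types.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in for the action a hook can return.
enum class HookAction { NoAction, EarlyReturn };

// Runs a callable when it goes out of scope, on both normal and exceptional exit.
template <class F>
class ScopeExit {
public:
    explicit ScopeExit(F f)
        : m_f(std::move(f))
    {
    }
    ScopeExit(const ScopeExit&) = delete;
    ScopeExit& operator=(const ScopeExit&) = delete;
    ~ScopeExit()
    {
        m_f();
    }

private:
    F m_f;
};

class HookRunner {
public:
    explicit HookRunner(std::function<HookAction(HookRunner&)> hook)
        : m_hook(std::move(hook))
    {
    }

    HookAction call()
    {
        // A nested call made from inside the hook is answered with NoAction
        // instead of invoking the hook again.
        if (m_in_hook)
            return HookAction::NoAction;

        m_in_hook = true;
        auto reset_flag = [this]() noexcept {
            m_in_hook = false;
        };
        ScopeExit<decltype(reset_flag)> guard(reset_flag);

        ++m_invocations;
        return m_hook(*this);
    }

    int invocations() const
    {
        return m_invocations;
    }

private:
    std::function<HookAction(HookRunner&)> m_hook;
    bool m_in_hook = false;
    int m_invocations = 0;
};
```

Because the flag is reset by a destructor rather than by a statement at the end of the function, a hook that throws or returns early still leaves the runner usable for the next call.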
974

975
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
976
                                                  int64_t query_version, DownloadBatchState batch_state,
977
                                                  size_t num_changesets)
978
{
104,004✔
979
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
104,004✔
980
        return SyncClientHookAction::NoAction;
95,086✔
981
    }
95,086✔
982
    if (REALM_UNLIKELY(m_state != State::Active)) {
8,918✔
983
        return SyncClientHookAction::NoAction;
×
984
    }
×
985

986
    SyncClientHookData data;
8,918✔
987
    data.event = event;
8,918✔
988
    data.batch_state = batch_state;
8,918✔
989
    data.progress = progress;
8,918✔
990
    data.num_changesets = num_changesets;
8,918✔
991
    data.query_version = query_version;
8,918✔
992

993
    return call_debug_hook(data);
8,918✔
994
}
8,918✔
995

996
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo* error_info)
997
{
97,940✔
998
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
97,940✔
999
        return SyncClientHookAction::NoAction;
91,212✔
1000
    }
91,212✔
1001
    if (REALM_UNLIKELY(m_state != State::Active)) {
6,728✔
1002
        return SyncClientHookAction::NoAction;
×
1003
    }
×
1004

1005
    SyncClientHookData data;
6,728✔
1006
    data.event = event;
6,728✔
1007
    data.batch_state = DownloadBatchState::SteadyState;
6,728✔
1008
    data.progress = m_progress;
6,728✔
1009
    data.num_changesets = 0;
6,728✔
1010
    data.query_version = m_last_sent_flx_query_version;
6,728✔
1011
    data.error_info = error_info;
6,728✔
1012

1013
    return call_debug_hook(data);
6,728✔
1014
}
6,728✔
1015

1016
void SessionImpl::init_progress_handler()
1017
{
10,746✔
1018
    REALM_ASSERT_EX(m_state == State::Unactivated || m_state == State::Active, m_state);
10,746✔
1019
    m_wrapper.init_progress_handler();
10,746✔
1020
}
10,746✔
1021

1022
void SessionImpl::enable_progress_notifications()
1023
{
46,154✔
1024
    m_wrapper.m_reliable_download_progress = true;
46,154✔
1025
}
46,154✔
1026

1027
util::Future<std::string> SessionImpl::send_test_command(std::string body)
1028
{
72✔
1029
    if (m_state != State::Active) {
72✔
1030
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
×
1031
    }
×
1032

1033
    try {
72✔
1034
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
72✔
1035
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
72✔
1036
            return Status{ErrorCodes::LogicError,
4✔
1037
                          "Must supply command name in \"command\" field of test command json object"};
4✔
1038
        }
4✔
1039
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
68✔
1040
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
×
1041
        }
×
1042
    }
68✔
1043
    catch (const nlohmann::json::parse_error& e) {
72✔
1044
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
4✔
1045
    }
4✔
1046

1047
    auto pf = util::make_promise_future<std::string>();
64✔
1048
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
64✔
1049
        // Includes operation_aborted
1050
        if (!status.is_ok()) {
64✔
1051
            promise.set_error(status);
×
1052
            return;
×
1053
        }
×
1054

1055
        auto id = ++m_last_pending_test_command_ident;
64✔
1056
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
64✔
1057
        ensure_enlisted_to_send();
64✔
1058
    });
64✔
1059

1060
    return std::move(pf.future);
64✔
1061
}
72✔
1062

1063
// ################ SessionWrapper ################
1064

1065
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1066
// provides a link to the ClientImpl::Session, which exchanges messages with the server through the
1067
// ClientImpl::Connection that owns it.
1068
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1069
                               std::shared_ptr<MigrationStore> migration_store, Session::Config&& config)
1070
    : m_client{client}
5,070✔
1071
    , m_db(std::move(db))
5,070✔
1072
    , m_replication(m_db->get_replication())
5,070✔
1073
    , m_protocol_envelope{config.protocol_envelope}
5,070✔
1074
    , m_server_address{std::move(config.server_address)}
5,070✔
1075
    , m_server_port{config.server_port}
5,070✔
1076
    , m_server_verified{config.server_verified}
5,070✔
1077
    , m_user_id(std::move(config.user_id))
5,070✔
1078
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
5,070✔
1079
    , m_authorization_header_name{config.authorization_header_name}
5,070✔
1080
    , m_custom_http_headers{std::move(config.custom_http_headers)}
5,070✔
1081
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
5,070✔
1082
    , m_simulate_integration_error{config.simulate_integration_error}
5,070✔
1083
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
5,070✔
1084
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
5,070✔
1085
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
5,070✔
1086
    , m_http_request_path_prefix{std::move(config.service_identifier)}
5,070✔
1087
    , m_virt_path{std::move(config.realm_identifier)}
5,070✔
1088
    , m_proxy_config{std::move(config.proxy_config)}
5,070✔
1089
    , m_signed_access_token{std::move(config.signed_user_token)}
5,070✔
1090
    , m_client_reset_config{std::move(config.client_reset_config)}
5,070✔
1091
    , m_progress_handler(std::move(config.progress_handler))
5,070✔
1092
    , m_connection_state_change_listener(std::move(config.connection_state_change_listener))
5,070✔
1093
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
5,070✔
1094
    , m_session_reason(m_client_reset_config || config.fresh_realm_download ? SessionReason::ClientReset
5,070✔
1095
                                                                            : SessionReason::Sync)
5,070✔
1096
    , m_allow_upload_messages(!config.fresh_realm_download)
5,070✔
1097
    , m_schema_version(config.schema_version)
5,070✔
1098
    , m_flx_subscription_store(std::move(flx_sub_store))
5,070✔
1099
    , m_migration_store(std::move(migration_store))
5,070✔
1100
{
10,482✔
1101
    REALM_ASSERT(m_db);
10,482✔
1102
    REALM_ASSERT(m_db->get_replication());
10,482✔
1103
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
10,482✔
1104

1105
    // SessionWrapper begins at +1 retain count because Client retains and
1106
    // releases it while performing async operations, and these need to not
1107
    // take it to 0 or it could be deleted before the caller can retain it.
1108
    bind_ptr();
10,482✔
1109
    m_client.register_unactualized_session_wrapper(this);
10,482✔
1110
}
10,482✔
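The constructor comment explains why the wrapper starts at a retain count of +1: temporary retain/release pairs must not take the count to zero before the creator has taken its own reference. A minimal sketch of that idea follows. `RefCounted` is a hypothetical class written for illustration and is not the base class realm-core uses with `util::bind_ptr`.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical intrusive ref-counted object.
class RefCounted {
public:
    // `destroyed` lets the caller observe when the object deletes itself.
    explicit RefCounted(bool& destroyed)
        : m_destroyed(destroyed)
    {
        bind(); // start at +1 on behalf of the creator
    }

    void bind() noexcept
    {
        m_count.fetch_add(1, std::memory_order_relaxed);
    }

    void unbind() noexcept
    {
        // fetch_sub returns the previous value, so 1 means the last reference was dropped.
        if (m_count.fetch_sub(1, std::memory_order_acq_rel) == 1)
            delete this;
    }

    int count() const noexcept
    {
        return m_count.load(std::memory_order_relaxed);
    }

private:
    // Private so the object can only be destroyed through unbind().
    ~RefCounted()
    {
        m_destroyed = true;
    }

    bool& m_destroyed;
    std::atomic<int> m_count{0};
};
```

With the initial +1 in place, a helper that binds and unbinds the object during an async operation leaves it alive; only the creator's final unbind() destroys it.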
1111

1112
SessionWrapper::~SessionWrapper() noexcept
1113
{
10,482✔
1114
    // We begin actualization in the constructor and do not delete the wrapper
1115
    // until both the Client is done with it and the Session has abandoned it,
1116
    // so at this point we must have actualized, finalized, and been abandoned.
1117
    REALM_ASSERT(m_actualized);
10,482✔
1118
    REALM_ASSERT(m_abandoned);
10,482✔
1119
    REALM_ASSERT(m_finalized);
10,482✔
1120
    REALM_ASSERT(m_closed);
10,482✔
1121
    REALM_ASSERT(!m_db);
10,482✔
1122
}
10,482✔
1123

1124

1125
inline ClientReplication& SessionWrapper::get_replication() noexcept
1126
{
123,328✔
1127
    REALM_ASSERT(m_db);
123,328✔
1128
    return static_cast<ClientReplication&>(*m_replication);
123,328✔
1129
}
123,328✔
1130

1131

1132
inline ClientImpl& SessionWrapper::get_client() noexcept
1133
{
×
1134
    return m_client;
×
1135
}
×
1136

1137
bool SessionWrapper::has_flx_subscription_store() const
1138
{
×
1139
    return static_cast<bool>(m_flx_subscription_store);
×
1140
}
×
1141

1142
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
1143
{
25,612✔
1144
    REALM_ASSERT(!m_finalized);
25,612✔
1145
    return m_flx_subscription_store.get();
25,612✔
1146
}
25,612✔
1147

1148
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
1149
{
8,310✔
1150
    REALM_ASSERT(!m_finalized);
8,310✔
1151
    return m_flx_pending_bootstrap_store.get();
8,310✔
1152
}
8,310✔
1153

1154
MigrationStore* SessionWrapper::get_migration_store()
1155
{
73,650✔
1156
    REALM_ASSERT(!m_finalized);
73,650✔
1157
    return m_migration_store.get();
73,650✔
1158
}
73,650✔
1159

1160
inline bool SessionWrapper::mark_abandoned()
1161
{
10,482✔
1162
    REALM_ASSERT(!m_abandoned);
10,482✔
1163
    m_abandoned = true;
10,482✔
1164
    return m_finalized;
10,482✔
1165
}
10,482✔
1166

1167

1168
void SessionWrapper::on_commit(version_type new_version)
1169
{
117,814✔
1170
    // Thread safety required
1171
    m_client.post([self = util::bind_ptr{this}, new_version] {
117,814✔
1172
        REALM_ASSERT(self->m_actualized);
117,810✔
1173
        if (REALM_UNLIKELY(!self->m_sess))
117,810✔
1174
            return; // Already finalized
390✔
1175
        SessionImpl& sess = *self->m_sess;
117,420✔
1176
        sess.recognize_sync_version(new_version); // Throws
117,420✔
1177
        self->check_progress();                   // Throws
117,420✔
1178
    });
117,420✔
1179
}
117,814✔
1180

1181

1182
void SessionWrapper::cancel_reconnect_delay()
1183
{
32✔
1184
    // Thread safety required
1185

1186
    m_client.post([self = util::bind_ptr{this}] {
32✔
1187
        REALM_ASSERT(self->m_actualized);
32✔
1188
        if (REALM_UNLIKELY(self->m_closed)) {
32✔
1189
            return;
×
1190
        }
×
1191

1192
        if (REALM_UNLIKELY(!self->m_sess))
32✔
1193
            return; // Already finalized
×
1194
        SessionImpl& sess = *self->m_sess;
32✔
1195
        sess.cancel_resumption_delay(); // Throws
32✔
1196
        ClientImpl::Connection& conn = sess.get_connection();
32✔
1197
        conn.cancel_reconnect_delay(); // Throws
32✔
1198
    });                                // Throws
32✔
1199
}
32✔
1200

1201
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1202
                                    WaitOperCompletionHandler handler)
1203
{
28,720✔
1204
    REALM_ASSERT(upload_completion || download_completion);
28,720✔
1205

1206
    m_client.post([self = util::bind_ptr{this}, handler = std::move(handler), upload_completion,
28,720✔
1207
                   download_completion](Status status) mutable {
28,722✔
1208
        REALM_ASSERT(self->m_actualized);
28,722✔
1209
        if (!status.is_ok()) {
28,722✔
1210
            handler(status); // Throws
×
1211
            return;
×
1212
        }
×
1213
        if (REALM_UNLIKELY(!self->m_sess)) {
28,722✔
1214
            // Already finalized
1215
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
186✔
1216
            return;
186✔
1217
        }
186✔
1218
        if (upload_completion) {
28,536✔
1219
            self->m_upload_completion_requested_version = self->m_db->get_version_of_latest_snapshot();
15,654✔
1220
            if (download_completion) {
15,654✔
1221
                // Wait for upload and download completion
1222
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
362✔
1223
            }
362✔
1224
            else {
15,292✔
1225
                // Wait for upload completion only
1226
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
15,292✔
1227
            }
15,292✔
1228
        }
15,654✔
1229
        else {
12,882✔
1230
            // Wait for download completion only
1231
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
12,882✔
1232
        }
12,882✔
1233
        SessionImpl& sess = *self->m_sess;
28,536✔
1234
        if (upload_completion)
28,536✔
1235
            self->check_progress();
15,654✔
1236
        if (download_completion)
28,536✔
1237
            sess.request_download_completion_notification(); // Throws
13,244✔
1238
    });                                                      // Throws
28,536✔
1239
}
28,720✔
1240

1241

1242
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1243
{
12,912✔
1244
    // Thread safety required
1245
    REALM_ASSERT(!m_abandoned);
12,912✔
1246

1247
    auto pf = util::make_promise_future<bool>();
12,912✔
1248
    async_wait_for(true, false, [promise = std::move(pf.promise)](Status status) mutable {
12,912✔
1249
        promise.emplace_value(status.is_ok());
12,912✔
1250
    });
12,912✔
1251
    return pf.future.get();
12,912✔
1252
}
12,912✔
1253

1254

1255
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1256
{
10,004✔
1257
    // Thread safety required
1258
    REALM_ASSERT(!m_abandoned);
10,004✔
1259

1260
    auto pf = util::make_promise_future<bool>();
10,004✔
1261
    async_wait_for(false, true, [promise = std::move(pf.promise)](Status status) mutable {
10,004✔
1262
        promise.emplace_value(status.is_ok());
10,004✔
1263
    });
10,004✔
1264
    return pf.future.get();
10,004✔
1265
}
10,004✔
1266

1267

1268
void SessionWrapper::refresh(std::string_view signed_access_token)
1269
{
218✔
1270
    // Thread safety required
1271
    REALM_ASSERT(!m_abandoned);
218✔
1272

1273
    m_client.post([self = util::bind_ptr{this}, token = std::string(signed_access_token)] {
218✔
1274
        REALM_ASSERT(self->m_actualized);
218✔
1275
        if (REALM_UNLIKELY(!self->m_sess))
218✔
1276
            return; // Already finalized
×
1277
        self->m_signed_access_token = std::move(token);
218✔
1278
        SessionImpl& sess = *self->m_sess;
218✔
1279
        ClientImpl::Connection& conn = sess.get_connection();
218✔
1280
        // FIXME: This only makes sense when each session uses a separate connection.
1281
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
218✔
1282
        sess.cancel_resumption_delay();                                                          // Throws
218✔
1283
        conn.cancel_reconnect_delay();                                                           // Throws
218✔
1284
    });
218✔
1285
}
218✔
1286

1287

1288
void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
1289
{
10,482✔
1290
    ClientImpl& client = wrapper->m_client;
10,482✔
1291
    client.register_abandoned_session_wrapper(std::move(wrapper));
10,482✔
1292
}
10,482✔
1293

1294

1295
// Must be called from event loop thread
1296
void SessionWrapper::actualize()
1297
{
10,406✔
1298
    // actualize() can only ever be called once
1299
    REALM_ASSERT(!m_actualized);
10,406✔
1300
    REALM_ASSERT(!m_sess);
10,406✔
1301
    // The client should have removed this wrapper from those pending
1302
    // actualization if it called force_close() or finalize_before_actualization()
1303
    REALM_ASSERT(!m_finalized);
10,406✔
1304
    REALM_ASSERT(!m_closed);
10,406✔
1305

1306
    m_actualized = true;
10,406✔
1307

1308
    ScopeExitFail close_on_error([&]() noexcept {
10,406✔
1309
        m_closed = true;
4✔
1310
    });
4✔
1311

1312
    m_db->claim_sync_agent();
10,406✔
1313
    m_db->add_commit_listener(this);
10,406✔
1314
    ScopeExitFail remove_commit_listener([&]() noexcept {
10,406✔
1315
        m_db->remove_commit_listener(this);
×
1316
    });
×
1317

1318
    ServerEndpoint endpoint{m_protocol_envelope, m_server_address, m_server_port,
10,406✔
1319
                            m_user_id,           m_sync_mode,      m_server_verified};
10,406✔
1320
    bool was_created = false;
10,406✔
1321
    ClientImpl::Connection& conn = m_client.get_connection(
10,406✔
1322
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
10,406✔
1323
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
10,406✔
1324
        was_created); // Throws
10,406✔
1325
    ScopeExitFail remove_connection([&]() noexcept {
10,406✔
1326
        if (was_created)
×
1327
            m_client.remove_connection(conn);
×
1328
    });
×
1329

1330
    // FIXME: This only makes sense when each session uses a separate connection.
1331
    conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
10,406✔
1332
    std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
10,406✔
1333
    if (m_sync_mode == SyncServerMode::FLX) {
10,406✔
1334
        m_flx_pending_bootstrap_store =
1,806✔
1335
            std::make_unique<PendingBootstrapStore>(m_db, sess->logger, m_flx_subscription_store);
1,806✔
1336
    }
1,806✔
1337

1338
    sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
10,406✔
1339
    m_sess = sess.get();
10,406✔
1340
    ScopeExitFail clear_sess([&]() noexcept {
10,406✔
1341
        m_sess = nullptr;
×
1342
    });
×
1343
    conn.activate_session(std::move(sess)); // Throws
10,406✔
1344

1345
    if (was_created)
10,406✔
1346
        conn.activate(); // Throws
2,836✔
1347

1348
    if (m_connection_state_change_listener) {
10,406✔
1349
        ConnectionState state = conn.get_state();
10,392✔
1350
        if (state != ConnectionState::disconnected) {
10,392✔
1351
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,398✔
1352
            if (state == ConnectionState::connected)
7,398✔
1353
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
7,290✔
1354
        }
7,398✔
1355
    }
10,392✔
1356

1357
    if (!m_client_reset_config)
10,406✔
1358
        check_progress(); // Throws
9,976✔
1359
}
10,406✔
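actualize() registers several ScopeExitFail guards so that each completed setup step is undone if a later step throws. The sketch below shows one common way to build such a guard with std::uncaught_exceptions() (C++17). `ScopeFail` and `setup` are hypothetical names, and this is not necessarily how realm-core's ScopeExitFail is implemented.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Runs the callable only when the scope is left because of an exception.
// Requires C++17 for std::uncaught_exceptions().
template <class F>
class ScopeFail {
public:
    explicit ScopeFail(F f)
        : m_f(std::move(f))
        , m_count(std::uncaught_exceptions())
    {
    }
    ScopeFail(const ScopeFail&) = delete;
    ScopeFail& operator=(const ScopeFail&) = delete;
    ~ScopeFail()
    {
        // More exceptions in flight than at construction means we are unwinding.
        if (std::uncaught_exceptions() > m_count)
            m_f();
    }

private:
    F m_f;
    int m_count;
};

// Hypothetical two-step setup; `fail_second` forces the second step to throw.
inline void setup(std::vector<std::string>& log, bool fail_second)
{
    log.push_back("step1");
    auto undo_step1 = [&]() noexcept {
        log.push_back("undo1");
    };
    ScopeFail<decltype(undo_step1)> guard(undo_step1);

    if (fail_second)
        throw std::runtime_error("step2 failed");
    log.push_back("step2");
}
```

On the success path the guard does nothing, so there is no explicit "dismiss" call to forget; on the failure path the undo action runs during stack unwinding, in reverse order of registration when several guards are stacked.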
1360

1361
void SessionWrapper::force_close()
1362
{
10,508✔
1363
    if (m_closed) {
10,508✔
1364
        return;
106✔
1365
    }
106✔
1366
    REALM_ASSERT(m_actualized);
10,402✔
1367
    REALM_ASSERT(m_sess);
10,402✔
1368
    m_closed = true;
10,402✔
1369

1370
    ClientImpl::Connection& conn = m_sess->get_connection();
10,402✔
1371
    conn.initiate_session_deactivation(m_sess); // Throws
10,402✔
1372

1373
    // We need to keep the DB open until finalization, but we no longer want to
1374
    // know when commits are made
1375
    m_db->remove_commit_listener(this);
10,402✔
1376

1377
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
1378
    m_flx_pending_bootstrap_store.reset();
10,402✔
1379
    // Clear the subscription and migration store refs since they are owned by SyncSession
1380
    m_flx_subscription_store.reset();
10,402✔
1381
    m_migration_store.reset();
10,402✔
1382
    m_sess = nullptr;
10,402✔
1383
    // Everything is being torn down, no need to report connection state anymore
1384
    m_connection_state_change_listener = {};
10,402✔
1385

1386
    // All outstanding wait operations must be canceled
1387
    while (!m_upload_completion_handlers.empty()) {
10,764✔
1388
        auto handler = std::move(m_upload_completion_handlers.back());
362✔
1389
        m_upload_completion_handlers.pop_back();
362✔
1390
        handler({ErrorCodes::OperationAborted, "Sync session is being closed before upload was complete"}); // Throws
362✔
1391
    }
362✔
1392
    while (!m_download_completion_handlers.empty()) {
10,798✔
1393
        auto handler = std::move(m_download_completion_handlers.back());
396✔
1394
        m_download_completion_handlers.pop_back();
396✔
1395
        handler(
396✔
1396
            {ErrorCodes::OperationAborted, "Sync session is being closed before download was complete"}); // Throws
396✔
1397
    }
396✔
1398
    while (!m_sync_completion_handlers.empty()) {
10,414✔
1399
        auto handler = std::move(m_sync_completion_handlers.back());
12✔
1400
        m_sync_completion_handlers.pop_back();
12✔
1401
        handler({ErrorCodes::OperationAborted, "Sync session is being closed before sync was complete"}); // Throws
12✔
1402
    }
12✔
1403
}
10,402✔
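The three cancellation loops in `force_close()` share one pattern: move the last handler out of the vector, pop it, and only then invoke it, so the container is already consistent when the handler runs. A minimal standalone sketch of that pattern follows. `Handler` and `drain_handlers` are hypothetical names used only for illustration, and a plain string stands in for the `Status` argument.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for WaitOperCompletionHandler; the real handlers take a Status.
using Handler = std::function<void(const std::string&)>;

// Drains the vector from the back. Each handler is removed from the
// container before it is invoked, so the vector never holds a moved-from
// entry while user code is running.
void drain_handlers(std::vector<Handler>& handlers, const std::string& status)
{
    while (!handlers.empty()) {
        auto handler = std::move(handlers.back());
        handlers.pop_back();
        handler(status); // may throw, as in the original loops
    }
}
```

Handlers run in reverse registration order with this loop, because the vector is consumed from the back.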
1404

1405
// Must be called from event loop thread
1406
//
1407
// `m_client.m_mutex` is not held while this is called, but it is guaranteed to
1408
// have been acquired at some point in between the final read or write ever made
1409
// from a different thread and when this is called.
1410
void SessionWrapper::finalize()
1411
{
10,406✔
1412
    REALM_ASSERT(m_actualized);
10,406✔
1413
    REALM_ASSERT(m_abandoned);
10,406✔
1414
    REALM_ASSERT(!m_finalized);
10,406✔
1415

1416
    force_close();
10,406✔
1417

1418
    m_finalized = true;
10,406✔
1419

1420
    // The Realm file can be closed now, as no access to the Realm file is
1421
    // supposed to happen on behalf of a session after initiation of
1422
    // deactivation.
1423
    m_db->release_sync_agent();
10,406✔
1424
    m_db = nullptr;
10,406✔
1425
}
10,406✔
1426

1427

1428
// Must be called only when an unactualized session wrapper becomes abandoned.
1429
//
1430
// Called with a lock on `m_client.m_mutex`.
1431
inline void SessionWrapper::finalize_before_actualization() noexcept
1432
{
76✔
1433
    REALM_ASSERT(!m_finalized);
76✔
1434
    REALM_ASSERT(!m_sess);
76✔
1435
    m_actualized = true;
76✔
1436
    m_finalized = true;
76✔
1437
    m_closed = true;
76✔
1438
    m_db->remove_commit_listener(this);
76✔
1439
    m_db->release_sync_agent();
76✔
1440
    m_db = nullptr;
76✔
1441
}
76✔
1442

1443
void SessionWrapper::on_download_completion()
1444
{
16,834✔
1445
    // Ensure that progress handlers get called before completion handlers. The
1446
    // download completing performed a commit and will trigger progress
1447
    // notifications asynchronously, but they would arrive after the download
1448
    // completion without this.
1449
    check_progress();
16,834✔
1450

1451
    if (m_flx_subscription_store) {
16,834✔
1452
        m_flx_subscription_store->download_complete();
2,994✔
1453
    }
2,994✔
1454

1455
    while (!m_download_completion_handlers.empty()) {
29,576✔
1456
        auto handler = std::move(m_download_completion_handlers.back());
12,742✔
1457
        m_download_completion_handlers.pop_back();
12,742✔
1458
        handler(Status::OK()); // Throws
12,742✔
1459
    }
12,742✔
1460
    while (!m_sync_completion_handlers.empty()) {
16,928✔
1461
        auto handler = std::move(m_sync_completion_handlers.back());
94✔
1462
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
94✔
1463
        m_sync_completion_handlers.pop_back();
94✔
1464
    }
94✔
1465
}
16,834✔
1466

1467

1468
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1469
{
830✔
1470
    REALM_ASSERT(!m_finalized);
830✔
1471
    m_suspended = true;
830✔
1472
    if (m_connection_state_change_listener) {
830✔
1473
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
830✔
1474
    }
830✔
1475
}
830✔
1476

1477

1478
void SessionWrapper::on_resumed()
1479
{
170✔
1480
    REALM_ASSERT(!m_finalized);
170✔
1481
    m_suspended = false;
170✔
1482
    if (m_connection_state_change_listener) {
170✔
1483
        ClientImpl::Connection& conn = m_sess->get_connection();
170✔
1484
        if (conn.get_state() != ConnectionState::disconnected) {
170✔
1485
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
160✔
1486
            if (conn.get_state() == ConnectionState::connected)
160✔
1487
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
146✔
1488
        }
160✔
1489
    }
170✔
1490
}
170✔
1491

1492

1493
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1494
                                                 const std::optional<SessionErrorInfo>& error_info)
1495
{
12,072✔
1496
    if (m_connection_state_change_listener && !m_suspended) {
12,072✔
1497
        m_connection_state_change_listener(state, error_info); // Throws
12,006✔
1498
    }
12,006✔
1499
}
12,072✔
1500

1501
void SessionWrapper::init_progress_handler()
1502
{
10,746✔
1503
    ClientHistory::get_upload_download_state(m_db.get(), m_final_downloaded, m_final_uploaded);
10,746✔
1504
}
10,746✔
1505

1506
void SessionWrapper::check_progress()
1507
{
159,882✔
1508
    REALM_ASSERT(!m_finalized);
159,882✔
1509
    REALM_ASSERT(m_sess);
159,882✔
1510

1511
    // Check if there's anything which even wants progress or completion information
1512
    bool has_progress_handler = m_progress_handler && m_reliable_download_progress;
159,882✔
1513
    bool has_completion_handler = !m_upload_completion_handlers.empty() || !m_sync_completion_handlers.empty();
159,882✔
1514
    if (!m_flx_subscription_store && !has_progress_handler && !has_completion_handler)
159,882✔
1515
        return;
80,914✔
1516

1517
    // The order in which we report each type of completion or progress is important,
1518
    // and changing it needs to be avoided as it'd be a breaking change to the APIs
1519

1520
    TransactionRef tr;
78,968✔
1521
    ReportedProgress p;
78,968✔
1522
    if (m_flx_subscription_store) {
78,968✔
1523
        m_flx_subscription_store->report_progress(tr);
36,834✔
1524
    }
36,834✔
1525

1526
    if (!has_progress_handler && !has_completion_handler)
78,968✔
1527
        return;
20,466✔
1528
    // The subscription store may have started a read transaction that we'll
1529
    // reuse, but it may not have needed to or may not exist
1530
    if (!tr)
58,502✔
1531
        tr = m_db->start_read();
52,186✔
1532

1533
    version_type uploaded_version;
58,502✔
1534
    DownloadableProgress downloadable;
58,502✔
1535
    ClientHistory::get_upload_download_state(*tr, m_db->get_alloc(), p.downloaded, downloadable, p.uploaded,
58,502✔
1536
                                             p.uploadable, p.snapshot, uploaded_version);
58,502✔
1537
    if (m_flx_subscription_store && has_progress_handler)
58,502✔
1538
        p.query_version = m_flx_subscription_store->get_downloading_query_version(*tr);
14,824✔
1539

1540
    report_progress(p, downloadable);
58,502✔
1541
    report_upload_completion(uploaded_version);
58,502✔
1542
}
58,502✔
1543

1544
void SessionWrapper::report_upload_completion(version_type uploaded_version)
1545
{
58,500✔
1546
    if (uploaded_version < m_upload_completion_requested_version)
58,500✔
1547
        return;
38,836✔
1548

1549
    std::move(m_sync_completion_handlers.begin(), m_sync_completion_handlers.end(),
19,664✔
1550
              std::back_inserter(m_download_completion_handlers));
19,664✔
1551
    m_sync_completion_handlers.clear();
19,664✔
1552

1553
    while (!m_upload_completion_handlers.empty()) {
34,688✔
1554
        auto handler = std::move(m_upload_completion_handlers.back());
15,024✔
1555
        m_upload_completion_handlers.pop_back();
15,024✔
1556
        handler(Status::OK()); // Throws
15,024✔
1557
    }
15,024✔
1558
}
19,664✔
1559

1560
void SessionWrapper::report_progress(ReportedProgress& p, DownloadableProgress downloadable)
1561
{
58,500✔
1562
    if (!m_progress_handler)
58,500✔
1563
        return;
27,774✔
1564

1565
    // Ignore progress messages from before we first receive a DOWNLOAD message
1566
    if (!m_reliable_download_progress)
30,726✔
1567
        return;
3,302✔
1568

1569
    auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) {
27,424✔
1570
        REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable);
18,954✔
1571
        REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable);
18,954✔
1572

1573
        // The effect of this calculation is that if new bytes are added for download/upload,
1574
        // the progress estimate doesn't go back to zero, but it goes back to some non-zero percentage.
1575
        // This calculation allows a clean progression from 0 to 1.0 even if new data is added for the sync
1576
        // before progress has reached 1.0.
1577
        // Then once it is at 1.0 the next batch of changes will restart the estimate at 0.
1578
        // Example for upload progress reported:
1579
        // 0 -> 1.0 -> new data added -> 0.0 -> 0.1 ...sync... -> 0.4 -> new data added -> 0.3 ...sync.. -> 1.0
1580

1581
        double progress_estimate = 1.0;
18,954✔
1582
        if (final_transferred < transferable && transferred < transferable)
18,954✔
1583
            progress_estimate = (transferred - final_transferred) / double(transferable - final_transferred);
9,734✔
1584
        return progress_estimate;
18,954✔
1585
    };
18,954✔
1586

1587
    bool upload_completed = p.uploaded == p.uploadable;
27,424✔
1588
    double upload_estimate = 1.0;
27,424✔
1589
    if (!upload_completed)
27,424✔
1590
        upload_estimate = calculate_progress(p.uploaded, p.uploadable, m_final_uploaded);
9,686✔
1591

1592
    bool download_completed = p.downloaded == 0;
27,424✔
1593
    p.download_estimate = 1.00;
27,424✔
1594
    if (m_flx_pending_bootstrap_store) {
27,424✔
1595
        p.download_estimate = downloadable.as_estimate();
14,824✔
1596
        if (m_flx_pending_bootstrap_store->has_pending()) {
14,824✔
1597
            p.downloaded += m_flx_pending_bootstrap_store->pending_stats().pending_changeset_bytes;
2,742✔
1598
        }
2,742✔
1599
        download_completed = p.download_estimate >= 1.0;
14,824✔
1600

1601
        // for flx with download estimate these bytes are not known
1602
        // provide some sensible value for non-streaming version of object-store callbacks
1603
        // until these fields are completely removed from the api after pbs deprecation
1604
        p.downloadable = p.downloaded;
14,824✔
1605
        if (p.download_estimate > 0 && p.download_estimate < 1.0 && p.downloaded > m_final_downloaded)
14,824✔
1606
            p.downloadable = m_final_downloaded + uint64_t((p.downloaded - m_final_downloaded) / p.download_estimate);
2,770✔
1607
    }
14,824✔
1608
    else {
12,600✔
1609
        // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
1610
        // is only the remaining to download. This is confusing, so make them use
1611
        // the same units.
1612
        p.downloadable = downloadable.as_bytes() + p.downloaded;
12,600✔
1613
        if (!download_completed)
12,600✔
1614
            p.download_estimate = calculate_progress(p.downloaded, p.downloadable, m_final_downloaded);
9,268✔
1615
    }
12,600✔
1616

1617
    if (download_completed)
27,424✔
1618
        m_final_downloaded = p.downloaded;
15,374✔
1619
    if (upload_completed)
27,424✔
1620
        m_final_uploaded = p.uploaded;
17,738✔
1621

1622
    if (p == m_reported_progress)
27,424✔
1623
        return;
18,314✔
1624

1625
    m_reported_progress = p;
9,110✔
1626

1627
    if (m_sess->logger.would_log(Logger::Level::debug)) {
9,110✔
1628
        auto to_str = [](double d) {
17,748✔
1629
            std::ostringstream ss;
17,748✔
1630
            // progress estimate string in the DOWNLOAD message isn't expected to have more than 4 digits of precision
1631
            ss << std::fixed << std::setprecision(4) << d;
17,748✔
1632
            return ss.str();
17,748✔
1633
        };
17,748✔
1634
        m_sess->logger.debug(
8,874✔
1635
            "Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
8,874✔
1636
            "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
8,874✔
1637
            p.downloaded, p.downloadable, to_str(p.download_estimate), p.uploaded, p.uploadable,
8,874✔
1638
            to_str(upload_estimate), p.snapshot, p.query_version);
8,874✔
1639
    }
8,874✔
1640

1641
    m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, p.download_estimate,
9,110✔
1642
                       upload_estimate, p.query_version);
9,110✔
1643
}
9,110✔
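The arithmetic in `report_progress()` is easier to follow with concrete numbers. The sketch below lifts the `calculate_progress` lambda and the FLX `downloadable` extrapolation into free functions. `progress_estimate` and `extrapolate_downloadable` are hypothetical names, and plain `assert` replaces `REALM_ASSERT_DEBUG_EX`; the formulas themselves are copied from the listing above.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical free-function form of the calculate_progress lambda.
// final_transferred is the byte count at which the previous batch completed,
// so the estimate measures progress within the current batch only.
double progress_estimate(std::uint64_t transferred, std::uint64_t transferable,
                         std::uint64_t final_transferred)
{
    assert(final_transferred <= transferred);
    assert(transferred <= transferable);
    double estimate = 1.0;
    if (final_transferred < transferable && transferred < transferable)
        estimate = double(transferred - final_transferred) / double(transferable - final_transferred);
    return estimate;
}

// Hypothetical form of the FLX branch: derive a byte total from the
// server-provided estimate when only the downloaded byte count is known.
std::uint64_t extrapolate_downloadable(std::uint64_t downloaded, std::uint64_t final_downloaded,
                                       double estimate)
{
    std::uint64_t downloadable = downloaded;
    if (estimate > 0 && estimate < 1.0 && downloaded > final_downloaded)
        downloadable = final_downloaded + std::uint64_t((downloaded - final_downloaded) / estimate);
    return downloadable;
}
```

With 50 of 100 bytes transferred and no previous batch, the estimate is 0.5. If the total then grows to 200 bytes, it drops to 0.25 rather than to zero. Once a batch completes at 200 bytes and another 100 bytes are queued, the next batch starts at 0 again, because the baseline has moved up to 200.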
1644

1645
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
1646
{
72✔
1647
    if (!m_sess) {
72✔
1648
        return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
×
1649
    }
×
1650

1651
    return m_sess->send_test_command(std::move(body));
72✔
1652
}
72✔
1653

1654
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1655
{
10,744✔
1656
    REALM_ASSERT(!m_finalized);
10,744✔
1657

1658
    auto has_pending_reset = PendingResetStore::has_pending_reset(m_db->start_frozen());
10,744✔
1659
    if (!has_pending_reset) {
10,744✔
1660
        return; // nothing to do
10,374✔
1661
    }
10,374✔
1662

1663
    m_sess->logger.info(util::LogCategory::reset, "Tracking %1", *has_pending_reset);
370✔
1664

1665
    // Now that the client reset merge is complete, wait for the changes to synchronize with the server
1666
    async_wait_for(
370✔
1667
        true, true, [self = util::bind_ptr(this), pending_reset = std::move(*has_pending_reset)](Status status) {
370✔
1668
            if (status == ErrorCodes::OperationAborted) {
368✔
1669
                return;
160✔
1670
            }
160✔
1671
            auto& logger = self->m_sess->logger;
208✔
1672
            if (!status.is_ok()) {
208✔
1673
                logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1",
×
1674
                             status);
×
1675
                return;
×
1676
            }
×
1677

1678
            logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset);
208✔
1679

1680
            auto tr = self->m_db->start_write();
208✔
1681
            auto cur_pending_reset = PendingResetStore::has_pending_reset(tr);
208✔
1682
            if (!cur_pending_reset) {
208✔
1683
                logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed.");
4✔
1684
                return;
4✔
1685
            }
4✔
1686
            if (*cur_pending_reset == pending_reset) {
204✔
1687
                logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker.");
204✔
1688
            }
204✔
1689
            else {
×
1690
                logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset);
×
1691
            }
×
1692
            PendingResetStore::clear_pending_reset(tr);
204✔
1693
            tr->commit();
204✔
1694
        });
204✔
1695
}
370✔
1696

1697
std::string SessionWrapper::get_appservices_connection_id()
1698
{
76✔
1699
    auto pf = util::make_promise_future<std::string>();
76✔
1700

1701
    m_client.post([self = util::bind_ptr{this}, promise = std::move(pf.promise)](Status status) mutable {
76✔
1702
        if (!status.is_ok()) {
76✔
1703
            promise.set_error(status);
×
1704
            return;
×
1705
        }
×
1706

1707
        if (!self->m_sess) {
76✔
1708
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
1709
            return;
×
1710
        }
×
1711

1712
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
76✔
1713
    });
76✔
1714

1715
    return pf.future.get();
76✔
1716
}
76✔
1717

1718
// ################ ClientImpl::Connection ################
1719

1720
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1721
                                   const std::string& authorization_header_name,
1722
                                   const std::map<std::string, std::string>& custom_http_headers,
1723
                                   bool verify_servers_ssl_certificate,
1724
                                   Optional<std::string> ssl_trust_certificate_path,
1725
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1726
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1727
    : logger{make_logger(ident, std::nullopt, client.logger.base_logger)} // Throws
1,352✔
1728
    , m_client{client}
1,352✔
1729
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1,352✔
1730
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1,352✔
1731
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1,352✔
1732
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1,352✔
1733
    , m_reconnect_info{reconnect_info}
1,352✔
1734
    , m_ident{ident}
1,352✔
1735
    , m_server_endpoint{std::move(endpoint)}
1,352✔
1736
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1,352✔
1737
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1,352✔
1738
{
2,836✔
1739
    m_on_idle = m_client.create_trigger([this](Status status) {
2,844✔
1740
        if (status == ErrorCodes::OperationAborted)
2,844✔
1741
            return;
×
1742
        else if (!status.is_ok())
2,844✔
1743
            throw Exception(status);
×
1744

1745
        REALM_ASSERT(m_activated);
2,844✔
1746
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,844✔
1747
            on_idle(); // Throws
2,836✔
1748
            // Connection object may be destroyed now.
1749
        }
2,836✔
1750
    });
2,844✔
1751
}
2,836✔
1752

1753
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
1754
{
12✔
1755
    return m_ident;
12✔
1756
}
12✔
1757

1758

1759
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
1760
{
2,836✔
1761
    return m_server_endpoint;
2,836✔
1762
}
2,836✔
1763

1764
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1765
                                                        const std::string& signed_access_token)
1766
{
10,620✔
1767
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
10,620✔
1768
    m_signed_access_token = signed_access_token;           // Throws (copy)
10,620✔
1769
}
10,620✔
1770

1771

1772
void ClientImpl::Connection::resume_active_sessions()
1773
{
1,892✔
1774
    auto handler = [=](ClientImpl::Session& sess) {
3,780✔
1775
        sess.cancel_resumption_delay(); // Throws
3,780✔
1776
    };
3,780✔
1777
    for_each_active_session(std::move(handler)); // Throws
1,892✔
1778
}
1,892✔
1779

1780
void ClientImpl::Connection::on_idle()
1781
{
2,836✔
1782
    logger.debug(util::LogCategory::session, "Destroying connection object");
2,836✔
1783
    ClientImpl& client = get_client();
2,836✔
1784
    client.remove_connection(*this);
2,836✔
1785
    // NOTE: This connection object is now destroyed!
1786
}
2,836✔
1787

1788

1789
std::string ClientImpl::Connection::get_http_request_path() const
1790
{
3,830✔
1791
    using namespace std::string_view_literals;
3,830✔
1792
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
3,830✔
1793

1794
    std::string path;
3,830✔
1795
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,830✔
1796
    path += m_http_request_path_prefix;
3,830✔
1797
    path += param;
3,830✔
1798
    path += m_signed_access_token;
3,830✔
1799

1800
    return path;
3,830✔
1801
}
3,830✔
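`get_http_request_path()` chooses the separator for the `baas_at` parameter by checking whether the prefix already contains a query string. The same logic as a standalone function, for illustration only: `build_request_path` is a hypothetical name, and the prefix and token are passed as arguments rather than read from the connection's members.

```cpp
#include <string>
#include <string_view>

// Hypothetical standalone version of the path construction shown above.
std::string build_request_path(const std::string& prefix, const std::string& token)
{
    using namespace std::string_view_literals;
    // Start a query string with '?' unless the prefix already has one.
    const auto param = prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;

    std::string path;
    path.reserve(prefix.size() + param.size() + token.size());
    path += prefix;
    path += param;
    path += token;
    return path;
}
```

For example, a prefix of `/api/sync` with token `tok` gives `/api/sync?baas_at=tok`, while a prefix of `/api/sync?x=1` gives `/api/sync?x=1&baas_at=tok`. The token is appended as-is, so this sketch assumes it needs no URL escaping.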
1802

1803

1804
std::shared_ptr<util::Logger> ClientImpl::Connection::make_logger(connection_ident_type ident,
1805
                                                                  std::optional<std::string_view> coid,
1806
                                                                  std::shared_ptr<util::Logger> base_logger)
1807
{
10,020✔
1808
    std::string prefix =
10,020✔
1809
        coid ? util::format("Connection[%1:%2] ", ident, *coid) : util::format("Connection[%1] ", ident);
10,020✔
1810
    return std::make_shared<util::PrefixLogger>(util::LogCategory::session, std::move(prefix), base_logger);
10,020✔
1811
}
10,020✔
1812

1813

1814
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
1815
                                                            std::optional<SessionErrorInfo> error_info)
1816
{
11,278✔
1817
    if (m_force_closed) {
11,278✔
1818
        return;
2,474✔
1819
    }
2,474✔
1820
    auto handler = [=](ClientImpl::Session& sess) {
11,910✔
1821
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
11,910✔
1822
        sess_2.on_connection_state_changed(state, error_info); // Throws
11,910✔
1823
    };
11,910✔
1824
    for_each_active_session(std::move(handler)); // Throws
8,804✔
1825
}
8,804✔
1826

1827

1828
Client::Client(Config config)
1829
    : m_impl{new ClientImpl{std::move(config)}} // Throws
4,914✔
1830
{
9,966✔
1831
}
9,966✔
1832

1833

1834
Client::Client(Client&& client) noexcept
1835
    : m_impl{std::move(client.m_impl)}
1836
{
×
1837
}
×
1838

1839

1840
Client::~Client() noexcept {}
9,966✔
1841

1842

1843
void Client::shutdown() noexcept
1844
{
10,048✔
1845
    m_impl->shutdown();
10,048✔
1846
}
10,048✔
1847

1848
void Client::shutdown_and_wait()
1849
{
772✔
1850
    m_impl->shutdown_and_wait();
772✔
1851
}
772✔
1852

1853
void Client::cancel_reconnect_delay()
1854
{
1,896✔
1855
    m_impl->cancel_reconnect_delay();
1,896✔
1856
}
1,896✔
1857

1858
void Client::voluntary_disconnect_all_connections()
1859
{
12✔
1860
    m_impl->voluntary_disconnect_all_connections();
12✔
1861
}
12✔
1862

1863
bool Client::wait_for_session_terminations_or_client_stopped()
1864
{
9,636✔
1865
    return m_impl->wait_for_session_terminations_or_client_stopped();
9,636✔
1866
}
9,636✔
1867

1868
util::Future<void> Client::notify_session_terminated()
1869
{
56✔
1870
    return m_impl->notify_session_terminated();
56✔
1871
}
56✔
1872

1873
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
1874
                                  port_type& port, std::string& path) const
1875
{
4,382✔
1876
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
4,382✔
1877
}
4,382✔
1878

1879

1880
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1881
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
1882
{
10,482✔
1883
    m_impl = new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
10,482✔
1884
                                std::move(config)}; // Throws
10,482✔
1885
}
10,482✔
1886

1887

1888
void Session::nonsync_transact_notify(version_type new_version)
1889
{
20,170✔
1890
    m_impl->on_commit(new_version); // Throws
20,170✔
1891
}
20,170✔
1892

1893

1894
void Session::cancel_reconnect_delay()
1895
{
32✔
1896
    m_impl->cancel_reconnect_delay(); // Throws
32✔
1897
}
32✔
1898

1899

1900
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
1901
{
5,438✔
1902
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
5,438✔
1903
}
5,438✔
1904

1905

1906
bool Session::wait_for_upload_complete_or_client_stopped()
1907
{
12,912✔
1908
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
12,912✔
1909
}
12,912✔
1910

1911

1912
bool Session::wait_for_download_complete_or_client_stopped()
1913
{
10,004✔
1914
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
10,004✔
1915
}
10,004✔
1916

1917

1918
void Session::refresh(std::string_view signed_access_token)
1919
{
218✔
1920
    m_impl->refresh(signed_access_token); // Throws
218✔
1921
}
218✔
1922

1923

1924
void Session::abandon() noexcept
1925
{
10,480✔
1926
    REALM_ASSERT(m_impl);
10,480✔
1927
    // Reabsorb the ownership assigned to the application's naked pointer by
1928
    // Session constructor
1929
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
10,480✔
1930
    SessionWrapper::abandon(std::move(wrapper));
10,480✔
1931
}
10,480✔
1932

1933
util::Future<std::string> Session::send_test_command(std::string body)
1934
{
72✔
1935
    return m_impl->send_test_command(std::move(body));
72✔
1936
}
72✔
1937

1938
std::string Session::get_appservices_connection_id()
1939
{
76✔
1940
    return m_impl->get_appservices_connection_id();
76✔
1941
}
76✔
1942

1943
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
1944
{
×
1945
    switch (proxyType) {
×
1946
        case ProxyConfig::Type::HTTP:
×
1947
            return os << "HTTP";
×
1948
        case ProxyConfig::Type::HTTPS:
×
1949
            return os << "HTTPS";
×
1950
    }
×
1951
    REALM_TERMINATE("Invalid Proxy Type object.");
1952
}
×
1953

1954
} // namespace realm::sync