• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / michael.wilkersonbarker_1094

06 May 2024 04:51PM UTC coverage: 90.761% (+0.3%) from 90.422%
michael.wilkersonbarker_1094

Pull #7675

Evergreen

michael-wb
Added test to create 2 user and 2 rules when only 1 updated
Pull Request #7675: RCORE-1973 Add role/permissions tests for new bootstrap feature

102046 of 180410 branches covered (56.56%)

13 of 15 new or added lines in 2 files covered. (86.67%)

47 existing lines in 9 files now uncovered.

212810 of 234473 relevant lines covered (90.76%)

5670520.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.26
/src/realm/sync/client.cpp
1

2
#include <memory>
3
#include <tuple>
4
#include <atomic>
5

6
#include "realm/sync/client_base.hpp"
7
#include "realm/sync/protocol.hpp"
8
#include "realm/util/optional.hpp"
9
#include <realm/sync/client.hpp>
10
#include <realm/sync/config.hpp>
11
#include <realm/sync/noinst/client_reset.hpp>
12
#include <realm/sync/noinst/client_history_impl.hpp>
13
#include <realm/sync/noinst/client_impl_base.hpp>
14
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
15
#include <realm/sync/subscriptions.hpp>
16
#include <realm/util/bind_ptr.hpp>
17
#include <realm/util/circular_buffer.hpp>
18
#include <realm/util/platform_info.hpp>
19
#include <realm/util/thread.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/util/value_reset_guard.hpp>
22
#include <realm/version.hpp>
23

24
namespace realm {
25
namespace sync {
26

27
namespace {
28
using namespace realm::util;
29

30

31
// clang-format off
32
using SessionImpl                     = ClientImpl::Session;
33
using SyncTransactCallback            = Session::SyncTransactCallback;
34
using ProgressHandler                 = Session::ProgressHandler;
35
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
36
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
37
using port_type                       = Session::port_type;
38
using connection_ident_type           = std::int_fast64_t;
39
using ProxyConfig                     = SyncConfig::ProxyConfig;
40
// clang-format on
41

42
} // unnamed namespace
43

44

45
// Life cycle states of a session wrapper:
46
//
47
//  - Uninitiated
48
//  - Unactualized
49
//  - Actualized
50
//  - Finalized
51
//
52
// The session wrapper moves from the Uninitiated to the Unactualized state when
53
// it is initiated, i.e., when initiate() is called. This may happen on any
54
// thread.
55
//
56
// The session wrapper moves from the Unactualized to the Actualized state when
57
// it is associated with a session object, i.e., when `m_sess` is made to refer
58
// to an object of type SessionImpl. This always happens on the event loop
59
// thread.
60
//
61
// The session wrapper moves from the Actualized to the Finalized state when it
62
// is dissociated from the session object. This happens in response to the
63
// session wrapper having been abandoned by the application. This always happens
64
// on the event loop thread.
65
//
66
// The session wrapper will exist in the Finalized state only while referenced
67
// from a post handler waiting to be executed.
68
//
69
// If the session wrapper is abandoned by the application while in the
70
// Uninitiated state, it will be destroyed immediately, since no post handlers
71
// can have been scheduled prior to initiation.
72
//
73
// If the session wrapper is abandoned while in the Unactivated state, it will
74
// move immediately to the Finalized state. This may happen on any thread.
75
//
76
// The moving of a session wrapper to, or from the Actualized state always
77
// happen on the event loop thread. All other state transitions may happen on
78
// any thread.
79
//
80
// NOTE: Activation of the session happens no later than during actualization,
81
// and initiation of deactivation happens no earlier than during
82
// finalization. See also activate_session() and initiate_session_deactivation()
83
// in ClientImpl::Connection.
84
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
85
public:
86
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
87
                   Session::Config);
88
    ~SessionWrapper() noexcept;
89

90
    ClientReplication& get_replication() noexcept;
91
    ClientImpl& get_client() noexcept;
92

93
    bool has_flx_subscription_store() const;
94
    SubscriptionStore* get_flx_subscription_store();
95
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
96

97
    MigrationStore* get_migration_store();
98

99
    void set_progress_handler(util::UniqueFunction<ProgressHandler>);
100
    void set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener>);
101

102
    void initiate();
103

104
    void force_close();
105

106
    void on_commit(version_type new_version) override;
107
    void cancel_reconnect_delay();
108

109
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
110
    bool wait_for_upload_complete_or_client_stopped();
111
    bool wait_for_download_complete_or_client_stopped();
112

113
    void refresh(std::string_view signed_access_token);
114

115
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
116

117
    // These are called from ClientImpl
118
    void actualize(ServerEndpoint);
119
    void finalize();
120
    void finalize_before_actualization() noexcept;
121

122
    util::Future<std::string> send_test_command(std::string body);
123

124
    void handle_pending_client_reset_acknowledgement();
125

126
    void update_subscription_version_info();
127

128
    std::string get_appservices_connection_id();
129

130
protected:
131
    friend class ClientImpl;
132

133
    // m_initiated/m_abandoned is used to check that we aren't trying to update immutable properties like the progress
134
    // handler or connection state listener after we've bound the session. We read the variable a bunch in
135
    // REALM_ASSERTS on the event loop and on the user's thread, but we only set it once and while we're registering
136
    // the session wrapper to be actualized. This function gets called from
137
    // ClientImpl::register_unactualized_session_wrapper() to synchronize updating this variable on the main thread
138
    // with reading the variable on the event loop.
139
    void mark_initiated();
140
    void mark_abandoned();
141

142
private:
143
    ClientImpl& m_client;
144
    DBRef m_db;
145
    Replication* m_replication;
146

147
    const ProtocolEnvelope m_protocol_envelope;
148
    const std::string m_server_address;
149
    const port_type m_server_port;
150
    const bool m_server_verified;
151
    const std::string m_user_id;
152
    const SyncServerMode m_sync_mode;
153
    const std::string m_authorization_header_name;
154
    const std::map<std::string, std::string> m_custom_http_headers;
155
    const bool m_verify_servers_ssl_certificate;
156
    const bool m_simulate_integration_error;
157
    const Optional<std::string> m_ssl_trust_certificate_path;
158
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
159
    const size_t m_flx_bootstrap_batch_size_bytes;
160

161
    // This one is different from null when, and only when the session wrapper
162
    // is in ClientImpl::m_abandoned_session_wrappers.
163
    SessionWrapper* m_next = nullptr;
164

165
    // After initiation, these may only be accessed by the event loop thread.
166
    std::string m_http_request_path_prefix;
167
    std::string m_virt_path;
168
    std::string m_signed_access_token;
169

170
    util::Optional<ClientReset> m_client_reset_config;
171

172
    util::Optional<ProxyConfig> m_proxy_config;
173

174
    struct ReportedProgress {
175
        uint64_t snapshot = 0;
176
        uint64_t uploaded = 0;
177
        uint64_t uploadable = 0;
178
        uint64_t downloaded = 0;
179
        uint64_t downloadable = 0;
180
        uint64_t final_uploaded = 0;
181
        uint64_t final_downloaded = 0;
182
    } m_reported_progress;
183

184
    util::UniqueFunction<ProgressHandler> m_progress_handler;
185
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
186

187
    std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook;
188
    bool m_in_debug_hook = false;
189

190
    SessionReason m_session_reason;
191

192
    const uint64_t m_schema_version;
193

194
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
195
    int64_t m_flx_active_version = 0;
196
    int64_t m_flx_last_seen_version = 0;
197
    int64_t m_flx_pending_mark_version = 0;
198
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
199

200
    std::shared_ptr<MigrationStore> m_migration_store;
201

202
    bool m_initiated = false;
203

204
    // Set to true when this session wrapper is actualized (or when it is
205
    // finalized before proper actualization). It is then never modified again.
206
    //
207
    // A session specific post handler submitted after the initiation of the
208
    // session wrapper (initiate()) will always find that `m_actualized` is
209
    // true. This is the case, because the scheduling of such a post handler
210
    // will have been preceded by the triggering of
211
    // `ClientImpl::m_actualize_and_finalize` (in
212
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
213
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
214
    // before the post handler. If the session wrapper is no longer in
215
    // `ClientImpl::m_unactualized_session_wrappers` when
216
    // ClientImpl::actualize_and_finalize_session_wrappers() executes, it must
217
    // have been abandoned already, but in that case,
218
    // finalize_before_actualization() has already been called.
219
    bool m_actualized = false;
220

221
    bool m_force_closed = false;
222

223
    bool m_suspended = false;
224

225
    // Set when the session has been abandoned, but before it's been finalized.
226
    bool m_abandoned = false;
227
    // Has the SessionWrapper been finalized?
228
    bool m_finalized = false;
229

230
    // Set to true when the first DOWNLOAD message is received to indicate that
231
    // the byte-level download progress parameters can be considered reasonable
232
    // reliable. Before that, a lot of time may have passed, so our record of
233
    // the download progress is likely completely out of date.
234
    bool m_reliable_download_progress = false;
235

236
    std::optional<double> m_download_estimate;
237
    std::optional<uint64_t> m_bootstrap_store_bytes;
238

239
    // Set to point to an activated session object during actualization of the
240
    // session wrapper. Set to null during finalization of the session
241
    // wrapper. Both modifications are guaranteed to be performed by the event
242
    // loop thread.
243
    //
244
    // If a session specific post handler, that is submitted after the
245
    // initiation of the session wrapper, sees that `m_sess` is null, it can
246
    // conclude that the session wrapper has been both abandoned and
247
    // finalized. This is true, because the scheduling of such a post handler
248
    // will have been preceded by the triggering of
249
    // `ClientImpl::m_actualize_and_finalize` (in
250
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
251
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
252
    // before the post handler, so the session wrapper must have been actualized
253
    // unless it was already abandoned by the application. If it was abandoned
254
    // before it was actualized, it will already have been finalized by
255
    // finalize_before_actualization().
256
    //
257
    // Must only be accessed from the event loop thread.
258
    SessionImpl* m_sess = nullptr;
259

260
    // These must only be accessed from the event loop thread.
261
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
262
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
263
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
264

265
    // `m_target_*load_mark` and `m_reached_*load_mark` are protected by
266
    // `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
267
    // event loop thread.
268
    std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
269
    std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
270
    std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
271

272
    void on_upload_progress(bool only_if_new_uploadable_data = false);
273
    void on_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes = {});
274
    void on_upload_completion();
275
    void on_download_completion();
276
    void on_suspended(const SessionErrorInfo& error_info);
277
    void on_resumed();
278
    void on_connection_state_changed(ConnectionState, const util::Optional<SessionErrorInfo>&);
279
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
280
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
281
    void on_flx_sync_version_complete(int64_t version);
282

283
    void init_progress_handler();
284
    // only_if_new_uploadable_data can be true only if is_download is false
285
    void report_progress(bool is_download, bool only_if_new_uploadable_data = false);
286

287
    friend class SessionWrapperStack;
288
    friend class ClientImpl::Session;
289
};
290

291

292
// ################ SessionWrapperStack ################
293

294
inline bool SessionWrapperStack::empty() const noexcept
295
{
×
296
    return !m_back;
×
297
}
×
298

299

300
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
301
{
10,368✔
302
    REALM_ASSERT(!w->m_next);
10,368✔
303
    w->m_next = m_back;
10,368✔
304
    m_back = w.release();
10,368✔
305
}
10,368✔
306

307

308
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
309
{
25,684✔
310
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
25,684✔
311
    if (m_back) {
25,684✔
312
        m_back = m_back->m_next;
10,354✔
313
        w->m_next = nullptr;
10,354✔
314
    }
10,354✔
315
    return w;
25,684✔
316
}
25,684✔
317

318

319
inline void SessionWrapperStack::clear() noexcept
320
{
25,046✔
321
    while (m_back) {
25,046✔
322
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
323
        m_back = w->m_next;
×
324
    }
×
325
}
25,046✔
326

327

328
inline SessionWrapperStack::SessionWrapperStack(SessionWrapperStack&& q) noexcept
329
    : m_back{q.m_back}
330
{
331
    q.m_back = nullptr;
332
}
333

334

335
SessionWrapperStack::~SessionWrapperStack()
336
{
25,046✔
337
    clear();
25,046✔
338
}
25,046✔
339

340

341
// ################ ClientImpl ################
342

343
ClientImpl::~ClientImpl()
344
{
9,726✔
345
    // Since no other thread is allowed to be accessing this client or any of
346
    // its subobjects at this time, no mutex locking is necessary.
347

348
    shutdown_and_wait();
9,726✔
349
    // Session wrappers are removed from m_unactualized_session_wrappers as they
350
    // are abandoned.
351
    REALM_ASSERT(m_stopped);
9,726✔
352
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
9,726✔
353
}
9,726✔
354

355

356
void ClientImpl::cancel_reconnect_delay()
357
{
1,952✔
358
    // Thread safety required
359
    post([this](Status status) {
1,952✔
360
        if (status == ErrorCodes::OperationAborted)
1,952✔
361
            return;
×
362
        else if (!status.is_ok())
1,952✔
363
            throw Exception(status);
×
364

365
        for (auto& p : m_server_slots) {
1,952✔
366
            ServerSlot& slot = p.second;
1,952✔
367
            if (m_one_connection_per_session) {
1,952✔
368
                REALM_ASSERT(!slot.connection);
×
369
                for (const auto& p : slot.alt_connections) {
×
370
                    ClientImpl::Connection& conn = *p.second;
×
371
                    conn.resume_active_sessions(); // Throws
×
372
                    conn.cancel_reconnect_delay(); // Throws
×
373
                }
×
374
            }
×
375
            else {
1,952✔
376
                REALM_ASSERT(slot.alt_connections.empty());
1,952✔
377
                if (slot.connection) {
1,952✔
378
                    ClientImpl::Connection& conn = *slot.connection;
1,948✔
379
                    conn.resume_active_sessions(); // Throws
1,948✔
380
                    conn.cancel_reconnect_delay(); // Throws
1,948✔
381
                }
1,948✔
382
                else {
4✔
383
                    slot.reconnect_info.reset();
4✔
384
                }
4✔
385
            }
1,952✔
386
        }
1,952✔
387
    }); // Throws
1,952✔
388
}
1,952✔
389

390

391
void ClientImpl::voluntary_disconnect_all_connections()
392
{
12✔
393
    auto done_pf = util::make_promise_future<void>();
12✔
394
    post([this, promise = std::move(done_pf.promise)](Status status) mutable {
12✔
395
        if (status == ErrorCodes::OperationAborted) {
12✔
396
            return;
×
397
        }
×
398

399
        REALM_ASSERT(status.is_ok());
12✔
400

401
        try {
12✔
402
            for (auto& p : m_server_slots) {
12✔
403
                ServerSlot& slot = p.second;
12✔
404
                if (m_one_connection_per_session) {
12✔
405
                    REALM_ASSERT(!slot.connection);
×
406
                    for (const auto& p : slot.alt_connections) {
×
407
                        ClientImpl::Connection& conn = *p.second;
×
408
                        if (conn.get_state() == ConnectionState::disconnected) {
×
409
                            continue;
×
410
                        }
×
411
                        conn.voluntary_disconnect();
×
412
                    }
×
413
                }
×
414
                else {
12✔
415
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
416
                    if (!slot.connection) {
12✔
417
                        continue;
×
418
                    }
×
419
                    ClientImpl::Connection& conn = *slot.connection;
12✔
420
                    if (conn.get_state() == ConnectionState::disconnected) {
12✔
421
                        continue;
×
422
                    }
×
423
                    conn.voluntary_disconnect();
12✔
424
                }
12✔
425
            }
12✔
426
        }
12✔
427
        catch (...) {
12✔
428
            promise.set_error(exception_to_status());
×
429
            return;
×
430
        }
×
431
        promise.emplace_value();
12✔
432
    });
12✔
433
    done_pf.future.get();
12✔
434
}
12✔
435

436

437
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
438
{
9,288✔
439
    // Thread safety required
440

441
    {
9,288✔
442
        std::lock_guard lock{m_mutex};
9,288✔
443
        m_sessions_terminated = false;
9,288✔
444
    }
9,288✔
445

446
    // The technique employed here relies on the fact that
447
    // actualize_and_finalize_session_wrappers() must get to execute at least
448
    // once before the post handler submitted below gets to execute, but still
449
    // at a time where all session wrappers, that are abandoned prior to the
450
    // execution of wait_for_session_terminations_or_client_stopped(), have been
451
    // added to `m_abandoned_session_wrappers`.
452
    //
453
    // To see that this is the case, consider a session wrapper that was
454
    // abandoned before wait_for_session_terminations_or_client_stopped() was
455
    // invoked. Then the session wrapper will have been added to
456
    // `m_abandoned_session_wrappers`, and an invocation of
457
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
458
    // guarantees mentioned in the documentation of Trigger then ensure
459
    // that at least one execution of actualize_and_finalize_session_wrappers()
460
    // will happen after the session wrapper has been added to
461
    // `m_abandoned_session_wrappers`, but before the post handler submitted
462
    // below gets to execute.
463
    post([this](Status status) mutable {
9,288✔
464
        if (status == ErrorCodes::OperationAborted)
9,288✔
465
            return;
×
466
        else if (!status.is_ok())
9,288✔
467
            throw Exception(status);
×
468

469
        std::lock_guard lock{m_mutex};
9,288✔
470
        m_sessions_terminated = true;
9,288✔
471
        m_wait_or_client_stopped_cond.notify_all();
9,288✔
472
    }); // Throws
9,288✔
473

474
    bool completion_condition_was_satisfied;
9,288✔
475
    {
9,288✔
476
        std::unique_lock lock{m_mutex};
9,288✔
477
        while (!m_sessions_terminated && !m_stopped)
18,560✔
478
            m_wait_or_client_stopped_cond.wait(lock);
9,272✔
479
        completion_condition_was_satisfied = !m_stopped;
9,288✔
480
    }
9,288✔
481
    return completion_condition_was_satisfied;
9,288✔
482
}
9,288✔
483

484

485
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
486
util::Future<void> ClientImpl::notify_session_terminated()
487
{
56✔
488
    auto pf = util::make_promise_future<void>();
56✔
489
    post([promise = std::move(pf.promise)](Status status) mutable {
56✔
490
        // Includes operation_aborted
491
        if (!status.is_ok()) {
56✔
492
            promise.set_error(status);
×
493
            return;
×
494
        }
×
495

496
        promise.emplace_value();
56✔
497
    });
56✔
498

499
    return std::move(pf.future);
56✔
500
}
56✔
501

502
void ClientImpl::drain_connections_on_loop()
503
{
9,726✔
504
    post([this](Status status) mutable {
9,726✔
505
        REALM_ASSERT(status.is_ok());
9,712✔
506
        drain_connections();
9,712✔
507
    });
9,712✔
508
}
9,726✔
509

510
void ClientImpl::shutdown_and_wait()
511
{
10,498✔
512
    shutdown();
10,498✔
513
    std::unique_lock lock{m_drain_mutex};
10,498✔
514
    if (m_drained) {
10,498✔
515
        return;
768✔
516
    }
768✔
517

518
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
9,730✔
519
    m_drain_cv.wait(lock, [&] {
19,448✔
520
        return m_num_connections == 0 && m_outstanding_posts == 0;
19,448✔
521
    });
19,448✔
522

523
    m_drained = true;
9,730✔
524
}
9,730✔
525

526
void ClientImpl::shutdown() noexcept
527
{
20,302✔
528
    {
20,302✔
529
        std::lock_guard lock{m_mutex};
20,302✔
530
        if (m_stopped)
20,302✔
531
            return;
10,576✔
532
        m_stopped = true;
9,726✔
533
        m_wait_or_client_stopped_cond.notify_all();
9,726✔
534
    }
9,726✔
535

536
    drain_connections_on_loop();
×
537
}
9,726✔
538

539

540
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper, ServerEndpoint endpoint)
541
{
10,542✔
542
    // Thread safety required.
543
    {
10,542✔
544
        std::lock_guard lock{m_mutex};
10,542✔
545
        REALM_ASSERT(m_actualize_and_finalize);
10,542✔
546
        wrapper->mark_initiated();
10,542✔
547
        m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws
10,542✔
548
    }
10,542✔
549
    m_actualize_and_finalize->trigger();
10,542✔
550
}
10,542✔
551

552

553
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
554
{
10,544✔
555
    // Thread safety required.
556
    {
10,544✔
557
        std::lock_guard lock{m_mutex};
10,544✔
558
        REALM_ASSERT(m_actualize_and_finalize);
10,544✔
559
        wrapper->mark_abandoned();
10,544✔
560

561
        // If the session wrapper has not yet been actualized (on the event loop
562
        // thread), it can be immediately finalized. This ensures that we will
563
        // generally not actualize a session wrapper that has already been
564
        // abandoned.
565
        auto i = m_unactualized_session_wrappers.find(wrapper.get());
10,544✔
566
        if (i != m_unactualized_session_wrappers.end()) {
10,544✔
567
            m_unactualized_session_wrappers.erase(i);
176✔
568
            wrapper->finalize_before_actualization();
176✔
569
            return;
176✔
570
        }
176✔
571
        m_abandoned_session_wrappers.push(std::move(wrapper));
10,368✔
572
    }
10,368✔
573
    m_actualize_and_finalize->trigger();
×
574
}
10,368✔
575

576

577
// Must be called from the event loop thread.
578
void ClientImpl::actualize_and_finalize_session_wrappers()
579
{
15,332✔
580
    std::map<SessionWrapper*, ServerEndpoint> unactualized_session_wrappers;
15,332✔
581
    SessionWrapperStack abandoned_session_wrappers;
15,332✔
582
    bool stopped;
15,332✔
583
    {
15,332✔
584
        std::lock_guard lock{m_mutex};
15,332✔
585
        swap(m_unactualized_session_wrappers, unactualized_session_wrappers);
15,332✔
586
        swap(m_abandoned_session_wrappers, abandoned_session_wrappers);
15,332✔
587
        stopped = m_stopped;
15,332✔
588
    }
15,332✔
589
    // Note, we need to finalize old session wrappers before we actualize new
590
    // ones. This ensures that deactivation of old sessions is initiated before
591
    // new session are activated. This, in turn, ensures that the server does
592
    // not see two overlapping sessions for the same local Realm file.
593
    while (util::bind_ptr<SessionWrapper> wrapper = abandoned_session_wrappers.pop())
25,688✔
594
        wrapper->finalize(); // Throws
10,356✔
595
    if (stopped) {
15,332✔
596
        for (auto& p : unactualized_session_wrappers) {
670✔
597
            SessionWrapper& wrapper = *p.first;
4✔
598
            wrapper.finalize_before_actualization();
4✔
599
        }
4✔
600
        return;
670✔
601
    }
670✔
602
    for (auto& p : unactualized_session_wrappers) {
14,662✔
603
        SessionWrapper& wrapper = *p.first;
10,364✔
604
        ServerEndpoint server_endpoint = std::move(p.second);
10,364✔
605
        wrapper.actualize(std::move(server_endpoint)); // Throws
10,364✔
606
    }
10,364✔
607
}
14,662✔
608

609

610
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
611
                                                   const std::string& authorization_header_name,
612
                                                   const std::map<std::string, std::string>& custom_http_headers,
613
                                                   bool verify_servers_ssl_certificate,
614
                                                   Optional<std::string> ssl_trust_certificate_path,
615
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
616
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
617
{
10,360✔
618
    auto&& [server_slot_it, inserted] =
10,360✔
619
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
10,360✔
620
    ServerSlot& server_slot = server_slot_it->second; // Throws
10,360✔
621

622
    // TODO: enable multiplexing with proxies
623
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
10,360✔
624
        // Use preexisting connection
625
        REALM_ASSERT(server_slot.alt_connections.empty());
7,580✔
626
        return *server_slot.connection;
7,580✔
627
    }
7,580✔
628

629
    // Create a new connection
630
    REALM_ASSERT(!server_slot.connection);
2,780✔
631
    connection_ident_type ident = m_prev_connection_ident + 1;
2,780✔
632
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
2,780✔
633
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
2,780✔
634
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
2,780✔
635
        std::move(proxy_config), server_slot.reconnect_info); // Throws
2,780✔
636
    ClientImpl::Connection& conn = *conn_2;
2,780✔
637
    if (!m_one_connection_per_session) {
2,780✔
638
        server_slot.connection = std::move(conn_2);
2,766✔
639
    }
2,766✔
640
    else {
14✔
641
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
14✔
642
    }
14✔
643
    m_prev_connection_ident = ident;
2,780✔
644
    was_created = true;
2,780✔
645
    {
2,780✔
646
        std::lock_guard lk(m_drain_mutex);
2,780✔
647
        ++m_num_connections;
2,780✔
648
    }
2,780✔
649
    return conn;
2,780✔
650
}
10,360✔
651

652

653
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
654
{
2,766✔
655
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
2,766✔
656
    auto i = m_server_slots.find(endpoint);
2,766✔
657
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
2,766✔
658
    ServerSlot& server_slot = i->second;
2,766✔
659
    if (!m_one_connection_per_session) {
2,766✔
660
        REALM_ASSERT(server_slot.alt_connections.empty());
2,754✔
661
        REALM_ASSERT(&*server_slot.connection == &conn);
2,754✔
662
        server_slot.reconnect_info = conn.get_reconnect_info();
2,754✔
663
        server_slot.connection.reset();
2,754✔
664
    }
2,754✔
665
    else {
12✔
666
        REALM_ASSERT(!server_slot.connection);
12✔
667
        connection_ident_type ident = conn.get_ident();
12✔
668
        auto j = server_slot.alt_connections.find(ident);
12✔
669
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
12✔
670
        REALM_ASSERT(&*j->second == &conn);
12✔
671
        server_slot.alt_connections.erase(j);
12✔
672
    }
12✔
673

674
    {
2,766✔
675
        std::lock_guard lk(m_drain_mutex);
2,766✔
676
        REALM_ASSERT(m_num_connections);
2,766✔
677
        --m_num_connections;
2,766✔
678
        m_drain_cv.notify_all();
2,766✔
679
    }
2,766✔
680
}
2,766✔
681

682

683
// ################ SessionImpl ################
684

685
void SessionImpl::force_close()
686
{
102✔
687
    // Allow force_close() if session is active or hasn't been activated yet.
688
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
102!
689
        m_wrapper.force_close();
102✔
690
    }
102✔
691
}
102✔
692

693
void SessionImpl::on_connection_state_changed(ConnectionState state,
694
                                              const util::Optional<SessionErrorInfo>& error_info)
695
{
11,960✔
696
    // Only used to report errors back to the SyncSession while the Session is active
697
    if (m_state == SessionImpl::Active) {
11,960✔
698
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
11,960✔
699
    }
11,960✔
700
}
11,960✔
701

702

703
const std::string& SessionImpl::get_virt_path() const noexcept
704
{
7,604✔
705
    // Can only be called if the session is active or being activated
706
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
7,604!
707
    return m_wrapper.m_virt_path;
7,604✔
708
}
7,604✔
709

710
const std::string& SessionImpl::get_realm_path() const noexcept
711
{
10,648✔
712
    // Can only be called if the session is active or being activated
713
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
10,648✔
714
    return m_wrapper.m_db->get_path();
10,648✔
715
}
10,648✔
716

717
DBRef SessionImpl::get_db() const noexcept
718
{
26,212✔
719
    // Can only be called if the session is active or being activated
720
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
26,212!
721
    return m_wrapper.m_db;
26,212✔
722
}
26,212✔
723

724
ClientReplication& SessionImpl::get_repl() const noexcept
725
{
121,362✔
726
    // Can only be called if the session is active or being activated
727
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
121,362✔
728
    return m_wrapper.get_replication();
121,362✔
729
}
121,362✔
730

731
ClientHistory& SessionImpl::get_history() const noexcept
732
{
119,378✔
733
    return get_repl().get_history();
119,378✔
734
}
119,378✔
735

736
util::Optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
737
{
14,028✔
738
    // Can only be called if the session is active or being activated
739
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
14,028✔
740
    return m_wrapper.m_client_reset_config;
14,028✔
741
}
14,028✔
742

743
SessionReason SessionImpl::get_session_reason() noexcept
744
{
1,528✔
745
    // Can only be called if the session is active or being activated
746
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,528✔
747
    return m_wrapper.m_session_reason;
1,528✔
748
}
1,528✔
749

750
uint64_t SessionImpl::get_schema_version() noexcept
751
{
1,528✔
752
    // Can only be called if the session is active or being activated
753
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,528✔
754
    return m_wrapper.m_schema_version;
1,528✔
755
}
1,528✔
756

757
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
758
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
759
{
45,538✔
760
    // Ignore the call if the session is not active
761
    if (m_state != State::Active) {
45,538✔
762
        return;
×
763
    }
×
764

765
    try {
45,538✔
766
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
45,538!
767
        if (simulate_integration_error) {
45,538✔
768
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
769
        }
×
770
        version_type client_version;
45,538✔
771
        if (REALM_LIKELY(!get_client().is_dry_run())) {
45,538✔
772
            VersionInfo version_info;
45,536✔
773
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
45,536✔
774
            client_version = version_info.realm_version;
45,536✔
775
        }
45,536✔
776
        else {
2✔
777
            // Fake it for "dry run" mode
778
            client_version = m_last_version_available + 1;
2✔
779
        }
2✔
780
        on_changesets_integrated(client_version, progress, !changesets.empty()); // Throws
45,538✔
781
    }
45,538✔
782
    catch (const IntegrationException& e) {
45,538✔
783
        on_integration_failure(e);
24✔
784
    }
24✔
785
}
45,538✔
786

787

788
void SessionImpl::on_upload_completion()
789
{
15,036✔
790
    // Ignore the call if the session is not active
791
    if (m_state == State::Active) {
15,036✔
792
        m_wrapper.on_upload_completion(); // Throws
15,036✔
793
    }
15,036✔
794
}
15,036✔
795

796

797
void SessionImpl::on_download_completion()
798
{
16,414✔
799
    // Ignore the call if the session is not active
800
    if (m_state == State::Active) {
16,414✔
801
        m_wrapper.on_download_completion(); // Throws
16,414✔
802
    }
16,414✔
803
}
16,414✔
804

805

806
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
807
{
690✔
808
    // Ignore the call if the session is not active
809
    if (m_state == State::Active) {
690✔
810
        m_wrapper.on_suspended(error_info); // Throws
690✔
811
    }
690✔
812
}
690✔
813

814

815
void SessionImpl::on_resumed()
816
{
90✔
817
    // Ignore the call if the session is not active
818
    if (m_state == State::Active) {
90✔
819
        m_wrapper.on_resumed(); // Throws
90✔
820
    }
90✔
821
}
90✔
822

823
void SessionImpl::handle_pending_client_reset_acknowledgement()
824
{
306✔
825
    // Ignore the call if the session is not active
826
    if (m_state == State::Active) {
306✔
827
        m_wrapper.handle_pending_client_reset_acknowledgement();
306✔
828
    }
306✔
829
}
306✔
830

831
void SessionImpl::update_subscription_version_info()
832
{
288✔
833
    // Ignore the call if the session is not active
834
    if (m_state == State::Active) {
288✔
835
        m_wrapper.update_subscription_version_info();
288✔
836
    }
288✔
837
}
288✔
838

839
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
840
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
841
{
47,820✔
842
    // Ignore the message if the session is not active or a steady state message
843
    if (m_state != State::Active || batch_state == DownloadBatchState::SteadyState) {
47,820✔
844
        return false;
45,538✔
845
    }
45,538✔
846

847
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,282✔
848
    util::Optional<SyncProgress> maybe_progress;
2,282✔
849
    if (batch_state == DownloadBatchState::LastInBatch) {
2,282✔
850
        maybe_progress = progress;
1,972✔
851
    }
1,972✔
852

853
    bool new_batch = false;
2,282✔
854
    try {
2,282✔
855
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
2,282✔
856
    }
2,282✔
857
    catch (const LogicError& ex) {
2,282✔
858
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
859
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
860
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
861
                                    ProtocolError::bad_changeset_size);
×
862
            on_integration_failure(ex);
×
863
            return true;
×
864
        }
×
865
        throw;
×
866
    }
×
867

868
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
869
    // bootstrapping.
870
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
2,282✔
871
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
72✔
872
    }
72✔
873

874
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
2,282✔
875
                                       batch_state, received_changesets.size());
2,282✔
876
    if (hook_action == SyncClientHookAction::EarlyReturn) {
2,282✔
877
        return true;
12✔
878
    }
12✔
879
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
2,270✔
880

881
    if (batch_state == DownloadBatchState::MoreToCome) {
2,270✔
882
        notify_download_progress(bootstrap_store->pending_stats().pending_changeset_bytes);
306✔
883
        return true;
306✔
884
    }
306✔
885
    else {
1,964✔
886
        // FIXME (#7451) this variable is not needed in principle, and bootstrap store bytes could be passed just
887
        // through notify_download_progress, but since it is needed in report_progress, and it is also called on
888
        // upload progress for now until progress is reported separately. As soon as we understand here that there
889
        // are no more changesets for bootstrap store, and we want to process bootstrap, we don't need to notify
890
        // intermediate progress - so reset these bytes to not accidentally double report them.
891
        m_wrapper.m_bootstrap_store_bytes.reset();
1,964✔
892
    }
1,964✔
893

894
    try {
1,964✔
895
        process_pending_flx_bootstrap();
1,964✔
896
    }
1,964✔
897
    catch (const IntegrationException& e) {
1,964✔
898
        on_integration_failure(e);
12✔
899
    }
12✔
900
    catch (...) {
1,964✔
901
        on_integration_failure(IntegrationException(exception_to_status()));
×
902
    }
×
903

904
    return true;
1,964✔
905
}
1,964✔
906

907

908
void SessionImpl::process_pending_flx_bootstrap()
909
{
12,324✔
910
    // Ignore the call if not a flx session or session is not active
911
    if (!m_is_flx_sync_session || m_state != State::Active) {
12,324✔
912
        return;
8,826✔
913
    }
8,826✔
914
    // Should never be called if session is not active
915
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
3,498✔
916
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
3,498✔
917
    if (!bootstrap_store->has_pending()) {
3,498✔
918
        return;
1,514✔
919
    }
1,514✔
920

921
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,984✔
922
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,984✔
923
                "changeset size: %3)",
1,984✔
924
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,984✔
925
                pending_batch_stats.pending_changeset_bytes);
1,984✔
926
    auto& history = get_repl().get_history();
1,984✔
927
    VersionInfo new_version;
1,984✔
928
    SyncProgress progress;
1,984✔
929
    int64_t query_version = -1;
1,984✔
930
    size_t changesets_processed = 0;
1,984✔
931

932
    // Used to commit each batch after it was transformed.
933
    TransactionRef transact = get_db()->start_write();
1,984✔
934
    while (bootstrap_store->has_pending()) {
4,168✔
935
        auto start_time = std::chrono::steady_clock::now();
2,196✔
936
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
2,196✔
937
        if (!pending_batch.progress) {
2,196✔
938
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
939
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
940
            // bootstrap store requires a write transaction itself.
941
            transact->close();
8✔
942
            bootstrap_store->clear();
8✔
943
            return;
8✔
944
        }
8✔
945

946
        auto batch_state =
2,188✔
947
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
2,188✔
948
        uint64_t downloadable_bytes = 0;
2,188✔
949
        query_version = pending_batch.query_version;
2,188✔
950
        bool simulate_integration_error =
2,188✔
951
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
2,188✔
952
        if (simulate_integration_error) {
2,188✔
953
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
4✔
954
        }
4✔
955

956
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
2,184✔
957
                        batch_state, pending_batch.changesets.size());
2,184✔
958

959
        history.integrate_server_changesets(
2,184✔
960
            *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
2,184✔
961
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
2,184✔
962
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
2,172✔
963
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
2,172✔
964
            });
2,172✔
965
        progress = *pending_batch.progress;
2,184✔
966
        changesets_processed += pending_batch.changesets.size();
2,184✔
967
        auto duration = std::chrono::steady_clock::now() - start_time;
2,184✔
968

969
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
2,184✔
970
                                      batch_state, pending_batch.changesets.size());
2,184✔
971
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
2,184✔
972

973
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
2,184✔
974
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
2,184✔
975
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
2,184✔
976
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
2,184✔
977
                    pending_batch.remaining_changesets);
2,184✔
978
    }
2,184✔
979
    on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0);
1,972✔
980

981
    REALM_ASSERT_3(query_version, !=, -1);
1,972✔
982
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
1,972✔
983

984
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
1,972✔
985
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,972✔
986
    // NoAction/EarlyReturn are both valid no-op actions to take here.
987
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,972✔
988
}
1,972✔
989

990
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
991
{
20✔
992
    // Ignore the call if the session is not active
993
    if (m_state == State::Active) {
20✔
994
        m_wrapper.on_flx_sync_error(version, err_msg);
20✔
995
    }
20✔
996
}
20✔
997

998
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
999
{
2,032✔
1000
    // Ignore the call if the session is not active
1001
    if (m_state == State::Active) {
2,032✔
1002
        m_wrapper.on_flx_sync_progress(version, batch_state);
2,032✔
1003
    }
2,032✔
1004
}
2,032✔
1005

1006
SubscriptionStore* SessionImpl::get_flx_subscription_store()
1007
{
19,004✔
1008
    // Should never be called if session is not active
1009
    REALM_ASSERT_EX(m_state == State::Active, m_state);
19,004✔
1010
    return m_wrapper.get_flx_subscription_store();
19,004✔
1011
}
19,004✔
1012

1013
MigrationStore* SessionImpl::get_migration_store()
1014
{
68,536✔
1015
    // Should never be called if session is not active
1016
    REALM_ASSERT_EX(m_state == State::Active, m_state);
68,536✔
1017
    return m_wrapper.get_migration_store();
68,536✔
1018
}
68,536✔
1019

1020
void SessionImpl::on_flx_sync_version_complete(int64_t version)
1021
{
292✔
1022
    // Ignore the call if the session is not active
1023
    if (m_state == State::Active) {
292✔
1024
        m_wrapper.on_flx_sync_version_complete(version);
292✔
1025
    }
292✔
1026
}
292✔
1027

1028
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
1029
{
2,650✔
1030
    // Should never be called if session is not active
1031
    REALM_ASSERT_EX(m_state == State::Active, m_state);
2,650✔
1032

1033
    // Make sure we don't call the debug hook recursively.
1034
    if (m_wrapper.m_in_debug_hook) {
2,650✔
1035
        return SyncClientHookAction::NoAction;
24✔
1036
    }
24✔
1037
    m_wrapper.m_in_debug_hook = true;
2,626✔
1038
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
2,626✔
1039
        m_wrapper.m_in_debug_hook = false;
2,626✔
1040
    });
2,626✔
1041

1042
    auto action = m_wrapper.m_debug_hook(data);
2,626✔
1043
    switch (action) {
2,626✔
1044
        case realm::SyncClientHookAction::SuspendWithRetryableError: {
12✔
1045
            SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
12✔
1046
            err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
12✔
1047

1048
            auto err_processing_err = receive_error_message(err_info);
12✔
1049
            REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
12✔
1050
            return SyncClientHookAction::EarlyReturn;
12✔
1051
        }
×
1052
        case realm::SyncClientHookAction::TriggerReconnect: {
24✔
1053
            get_connection().voluntary_disconnect();
24✔
1054
            return SyncClientHookAction::EarlyReturn;
24✔
1055
        }
×
1056
        default:
2,582✔
1057
            return action;
2,582✔
1058
    }
2,626✔
1059
}
2,626✔
1060

1061
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
1062
                                                  int64_t query_version, DownloadBatchState batch_state,
1063
                                                  size_t num_changesets)
1064
{
121,464✔
1065
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
121,464✔
1066
        return SyncClientHookAction::NoAction;
119,046✔
1067
    }
119,046✔
1068
    if (REALM_UNLIKELY(m_state != State::Active)) {
2,418✔
1069
        return SyncClientHookAction::NoAction;
×
1070
    }
×
1071

1072
    SyncClientHookData data;
2,418✔
1073
    data.event = event;
2,418✔
1074
    data.batch_state = batch_state;
2,418✔
1075
    data.progress = progress;
2,418✔
1076
    data.num_changesets = num_changesets;
2,418✔
1077
    data.query_version = query_version;
2,418✔
1078

1079
    return call_debug_hook(data);
2,418✔
1080
}
2,418✔
1081

1082
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
1083
{
1,420✔
1084
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
1,420✔
1085
        return SyncClientHookAction::NoAction;
1,184✔
1086
    }
1,184✔
1087
    if (REALM_UNLIKELY(m_state != State::Active)) {
236✔
1088
        return SyncClientHookAction::NoAction;
×
1089
    }
×
1090

1091
    SyncClientHookData data;
236✔
1092
    data.event = event;
236✔
1093
    data.batch_state = DownloadBatchState::SteadyState;
236✔
1094
    data.progress = m_progress;
236✔
1095
    data.num_changesets = 0;
236✔
1096
    data.query_version = 0;
236✔
1097
    data.error_info = &error_info;
236✔
1098

1099
    return call_debug_hook(data);
236✔
1100
}
236✔
1101

1102
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version,
1103
                                                   bool same_remote_versions)
1104
{
47,834✔
1105
    // Should never be called if session is not active
1106
    REALM_ASSERT_EX(m_state == State::Active, m_state);
47,834✔
1107
    // Query version should always be the same or increasing
1108
    REALM_ASSERT_3(query_version, >=, m_wrapper.m_flx_active_version);
47,834✔
1109

1110
    // Return early if already steady state or PBS (doesn't use bootstraps)
1111
    if (batch_state == DownloadBatchState::SteadyState || !m_is_flx_sync_session) {
47,834✔
1112
        return true; // Steady state
44,130✔
1113
    }
44,130✔
1114

1115
    // Bootstrap messages (i.e. non-steady state) are identified by:
1116
    // * DownloadBatchState of MoreToCome
1117
    // * DownloadBatchState of LastInBatch, and
1118
    //   * first LastInBatch=true after one or more MoreToCome messages
1119
    //   * query_version greater than the active query_version
1120
    //   * LastInBatch=true message has 2 or more changesets and all have the same
1121
    //     remote_version
1122

1123
    // LastInBatch=False messages are always a bootstrap message
1124
    if (batch_state == DownloadBatchState::MoreToCome) {
3,704✔
1125
        return false; // Not steady state
314✔
1126
    }
314✔
1127

1128
    // Messages with query_version greater than the active version are always a
1129
    // bootstrap message
1130
    if (query_version > m_wrapper.m_flx_active_version) {
3,390✔
1131
        return false; // Not steady state
1,974✔
1132
    }
1,974✔
1133

1134
    // If this is the first LastInBatch=true message after one or more
1135
    // LastInBatch=false messages, this is the end of a bootstrap
1136
    if (m_last_download_batch_state == DownloadBatchState::MoreToCome) {
1,416✔
1137
        return false; // Not steady state
4✔
1138
    }
4✔
1139

1140
    // Otherwise, if this is a LastInBatch=true message whose query version matches
1141
    // the current active version and all the changesets in the message have the
1142
    // same remote version, then this is a server initiated single message bootstrap
1143
    if (same_remote_versions) {
1,412✔
1144
        return false; // Not steady state
×
1145
    }
×
1146

1147
    // If none of the previous checks were successful, then this is a steady state msg
1148
    return true; // Steady state
1,412✔
1149
}
1,412✔
1150

1151
void SessionImpl::init_progress_handler()
1152
{
10,648✔
1153
    if (m_state != State::Unactivated && m_state != State::Active)
10,648✔
1154
        return;
×
1155

1156
    m_wrapper.init_progress_handler();
10,648✔
1157
}
10,648✔
1158

1159
void SessionImpl::enable_progress_notifications()
1160
{
46,206✔
1161
    m_wrapper.m_reliable_download_progress = true;
46,206✔
1162
}
46,206✔
1163

1164
void SessionImpl::notify_upload_progress()
1165
{
34,524✔
1166
    if (m_state != State::Active)
34,524✔
1167
        return;
×
1168

1169
    m_wrapper.on_upload_progress();
34,524✔
1170
}
34,524✔
1171

1172
void SessionImpl::update_download_estimate(double download_estimate)
1173
{
3,692✔
1174
    if (m_state != State::Active)
3,692✔
1175
        return;
×
1176

1177
    m_wrapper.m_download_estimate = download_estimate;
3,692✔
1178
}
3,692✔
1179

1180
void SessionImpl::notify_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes)
1181
{
27,962✔
1182
    if (m_state != State::Active)
27,962✔
1183
        return;
×
1184

1185
    m_wrapper.on_download_progress(bootstrap_store_bytes); // Throws
27,962✔
1186
}
27,962✔
1187

1188
util::Future<std::string> SessionImpl::send_test_command(std::string body)
1189
{
60✔
1190
    if (m_state != State::Active) {
60✔
1191
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
×
1192
    }
×
1193

1194
    try {
60✔
1195
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
60✔
1196
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
60✔
1197
            return Status{ErrorCodes::LogicError,
4✔
1198
                          "Must supply command name in \"command\" field of test command json object"};
4✔
1199
        }
4✔
1200
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
56✔
NEW
1201
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
×
NEW
1202
        }
×
1203
    }
56✔
1204
    catch (const nlohmann::json::parse_error& e) {
60✔
1205
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
4✔
1206
    }
4✔
1207

1208
    auto pf = util::make_promise_future<std::string>();
52✔
1209

1210
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
52✔
1211
        // Includes operation_aborted
1212
        if (!status.is_ok()) {
52✔
1213
            promise.set_error(status);
×
1214
            return;
×
1215
        }
×
1216

1217
        auto id = ++m_last_pending_test_command_ident;
52✔
1218
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
52✔
1219
        ensure_enlisted_to_send();
52✔
1220
    });
52✔
1221

1222
    return std::move(pf.future);
52✔
1223
}
60✔
1224

1225
// ################ SessionWrapper ################
1226

1227
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1228
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
1229
// the ClientImpl::Connection that owns the ClientImpl::Session.
1230
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1231
                               std::shared_ptr<MigrationStore> migration_store, Session::Config config)
1232
    : m_client{client}
5,676✔
1233
    , m_db(std::move(db))
5,676✔
1234
    , m_replication(m_db->get_replication())
5,676✔
1235
    , m_protocol_envelope{config.protocol_envelope}
5,676✔
1236
    , m_server_address{std::move(config.server_address)}
5,676✔
1237
    , m_server_port{config.server_port}
5,676✔
1238
    , m_server_verified{config.server_verified}
5,676✔
1239
    , m_user_id(std::move(config.user_id))
5,676✔
1240
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
5,676✔
1241
    , m_authorization_header_name{config.authorization_header_name}
5,676✔
1242
    , m_custom_http_headers{config.custom_http_headers}
5,676✔
1243
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
5,676✔
1244
    , m_simulate_integration_error{config.simulate_integration_error}
5,676✔
1245
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
5,676✔
1246
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
5,676✔
1247
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
5,676✔
1248
    , m_http_request_path_prefix{std::move(config.service_identifier)}
5,676✔
1249
    , m_virt_path{std::move(config.realm_identifier)}
5,676✔
1250
    , m_signed_access_token{std::move(config.signed_user_token)}
5,676✔
1251
    , m_client_reset_config{std::move(config.client_reset_config)}
5,676✔
1252
    , m_proxy_config{config.proxy_config} // Throws
5,676✔
1253
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
5,676✔
1254
    , m_session_reason(config.session_reason)
5,676✔
1255
    , m_schema_version(config.schema_version)
5,676✔
1256
    , m_flx_subscription_store(std::move(flx_sub_store))
5,676✔
1257
    , m_migration_store(std::move(migration_store))
5,676✔
1258
{
11,696✔
1259
    REALM_ASSERT(m_db);
11,696✔
1260
    REALM_ASSERT(m_db->get_replication());
11,696✔
1261
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
11,696✔
1262
    if (m_client_reset_config) {
11,696✔
1263
        m_session_reason = SessionReason::ClientReset;
372✔
1264
    }
372✔
1265
}
11,696✔
1266

1267
SessionWrapper::~SessionWrapper() noexcept
1268
{
11,684✔
1269
    if (m_db && m_actualized) {
11,684✔
1270
        m_db->remove_commit_listener(this);
176✔
1271
        m_db->release_sync_agent();
176✔
1272
    }
176✔
1273
}
11,684✔
1274

1275

1276
inline ClientReplication& SessionWrapper::get_replication() noexcept
1277
{
121,362✔
1278
    REALM_ASSERT(m_db);
121,362✔
1279
    return static_cast<ClientReplication&>(*m_replication);
121,362✔
1280
}
121,362✔
1281

1282

1283
inline ClientImpl& SessionWrapper::get_client() noexcept
1284
{
72✔
1285
    return m_client;
72✔
1286
}
72✔
1287

1288
bool SessionWrapper::has_flx_subscription_store() const
1289
{
2,032✔
1290
    return static_cast<bool>(m_flx_subscription_store);
2,032✔
1291
}
2,032✔
1292

1293
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1294
{
20✔
1295
    REALM_ASSERT(!m_finalized);
20✔
1296
    get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
20✔
1297
}
20✔
1298

1299
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1300
{
2,248✔
1301
    REALM_ASSERT(!m_finalized);
2,248✔
1302
    m_flx_last_seen_version = version;
2,248✔
1303
    m_flx_active_version = version;
2,248✔
1304
}
2,248✔
1305

1306
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1307
{
2,032✔
1308
    if (!has_flx_subscription_store()) {
2,032✔
1309
        return;
×
1310
    }
×
1311
    REALM_ASSERT(!m_finalized);
2,032✔
1312
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
2,032✔
1313
    REALM_ASSERT(new_version >= m_flx_active_version);
2,032✔
1314
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
2,032✔
1315

1316
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
2,032✔
1317

1318
    switch (batch_state) {
2,032✔
1319
        case DownloadBatchState::SteadyState:
✔
1320
            // Cannot be called with this value.
1321
            REALM_UNREACHABLE();
1322
        case DownloadBatchState::LastInBatch:
1,960✔
1323
            if (m_flx_active_version == new_version) {
1,960✔
1324
                return;
4✔
1325
            }
4✔
1326
            on_flx_sync_version_complete(new_version);
1,956✔
1327
            if (new_version == 0) {
1,956✔
1328
                new_state = SubscriptionSet::State::Complete;
924✔
1329
            }
924✔
1330
            else {
1,032✔
1331
                new_state = SubscriptionSet::State::AwaitingMark;
1,032✔
1332
                m_flx_pending_mark_version = new_version;
1,032✔
1333
            }
1,032✔
1334
            break;
1,956✔
1335
        case DownloadBatchState::MoreToCome:
72✔
1336
            if (m_flx_last_seen_version == new_version) {
72✔
1337
                return;
4✔
1338
            }
4✔
1339

1340
            m_flx_last_seen_version = new_version;
68✔
1341
            new_state = SubscriptionSet::State::Bootstrapping;
68✔
1342
            break;
68✔
1343
    }
2,032✔
1344

1345
    get_flx_subscription_store()->update_state(new_version, new_state);
2,024✔
1346
}
2,024✔
1347

1348
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
1349
{
21,048✔
1350
    REALM_ASSERT(!m_finalized);
21,048✔
1351
    return m_flx_subscription_store.get();
21,048✔
1352
}
21,048✔
1353

1354
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
1355
{
5,780✔
1356
    REALM_ASSERT(!m_finalized);
5,780✔
1357
    return m_flx_pending_bootstrap_store.get();
5,780✔
1358
}
5,780✔
1359

1360
MigrationStore* SessionWrapper::get_migration_store()
1361
{
68,536✔
1362
    REALM_ASSERT(!m_finalized);
68,536✔
1363
    return m_migration_store.get();
68,536✔
1364
}
68,536✔
1365

1366
inline void SessionWrapper::mark_initiated()
1367
{
10,544✔
1368
    REALM_ASSERT(!m_initiated);
10,544✔
1369
    REALM_ASSERT(!m_abandoned);
10,544✔
1370
    m_initiated = true;
10,544✔
1371
}
10,544✔
1372

1373

1374
inline void SessionWrapper::mark_abandoned()
1375
{
10,544✔
1376
    REALM_ASSERT(!m_abandoned);
10,544✔
1377
    m_abandoned = true;
10,544✔
1378
}
10,544✔
1379

1380

1381
inline void SessionWrapper::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
1382
{
4,148✔
1383
    REALM_ASSERT(!m_initiated);
4,148✔
1384
    m_progress_handler = std::move(handler);
4,148✔
1385
}
4,148✔
1386

1387

1388
inline void
1389
SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
1390
{
11,800✔
1391
    REALM_ASSERT(!m_initiated);
11,800✔
1392
    m_connection_state_change_listener = std::move(listener);
11,800✔
1393
}
11,800✔
1394

1395

1396
void SessionWrapper::initiate()
1397
{
10,542✔
1398
    ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port,
10,542✔
1399
                                   m_user_id,           m_sync_mode,      m_server_verified};
10,542✔
1400
    m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws
10,542✔
1401
    m_db->add_commit_listener(this);
10,542✔
1402
}
10,542✔
1403

1404

1405
void SessionWrapper::on_commit(version_type new_version)
1406
{
114,834✔
1407
    // Thread safety required
1408
    REALM_ASSERT(m_initiated);
114,834✔
1409

1410
    util::bind_ptr<SessionWrapper> self{this};
114,834✔
1411
    m_client.post([self = std::move(self), new_version](Status status) {
114,834✔
1412
        if (status == ErrorCodes::OperationAborted)
114,828✔
1413
            return;
×
1414
        else if (!status.is_ok())
114,828✔
1415
            throw Exception(status);
×
1416

1417
        REALM_ASSERT(self->m_actualized);
114,828✔
1418
        if (REALM_UNLIKELY(self->m_finalized || self->m_force_closed)) {
114,828✔
1419
            return;
896✔
1420
        }
896✔
1421

1422
        if (REALM_UNLIKELY(!self->m_sess))
113,932✔
1423
            return; // Already finalized
×
1424
        SessionImpl& sess = *self->m_sess;
113,932✔
1425
        sess.recognize_sync_version(new_version);                           // Throws
113,932✔
1426
        self->on_upload_progress(/* only_if_new_uploadable_data = */ true); // Throws
113,932✔
1427
    });
113,932✔
1428
}
114,834✔
1429

1430

1431
void SessionWrapper::cancel_reconnect_delay()
1432
{
20✔
1433
    // Thread safety required
1434
    REALM_ASSERT(m_initiated);
20✔
1435

1436
    util::bind_ptr<SessionWrapper> self{this};
20✔
1437
    m_client.post([self = std::move(self)](Status status) {
20✔
1438
        if (status == ErrorCodes::OperationAborted)
20✔
1439
            return;
×
1440
        else if (!status.is_ok())
20✔
1441
            throw Exception(status);
×
1442

1443
        REALM_ASSERT(self->m_actualized);
20✔
1444
        if (REALM_UNLIKELY(self->m_finalized || self->m_force_closed)) {
20✔
1445
            return;
×
1446
        }
×
1447

1448
        if (REALM_UNLIKELY(!self->m_sess))
20✔
1449
            return; // Already finalized
×
1450
        SessionImpl& sess = *self->m_sess;
20✔
1451
        sess.cancel_resumption_delay(); // Throws
20✔
1452
        ClientImpl::Connection& conn = sess.get_connection();
20✔
1453
        conn.cancel_reconnect_delay(); // Throws
20✔
1454
    });                                // Throws
20✔
1455
}
20✔
1456

1457
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1458
                                    WaitOperCompletionHandler handler)
1459
{
5,358✔
1460
    REALM_ASSERT(upload_completion || download_completion);
5,358✔
1461
    REALM_ASSERT(m_initiated);
5,358✔
1462

1463
    util::bind_ptr<SessionWrapper> self{this};
5,358✔
1464
    m_client.post([self = std::move(self), handler = std::move(handler), upload_completion,
5,358✔
1465
                   download_completion](Status status) mutable {
5,358✔
1466
        if (status == ErrorCodes::OperationAborted)
5,358✔
1467
            return;
×
1468
        else if (!status.is_ok())
5,358✔
1469
            throw Exception(status);
×
1470

1471
        REALM_ASSERT(self->m_actualized);
5,358✔
1472
        if (REALM_UNLIKELY(!self->m_sess)) {
5,358✔
1473
            // Already finalized
1474
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
66✔
1475
            return;
66✔
1476
        }
66✔
1477
        if (upload_completion) {
5,292✔
1478
            if (download_completion) {
2,692✔
1479
                // Wait for upload and download completion
1480
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
298✔
1481
            }
298✔
1482
            else {
2,394✔
1483
                // Wait for upload completion only
1484
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
2,394✔
1485
            }
2,394✔
1486
        }
2,692✔
1487
        else {
2,600✔
1488
            // Wait for download completion only
1489
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
2,600✔
1490
        }
2,600✔
1491
        SessionImpl& sess = *self->m_sess;
5,292✔
1492
        if (upload_completion)
5,292✔
1493
            sess.request_upload_completion_notification(); // Throws
2,692✔
1494
        if (download_completion)
5,292✔
1495
            sess.request_download_completion_notification(); // Throws
2,898✔
1496
    });                                                      // Throws
5,292✔
1497
}
5,358✔
1498

1499

1500
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1501
{
13,024✔
1502
    // Thread safety required
1503
    REALM_ASSERT(m_initiated);
13,024✔
1504
    REALM_ASSERT(!m_abandoned);
13,024✔
1505

1506
    std::int_fast64_t target_mark;
13,024✔
1507
    {
13,024✔
1508
        std::lock_guard lock{m_client.m_mutex};
13,024✔
1509
        target_mark = ++m_target_upload_mark;
13,024✔
1510
    }
13,024✔
1511

1512
    util::bind_ptr<SessionWrapper> self{this};
13,024✔
1513
    m_client.post([self = std::move(self), target_mark](Status status) {
13,024✔
1514
        if (status == ErrorCodes::OperationAborted)
13,024✔
1515
            return;
×
1516
        else if (!status.is_ok())
13,024✔
1517
            throw Exception(status);
×
1518

1519
        REALM_ASSERT(self->m_actualized);
13,024✔
1520
        REALM_ASSERT(!self->m_finalized);
13,024✔
1521
        // The session wrapper may already have been finalized. This can only
1522
        // happen if it was abandoned, but in that case, the call of
1523
        // wait_for_upload_complete_or_client_stopped() must have returned
1524
        // already.
1525
        if (REALM_UNLIKELY(!self->m_sess))
13,024✔
1526
            return;
24✔
1527
        if (target_mark > self->m_staged_upload_mark) {
13,000✔
1528
            self->m_staged_upload_mark = target_mark;
13,000✔
1529
            SessionImpl& sess = *self->m_sess;
13,000✔
1530
            sess.request_upload_completion_notification(); // Throws
13,000✔
1531
        }
13,000✔
1532
    }); // Throws
13,000✔
1533

1534
    bool completion_condition_was_satisfied;
13,024✔
1535
    {
13,024✔
1536
        std::unique_lock lock{m_client.m_mutex};
13,024✔
1537
        while (m_reached_upload_mark < target_mark && !m_client.m_stopped)
32,996✔
1538
            m_client.m_wait_or_client_stopped_cond.wait(lock);
19,972✔
1539
        completion_condition_was_satisfied = !m_client.m_stopped;
13,024✔
1540
    }
13,024✔
1541
    return completion_condition_was_satisfied;
13,024✔
1542
}
13,024✔
1543

1544

1545
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1546
{
10,120✔
1547
    // Thread safety required
1548
    REALM_ASSERT(m_initiated);
10,120✔
1549
    REALM_ASSERT(!m_abandoned);
10,120✔
1550

1551
    std::int_fast64_t target_mark;
10,120✔
1552
    {
10,120✔
1553
        std::lock_guard lock{m_client.m_mutex};
10,120✔
1554
        target_mark = ++m_target_download_mark;
10,120✔
1555
    }
10,120✔
1556

1557
    util::bind_ptr<SessionWrapper> self{this};
10,120✔
1558
    m_client.post([self = std::move(self), target_mark](Status status) {
10,120✔
1559
        if (status == ErrorCodes::OperationAborted)
10,120✔
1560
            return;
×
1561
        else if (!status.is_ok())
10,120✔
1562
            throw Exception(status);
×
1563

1564
        REALM_ASSERT(self->m_actualized);
10,120✔
1565
        REALM_ASSERT(!self->m_finalized);
10,120✔
1566
        // The session wrapper may already have been finalized. This can only
1567
        // happen if it was abandoned, but in that case, the call of
1568
        // wait_for_download_complete_or_client_stopped() must have returned
1569
        // already.
1570
        if (REALM_UNLIKELY(!self->m_sess))
10,120✔
1571
            return;
60✔
1572
        if (target_mark > self->m_staged_download_mark) {
10,060✔
1573
            self->m_staged_download_mark = target_mark;
10,058✔
1574
            SessionImpl& sess = *self->m_sess;
10,058✔
1575
            sess.request_download_completion_notification(); // Throws
10,058✔
1576
        }
10,058✔
1577
    }); // Throws
10,060✔
1578

1579
    bool completion_condition_was_satisfied;
10,120✔
1580
    {
10,120✔
1581
        std::unique_lock lock{m_client.m_mutex};
10,120✔
1582
        while (m_reached_download_mark < target_mark && !m_client.m_stopped)
20,506✔
1583
            m_client.m_wait_or_client_stopped_cond.wait(lock);
10,386✔
1584
        completion_condition_was_satisfied = !m_client.m_stopped;
10,120✔
1585
    }
10,120✔
1586
    return completion_condition_was_satisfied;
10,120✔
1587
}
10,120✔
1588

1589

1590
void SessionWrapper::refresh(std::string_view signed_access_token)
1591
{
216✔
1592
    // Thread safety required
1593
    REALM_ASSERT(m_initiated);
216✔
1594
    REALM_ASSERT(!m_abandoned);
216✔
1595

1596
    m_client.post([self = util::bind_ptr(this), token = std::string(signed_access_token)](Status status) {
216✔
1597
        if (status == ErrorCodes::OperationAborted)
216✔
1598
            return;
×
1599
        else if (!status.is_ok())
216✔
1600
            throw Exception(status);
×
1601

1602
        REALM_ASSERT(self->m_actualized);
216✔
1603
        if (REALM_UNLIKELY(!self->m_sess))
216✔
1604
            return; // Already finalized
×
1605
        self->m_signed_access_token = std::move(token);
216✔
1606
        SessionImpl& sess = *self->m_sess;
216✔
1607
        ClientImpl::Connection& conn = sess.get_connection();
216✔
1608
        // FIXME: This only makes sense when each session uses a separate connection.
1609
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
216✔
1610
        sess.cancel_resumption_delay();                                                          // Throws
216✔
1611
        conn.cancel_reconnect_delay();                                                           // Throws
216✔
1612
    });
216✔
1613
}
216✔
1614

1615

1616
inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
1617
{
11,696✔
1618
    if (wrapper->m_initiated) {
11,696✔
1619
        ClientImpl& client = wrapper->m_client;
10,544✔
1620
        client.register_abandoned_session_wrapper(std::move(wrapper));
10,544✔
1621
    }
10,544✔
1622
}
11,696✔
1623

1624

1625
// Must be called from event loop thread
1626
void SessionWrapper::actualize(ServerEndpoint endpoint)
1627
{
10,362✔
1628
    REALM_ASSERT_DEBUG(m_initiated);
10,362✔
1629
    REALM_ASSERT(!m_actualized);
10,362✔
1630
    REALM_ASSERT(!m_sess);
10,362✔
1631
    // Cannot be actualized if it's already been finalized or force closed
1632
    REALM_ASSERT(!m_finalized);
10,362✔
1633
    REALM_ASSERT(!m_force_closed);
10,362✔
1634
    try {
10,362✔
1635
        m_db->claim_sync_agent();
10,362✔
1636
    }
10,362✔
1637
    catch (const MultipleSyncAgents&) {
10,362✔
1638
        finalize_before_actualization();
4✔
1639
        throw;
4✔
1640
    }
4✔
1641
    auto sync_mode = endpoint.server_mode;
10,360✔
1642

1643
    bool was_created = false;
10,360✔
1644
    ClientImpl::Connection& conn = m_client.get_connection(
10,360✔
1645
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
10,360✔
1646
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
10,360✔
1647
        was_created); // Throws
10,360✔
1648
    try {
10,360✔
1649
        // FIXME: This only makes sense when each session uses a separate connection.
1650
        conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
10,360✔
1651
        std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
10,360✔
1652
        if (sync_mode == SyncServerMode::FLX) {
10,360✔
1653
            m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
1,534✔
1654
        }
1,534✔
1655

1656
        sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
10,360✔
1657
        m_sess = sess.get();
10,360✔
1658
        conn.activate_session(std::move(sess)); // Throws
10,360✔
1659
    }
10,360✔
1660
    catch (...) {
10,360✔
1661
        if (was_created)
×
1662
            m_client.remove_connection(conn);
×
1663

1664
        // finalize_before_actualization() expects m_sess to be nullptr, but it's possible that we
1665
        // reached its assignment above before throwing. Unset it here so we get a clean unhandled
1666
        // exception failure instead of a REALM_ASSERT in finalize_before_actualization().
1667
        m_sess = nullptr;
×
1668
        finalize_before_actualization();
×
1669
        throw;
×
1670
    }
×
1671

1672
    // Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
1673
    // session cannot change the state of the bootstrap store at the same time.
1674
    update_subscription_version_info();
10,358✔
1675

1676
    m_actualized = true;
10,358✔
1677
    if (was_created)
10,358✔
1678
        conn.activate(); // Throws
2,778✔
1679

1680
    if (m_connection_state_change_listener) {
10,358✔
1681
        ConnectionState state = conn.get_state();
10,348✔
1682
        if (state != ConnectionState::disconnected) {
10,348✔
1683
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,422✔
1684
            if (state == ConnectionState::connected)
7,422✔
1685
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
7,184✔
1686
        }
7,422✔
1687
    }
10,348✔
1688

1689
    if (!m_client_reset_config)
10,358✔
1690
        on_upload_progress(/* only_if_new_uploadable_data = */ true); // Throws
9,988✔
1691
}
10,358✔
1692

1693
void SessionWrapper::force_close()
1694
{
102✔
1695
    if (m_force_closed || m_finalized) {
102✔
1696
        return;
×
1697
    }
×
1698
    REALM_ASSERT(m_actualized);
102✔
1699
    REALM_ASSERT(m_sess);
102✔
1700
    m_force_closed = true;
102✔
1701

1702
    ClientImpl::Connection& conn = m_sess->get_connection();
102✔
1703
    conn.initiate_session_deactivation(m_sess); // Throws
102✔
1704

1705
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
1706
    m_flx_pending_bootstrap_store.reset();
102✔
1707
    // Clear the subscription and migration store refs since they are owned by SyncSession
1708
    m_flx_subscription_store.reset();
102✔
1709
    m_migration_store.reset();
102✔
1710
    m_sess = nullptr;
102✔
1711
    // Everything is being torn down, no need to report connection state anymore
1712
    m_connection_state_change_listener = {};
102✔
1713
}
102✔
1714

1715
// Must be called from event loop thread
1716
void SessionWrapper::finalize()
1717
{
10,354✔
1718
    REALM_ASSERT(m_actualized);
10,354✔
1719
    REALM_ASSERT(m_abandoned);
10,354✔
1720

1721
    // Already finalized?
1722
    if (m_finalized) {
10,354✔
1723
        return;
×
1724
    }
×
1725

1726
    // Must be before marking as finalized as we expect m_finalized == false in on_change()
1727
    m_db->remove_commit_listener(this);
10,354✔
1728

1729
    m_finalized = true;
10,354✔
1730

1731
    if (!m_force_closed) {
10,354✔
1732
        REALM_ASSERT(m_sess);
10,246✔
1733
        ClientImpl::Connection& conn = m_sess->get_connection();
10,246✔
1734
        conn.initiate_session_deactivation(m_sess); // Throws
10,246✔
1735

1736
        // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
1737
        m_flx_pending_bootstrap_store.reset();
10,246✔
1738
        // Clear the subscription and migration store refs since they are owned by SyncSession
1739
        m_flx_subscription_store.reset();
10,246✔
1740
        m_migration_store.reset();
10,246✔
1741
        m_sess = nullptr;
10,246✔
1742
    }
10,246✔
1743

1744
    // The Realm file can be closed now, as no access to the Realm file is
1745
    // supposed to happen on behalf of a session after initiation of
1746
    // deactivation.
1747
    m_db->release_sync_agent();
10,354✔
1748
    m_db = nullptr;
10,354✔
1749

1750
    // All outstanding wait operations must be canceled
1751
    while (!m_upload_completion_handlers.empty()) {
10,784✔
1752
        auto handler = std::move(m_upload_completion_handlers.back());
430✔
1753
        m_upload_completion_handlers.pop_back();
430✔
1754
        handler(
430✔
1755
            {ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
430✔
1756
    }
430✔
1757
    while (!m_download_completion_handlers.empty()) {
10,582✔
1758
        auto handler = std::move(m_download_completion_handlers.back());
228✔
1759
        m_download_completion_handlers.pop_back();
228✔
1760
        handler(
228✔
1761
            {ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
228✔
1762
    }
228✔
1763
    while (!m_sync_completion_handlers.empty()) {
10,366✔
1764
        auto handler = std::move(m_sync_completion_handlers.back());
12✔
1765
        m_sync_completion_handlers.pop_back();
12✔
1766
        handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
12✔
1767
    }
12✔
1768
}
10,354✔
1769

1770

1771
// Must be called only when an unactualized session wrapper becomes abandoned.
1772
//
1773
// Called with a lock on `m_client.m_mutex`.
1774
inline void SessionWrapper::finalize_before_actualization() noexcept
1775
{
184✔
1776
    REALM_ASSERT(!m_sess);
184✔
1777
    m_actualized = true;
184✔
1778
    m_force_closed = true;
184✔
1779
}
184✔
1780

1781
inline void SessionWrapper::on_upload_progress(bool only_if_new_uploadable_data)
1782
{
158,444✔
1783
    REALM_ASSERT(!m_finalized);
158,444✔
1784
    report_progress(/* is_download = */ false, only_if_new_uploadable_data); // Throws
158,444✔
1785
}
158,444✔
1786

1787
inline void SessionWrapper::on_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes)
1788
{
27,962✔
1789
    REALM_ASSERT(!m_finalized);
27,962✔
1790
    m_bootstrap_store_bytes = bootstrap_store_bytes;
27,962✔
1791
    report_progress(/* is_download = */ true); // Throws
27,962✔
1792
}
27,962✔
1793

1794

1795
void SessionWrapper::on_upload_completion()
1796
{
15,036✔
1797
    REALM_ASSERT(!m_finalized);
15,036✔
1798
    while (!m_upload_completion_handlers.empty()) {
17,090✔
1799
        auto handler = std::move(m_upload_completion_handlers.back());
2,054✔
1800
        m_upload_completion_handlers.pop_back();
2,054✔
1801
        handler(Status::OK()); // Throws
2,054✔
1802
    }
2,054✔
1803
    while (!m_sync_completion_handlers.empty()) {
15,232✔
1804
        auto handler = std::move(m_sync_completion_handlers.back());
196✔
1805
        m_download_completion_handlers.push_back(std::move(handler)); // Throws
196✔
1806
        m_sync_completion_handlers.pop_back();
196✔
1807
    }
196✔
1808
    std::lock_guard lock{m_client.m_mutex};
15,036✔
1809
    if (m_staged_upload_mark > m_reached_upload_mark) {
15,036✔
1810
        m_reached_upload_mark = m_staged_upload_mark;
12,990✔
1811
        m_client.m_wait_or_client_stopped_cond.notify_all();
12,990✔
1812
    }
12,990✔
1813
}
15,036✔
1814

1815

1816
void SessionWrapper::on_download_completion()
1817
{
16,414✔
1818
    while (!m_download_completion_handlers.empty()) {
18,970✔
1819
        auto handler = std::move(m_download_completion_handlers.back());
2,556✔
1820
        m_download_completion_handlers.pop_back();
2,556✔
1821
        handler(Status::OK()); // Throws
2,556✔
1822
    }
2,556✔
1823
    while (!m_sync_completion_handlers.empty()) {
16,504✔
1824
        auto handler = std::move(m_sync_completion_handlers.back());
90✔
1825
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
90✔
1826
        m_sync_completion_handlers.pop_back();
90✔
1827
    }
90✔
1828

1829
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
16,414✔
1830
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
904✔
1831
                             m_flx_pending_mark_version);
904✔
1832
        m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
904✔
1833
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
904✔
1834
    }
904✔
1835

1836
    std::lock_guard lock{m_client.m_mutex};
16,414✔
1837
    if (m_staged_download_mark > m_reached_download_mark) {
16,414✔
1838
        m_reached_download_mark = m_staged_download_mark;
9,990✔
1839
        m_client.m_wait_or_client_stopped_cond.notify_all();
9,990✔
1840
    }
9,990✔
1841
}
16,414✔
1842

1843

1844
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1845
{
690✔
1846
    REALM_ASSERT(!m_finalized);
690✔
1847
    m_suspended = true;
690✔
1848
    if (m_connection_state_change_listener) {
690✔
1849
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
690✔
1850
    }
690✔
1851
}
690✔
1852

1853

1854
void SessionWrapper::on_resumed()
1855
{
90✔
1856
    REALM_ASSERT(!m_finalized);
90✔
1857
    m_suspended = false;
90✔
1858
    if (m_connection_state_change_listener) {
90✔
1859
        ClientImpl::Connection& conn = m_sess->get_connection();
90✔
1860
        if (conn.get_state() != ConnectionState::disconnected) {
90✔
1861
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
82✔
1862
            if (conn.get_state() == ConnectionState::connected)
82✔
1863
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
76✔
1864
        }
82✔
1865
    }
90✔
1866
}
90✔
1867

1868

1869
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1870
                                                 const util::Optional<SessionErrorInfo>& error_info)
1871
{
11,962✔
1872
    if (m_connection_state_change_listener) {
11,962✔
1873
        if (!m_suspended)
11,946✔
1874
            m_connection_state_change_listener(state, error_info); // Throws
11,926✔
1875
    }
11,946✔
1876
}
11,962✔
1877

1878
void SessionWrapper::init_progress_handler()
1879
{
10,648✔
1880
    uint64_t unused = 0;
10,648✔
1881
    ClientHistory::get_upload_download_bytes(m_db.get(), m_reported_progress.final_downloaded, unused,
10,648✔
1882
                                             m_reported_progress.final_uploaded, unused, unused);
10,648✔
1883
}
10,648✔
1884

1885
void SessionWrapper::report_progress(bool is_download, bool only_if_new_uploadable_data)
1886
{
186,408✔
1887
    REALM_ASSERT(!m_finalized);
186,408✔
1888
    REALM_ASSERT(m_sess);
186,408✔
1889
    REALM_ASSERT(!(only_if_new_uploadable_data && is_download));
186,408✔
1890

1891
    if (!m_progress_handler)
186,408✔
1892
        return;
130,774✔
1893

1894
    // Ignore progress messages from before we first receive a DOWNLOAD message
1895
    if (!m_reliable_download_progress)
55,634✔
1896
        return;
27,650✔
1897

1898
    ReportedProgress p = m_reported_progress;
27,984✔
1899
    ClientHistory::get_upload_download_bytes(m_db.get(), p.downloaded, p.downloadable, p.uploaded, p.uploadable,
27,984✔
1900
                                             p.snapshot);
27,984✔
1901

1902
    // If this progress notification was triggered by a commit being made we
1903
    // only want to send it if the uploadable bytes has actually increased,
1904
    // and not if it was an empty commit.
1905
    if (only_if_new_uploadable_data && m_reported_progress.uploadable == p.uploadable)
27,984✔
1906
        return;
18,954✔
1907

1908
    // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
1909
    // is only the remaining to download. This is confusing, so make them use
1910
    // the same units.
1911
    p.downloadable += p.downloaded;
9,030✔
1912

1913
    bool is_completed = false;
9,030✔
1914
    if (is_download) {
9,030✔
1915
        if (m_download_estimate)
3,658✔
1916
            is_completed = *m_download_estimate >= 1.0;
1,512✔
1917
        else
2,146✔
1918
            is_completed = p.downloaded == p.downloadable;
2,146✔
1919
    }
3,658✔
1920
    else {
5,372✔
1921
        is_completed = p.uploaded == p.uploadable;
5,372✔
1922
    }
5,372✔
1923

1924
    auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) {
9,854✔
1925
        REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable);
9,854✔
1926
        REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable);
9,854✔
1927

1928
        // The effect of this calculation is that if new bytes are added for download/upload,
1929
        // the progress estimate doesn't go back to zero, but it goes back to some non-zero percentage.
1930
        // This calculation allows a clean progression from 0 to 1.0 even if the new data is added for the sync
1931
        // before progress has reached 1.0.
1932
        // Then once it is at 1.0 the next batch of changes will restart the estimate at 0.
1933
        // Example for upload progress reported:
1934
        // 0 -> 1.0 -> new data added -> 0.0 -> 0.1 ...sync... -> 0.4 -> new data added -> 0.3 ...sync.. -> 1.0
1935

1936
        double progress_estimate = 1.0;
9,854✔
1937
        if (final_transferred < transferable && transferred < transferable)
9,854✔
1938
            progress_estimate = (transferred - final_transferred) / double(transferable - final_transferred);
4,052✔
1939
        return progress_estimate;
9,854✔
1940
    };
9,854✔
1941

1942
    double upload_estimate = 1.0, download_estimate = 1.0;
9,030✔
1943

1944
    // calculate estimate for both download/upload since the progress is reported all at once
1945
    if (!is_completed || is_download)
9,030✔
1946
        upload_estimate = calculate_progress(p.uploaded, p.uploadable, p.final_uploaded);
6,110✔
1947

1948
    // download estimate only known for flx
1949
    if (m_download_estimate) {
9,030✔
1950
        download_estimate = *m_download_estimate;
3,242✔
1951

1952
        // ... bootstrap store bytes should be null after initial sync when every changeset integrated immediately
1953
        if (m_bootstrap_store_bytes)
3,242✔
1954
            p.downloaded += *m_bootstrap_store_bytes;
306✔
1955

1956
        // FIXME for flx with download estimate these bytes are not known
1957
        // provide some sensible value for non-streaming version of object-store callbacks
1958
        // until these field are completely removed from the api after pbs deprecation
1959
        p.downloadable = p.downloaded;
3,242✔
1960
        if (0.01 <= download_estimate && download_estimate <= 0.99)
3,242✔
1961
            if (p.downloaded > p.final_downloaded)
318✔
1962
                p.downloadable =
318✔
1963
                    p.final_downloaded + uint64_t((p.downloaded - p.final_downloaded) / download_estimate);
318✔
1964
    }
3,242✔
1965
    else {
5,788✔
1966
        if (!is_completed || !is_download)
5,788✔
1967
            download_estimate = calculate_progress(p.downloaded, p.downloadable, p.final_downloaded);
3,744✔
1968
    }
5,788✔
1969

1970
    if (is_completed) {
9,030✔
1971
        if (is_download)
6,102✔
1972
            p.final_downloaded = p.downloaded;
3,192✔
1973
        else
2,910✔
1974
            p.final_uploaded = p.uploaded;
2,910✔
1975
    }
6,102✔
1976

1977
    m_reported_progress = p;
9,030✔
1978

1979
    if (m_sess->logger.would_log(Logger::Level::debug)) {
9,030✔
1980
        auto to_str = [](double d) {
17,528✔
1981
            std::ostringstream ss;
17,528✔
1982
            // progress estimate string in the DOWNLOAD message isn't expected to have more than 4 digits of precision
1983
            ss << std::fixed << std::setprecision(4) << d;
17,528✔
1984
            return ss.str();
17,528✔
1985
        };
17,528✔
1986
        m_sess->logger.debug("Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
8,764✔
1987
                             "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7",
8,764✔
1988
                             p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
8,764✔
1989
                             to_str(upload_estimate), p.snapshot);
8,764✔
1990
    }
8,764✔
1991

1992
    m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate,
9,030✔
1993
                       upload_estimate);
9,030✔
1994
}
9,030✔
1995

1996
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
1997
{
60✔
1998
    if (!m_sess) {
60✔
1999
        return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
×
2000
    }
×
2001

2002
    return m_sess->send_test_command(std::move(body));
60✔
2003
}
60✔
2004

2005
void SessionWrapper::handle_pending_client_reset_acknowledgement()
2006
{
306✔
2007
    REALM_ASSERT(!m_finalized);
306✔
2008

2009
    auto pending_reset = _impl::client_reset::has_pending_reset(*m_db->start_frozen());
306✔
2010
    REALM_ASSERT(pending_reset);
306✔
2011
    m_sess->logger.info("Tracking pending client reset of type \"%1\" from %2", pending_reset->type,
306✔
2012
                        pending_reset->time);
306✔
2013
    async_wait_for(true, true, [self = util::bind_ptr(this), pending_reset = *pending_reset](Status status) {
306✔
2014
        if (status == ErrorCodes::OperationAborted) {
306✔
2015
            return;
138✔
2016
        }
138✔
2017
        auto& logger = self->m_sess->logger;
168✔
2018
        if (!status.is_ok()) {
168✔
2019
            logger.error("Error while tracking client reset acknowledgement: %1", status);
×
2020
            return;
×
2021
        }
×
2022

2023
        auto wt = self->m_db->start_write();
168✔
2024
        auto cur_pending_reset = _impl::client_reset::has_pending_reset(*wt);
168✔
2025
        if (!cur_pending_reset) {
168✔
2026
            logger.debug(
×
2027
                "Was going to remove client reset tracker for type \"%1\" from %2, but it was already removed",
×
2028
                pending_reset.type, pending_reset.time);
×
2029
            return;
×
2030
        }
×
2031
        else if (cur_pending_reset->type != pending_reset.type || cur_pending_reset->time != pending_reset.time) {
168✔
2032
            logger.debug(
×
2033
                "Was going to remove client reset tracker for type \"%1\" from %2, but found type \"%3\" from %4.",
×
2034
                pending_reset.type, pending_reset.time, cur_pending_reset->type, cur_pending_reset->time);
×
2035
        }
×
2036
        else {
168✔
2037
            logger.debug("Client reset of type \"%1\" from %2 has been acknowledged by the server. "
168✔
2038
                         "Removing cycle detection tracker.",
168✔
2039
                         pending_reset.type, pending_reset.time);
168✔
2040
        }
168✔
2041
        _impl::client_reset::remove_pending_client_resets(*wt);
168✔
2042
        wt->commit();
168✔
2043
    });
168✔
2044
}
306✔
2045

2046
void SessionWrapper::update_subscription_version_info()
2047
{
10,646✔
2048
    if (!m_flx_subscription_store)
10,646✔
2049
        return;
9,004✔
2050
    auto versions_info = m_flx_subscription_store->get_version_info();
1,642✔
2051
    m_flx_active_version = versions_info.active;
1,642✔
2052
    m_flx_pending_mark_version = versions_info.pending_mark;
1,642✔
2053
}
1,642✔
2054

2055
std::string SessionWrapper::get_appservices_connection_id()
2056
{
72✔
2057
    auto pf = util::make_promise_future<std::string>();
72✔
2058
    REALM_ASSERT(m_initiated);
72✔
2059

2060
    util::bind_ptr<SessionWrapper> self(this);
72✔
2061
    get_client().post([self, promise = std::move(pf.promise)](Status status) mutable {
72✔
2062
        if (!status.is_ok()) {
72✔
2063
            promise.set_error(status);
×
2064
            return;
×
2065
        }
×
2066

2067
        if (!self->m_sess) {
72✔
2068
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
2069
            return;
×
2070
        }
×
2071

2072
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
72✔
2073
    });
72✔
2074

2075
    return pf.future.get();
72✔
2076
}
72✔
2077

2078
// ################ ClientImpl::Connection ################
2079

2080
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
2081
                                   const std::string& authorization_header_name,
2082
                                   const std::map<std::string, std::string>& custom_http_headers,
2083
                                   bool verify_servers_ssl_certificate,
2084
                                   Optional<std::string> ssl_trust_certificate_path,
2085
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
2086
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
2087
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::session, make_logger_prefix(ident),
1,322✔
2088
                                                      client.logger_ptr)} // Throws
1,322✔
2089
    , logger{*logger_ptr}
1,322✔
2090
    , m_client{client}
1,322✔
2091
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1,322✔
2092
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1,322✔
2093
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1,322✔
2094
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1,322✔
2095
    , m_reconnect_info{reconnect_info}
1,322✔
2096
    , m_session_history{}
1,322✔
2097
    , m_ident{ident}
1,322✔
2098
    , m_server_endpoint{std::move(endpoint)}
1,322✔
2099
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1,322✔
2100
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1,322✔
2101
{
2,780✔
2102
    m_on_idle = m_client.create_trigger([this](Status status) {
2,780✔
2103
        if (status == ErrorCodes::OperationAborted)
2,774✔
2104
            return;
×
2105
        else if (!status.is_ok())
2,774✔
2106
            throw Exception(status);
×
2107

2108
        REALM_ASSERT(m_activated);
2,774✔
2109
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,774✔
2110
            on_idle(); // Throws
2,766✔
2111
            // Connection object may be destroyed now.
2112
        }
2,766✔
2113
    });
2,774✔
2114
}
2,780✔
2115

2116
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
2117
{
12✔
2118
    return m_ident;
12✔
2119
}
12✔
2120

2121

2122
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
2123
{
2,766✔
2124
    return m_server_endpoint;
2,766✔
2125
}
2,766✔
2126

2127
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
2128
                                                        const std::string& signed_access_token)
2129
{
10,576✔
2130
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
10,576✔
2131
    m_signed_access_token = signed_access_token;           // Throws (copy)
10,576✔
2132
}
10,576✔
2133

2134

2135
void ClientImpl::Connection::resume_active_sessions()
2136
{
1,948✔
2137
    auto handler = [=](ClientImpl::Session& sess) {
3,892✔
2138
        sess.cancel_resumption_delay(); // Throws
3,892✔
2139
    };
3,892✔
2140
    for_each_active_session(std::move(handler)); // Throws
1,948✔
2141
}
1,948✔
2142

2143
void ClientImpl::Connection::on_idle()
2144
{
2,764✔
2145
    logger.debug(util::LogCategory::session, "Destroying connection object");
2,764✔
2146
    ClientImpl& client = get_client();
2,764✔
2147
    client.remove_connection(*this);
2,764✔
2148
    // NOTE: This connection object is now destroyed!
2149
}
2,764✔
2150

2151

2152
std::string ClientImpl::Connection::get_http_request_path() const
2153
{
3,766✔
2154
    using namespace std::string_view_literals;
3,766✔
2155
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
3,766✔
2156

2157
    std::string path;
3,766✔
2158
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,766✔
2159
    path += m_http_request_path_prefix;
3,766✔
2160
    path += param;
3,766✔
2161
    path += m_signed_access_token;
3,766✔
2162

2163
    return path;
3,766✔
2164
}
3,766✔
2165

2166

2167
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
2168
{
2,780✔
2169
    std::ostringstream out;
2,780✔
2170
    out.imbue(std::locale::classic());
2,780✔
2171
    out << "Connection[" << ident << "]: "; // Throws
2,780✔
2172
    return out.str();                       // Throws
2,780✔
2173
}
2,780✔
2174

2175

2176
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
2177
                                                            util::Optional<SessionErrorInfo> error_info)
2178
{
11,088✔
2179
    if (m_force_closed) {
11,088✔
2180
        return;
2,396✔
2181
    }
2,396✔
2182
    auto handler = [=](ClientImpl::Session& sess) {
11,810✔
2183
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
11,810✔
2184
        sess_2.on_connection_state_changed(state, error_info); // Throws
11,810✔
2185
    };
11,810✔
2186
    for_each_active_session(std::move(handler)); // Throws
8,692✔
2187
}
8,692✔
2188

2189

2190
Client::Client(Config config)
2191
    : m_impl{new ClientImpl{std::move(config)}} // Throws
4,794✔
2192
{
9,726✔
2193
}
9,726✔
2194

2195

2196
Client::Client(Client&& client) noexcept
2197
    : m_impl{std::move(client.m_impl)}
2198
{
×
2199
}
×
2200

2201

2202
Client::~Client() noexcept {}
9,726✔
2203

2204

2205
void Client::shutdown() noexcept
2206
{
9,804✔
2207
    m_impl->shutdown();
9,804✔
2208
}
9,804✔
2209

2210
void Client::shutdown_and_wait()
2211
{
772✔
2212
    m_impl->shutdown_and_wait();
772✔
2213
}
772✔
2214

2215
void Client::cancel_reconnect_delay()
2216
{
1,952✔
2217
    m_impl->cancel_reconnect_delay();
1,952✔
2218
}
1,952✔
2219

2220
void Client::voluntary_disconnect_all_connections()
2221
{
12✔
2222
    m_impl->voluntary_disconnect_all_connections();
12✔
2223
}
12✔
2224

2225
bool Client::wait_for_session_terminations_or_client_stopped()
2226
{
9,288✔
2227
    return m_impl->wait_for_session_terminations_or_client_stopped();
9,288✔
2228
}
9,288✔
2229

2230
util::Future<void> Client::notify_session_terminated()
2231
{
56✔
2232
    return m_impl->notify_session_terminated();
56✔
2233
}
56✔
2234

2235
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
2236
                                  port_type& port, std::string& path) const
2237
{
4,072✔
2238
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
4,072✔
2239
}
4,072✔
2240

2241

2242
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
2243
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
2244
{
11,696✔
2245
    util::bind_ptr<SessionWrapper> sess;
11,696✔
2246
    sess.reset(new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
11,696✔
2247
                                  std::move(config)}); // Throws
11,696✔
2248
    // The reference count passed back to the application is implicitly
2249
    // owned by a naked pointer. This is done to avoid exposing
2250
    // implementation details through the header file (that is, through the
2251
    // Session object).
2252
    m_impl = sess.release();
11,696✔
2253
}
11,696✔
2254

2255

2256
void Session::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
2257
{
4,148✔
2258
    m_impl->set_progress_handler(std::move(handler)); // Throws
4,148✔
2259
}
4,148✔
2260

2261

2262
void Session::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
2263
{
11,800✔
2264
    m_impl->set_connection_state_change_listener(std::move(listener)); // Throws
11,800✔
2265
}
11,800✔
2266

2267

2268
void Session::bind()
2269
{
10,542✔
2270
    m_impl->initiate(); // Throws
10,542✔
2271
}
10,542✔
2272

2273

2274
void Session::nonsync_transact_notify(version_type new_version)
2275
{
17,996✔
2276
    m_impl->on_commit(new_version); // Throws
17,996✔
2277
}
17,996✔
2278

2279

2280
void Session::cancel_reconnect_delay()
2281
{
20✔
2282
    m_impl->cancel_reconnect_delay(); // Throws
20✔
2283
}
20✔
2284

2285

2286
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2287
{
5,052✔
2288
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
5,052✔
2289
}
5,052✔
2290

2291

2292
bool Session::wait_for_upload_complete_or_client_stopped()
2293
{
13,024✔
2294
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
13,024✔
2295
}
13,024✔
2296

2297

2298
bool Session::wait_for_download_complete_or_client_stopped()
2299
{
10,120✔
2300
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
10,120✔
2301
}
10,120✔
2302

2303

2304
void Session::refresh(std::string_view signed_access_token)
2305
{
216✔
2306
    m_impl->refresh(signed_access_token); // Throws
216✔
2307
}
216✔
2308

2309

2310
void Session::abandon() noexcept
2311
{
11,696✔
2312
    REALM_ASSERT(m_impl);
11,696✔
2313
    // Reabsorb the ownership assigned to the applications naked pointer by
2314
    // Session constructor
2315
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
11,696✔
2316
    SessionWrapper::abandon(std::move(wrapper));
11,696✔
2317
}
11,696✔
2318

2319
util::Future<std::string> Session::send_test_command(std::string body)
2320
{
60✔
2321
    return m_impl->send_test_command(std::move(body));
60✔
2322
}
60✔
2323

2324
std::string Session::get_appservices_connection_id()
2325
{
72✔
2326
    return m_impl->get_appservices_connection_id();
72✔
2327
}
72✔
2328

2329
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2330
{
×
2331
    switch (proxyType) {
×
2332
        case ProxyConfig::Type::HTTP:
×
2333
            return os << "HTTP";
×
2334
        case ProxyConfig::Type::HTTPS:
×
2335
            return os << "HTTPS";
×
2336
    }
×
2337
    REALM_TERMINATE("Invalid Proxy Type object.");
2338
}
×
2339

2340
} // namespace sync
2341
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc